pub_func.php 1020 B

12345678910111213141516171819202122232425262728293031323334353637383940
  1. <?php
  2. function kafkaProducer( $topic, $msg_data ){
  3. if (!extension_loaded('rdkafka')){
  4. echo 'rdkafka extension is not installed!!'.PHP_EOL;
  5. return false;
  6. }
  7. /********************* 初始化生产者配置项 start **************************/
  8. //考勤记录分析结果生产者
  9. $rk = new RdKafka\Producer();
  10. $rk->setLogLevel(LOG_DEBUG);
  11. $rk->addBrokers("127.0.0.1");
  12. $start = microtime(true);
  13. $topic_obj = $rk->newTopic($topic);
  14. /********************* 初始化生产者配置项 end **************************/
  15. if( empty($msg_data) ){
  16. return;
  17. }
  18. if( is_array($msg_data) || is_object($msg_data) ){ // 对象、数组转换
  19. $msg_data = json_encode($msg_data);
  20. }
  21. $topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, $msg_data);
  22. $rk->poll(0);
  23. echo 'produce a msg to topic: '.$topic. PHP_EOL;
  24. }
  25. function mockProduce( ){
  26. $msg_data = '{
  27. "type":9,
  28. "title":"群推消息",
  29. "content":"这是一个广播"
  30. }';
  31. kafkaProducer('gps_location_data',$msg_data);
  32. }