pub_func.php 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. <?php
  2. function kafkaProducer( $topic, $msg_data ){
  3. if (!extension_loaded('rdkafka')){
  4. echo 'rdkafka extension is not installed!!'.PHP_EOL;
  5. return false;
  6. }
  7. /********************* 初始化生产者配置项 start **************************/
  8. //考勤记录分析结果生产者
  9. $rk = new RdKafka\Producer();
  10. $rk->setLogLevel(LOG_DEBUG);
  11. $rk->addBrokers("127.0.0.1");
  12. $start = microtime(true);
  13. $topic_obj = $rk->newTopic($topic);
  14. /********************* 初始化生产者配置项 end **************************/
  15. if( empty($msg_data) ){
  16. return;
  17. }
  18. if( is_array($msg_data) || is_object($msg_data) ){ // 对象、数组转换
  19. $msg_data = json_encode($msg_data);
  20. }
  21. $topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, $msg_data);
  22. $rk->poll(0);
  23. echo 'produce a msg to topic: '.$topic. PHP_EOL;
  24. }
  25. function mockProduce( ){
  26. $msg_data = '{
  27. "type":9,
  28. "title":"群推消息",
  29. "content":"这是一个广播"
  30. }';
  31. kafkaProducer('gps_location_data',$msg_data);
  32. }
  33. function debug_log( $filename, $data ){
  34. if(!APP_DEBUG){
  35. return;
  36. }
  37. $file = SOLUTION_LOG_PATH .APP_PREFIX .'/'.date("Ymd", time()) ."/".$filename.".log";
  38. $folder=dirname($file);
  39. if (!is_dir($folder)){
  40. mkdir($folder,0777,true);
  41. }
  42. echo '[' . date('Y-m-d H:i:s') . ']' . $data . PHP_EOL;
  43. file_put_contents($file, '[' . date('Y-m-d H:i:s') . ']' . $data . PHP_EOL,FILE_APPEND);
  44. }