123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- <?php
-
/**
 * Publish a single message to a Kafka topic through the rdkafka extension.
 *
 * Arrays and objects are JSON-encoded before sending; any other value is passed
 * to produce() unchanged. Status and error messages are echoed.
 *
 * @param string $topic            Name of the topic to publish to.
 * @param mixed  $msg_data         Payload; values for which empty() is true are skipped.
 * @param string $brokers          Broker list handed to addBrokers(). Defaults to the
 *                                 previously hard-coded "127.0.0.1".
 * @param int    $flush_timeout_ms Upper bound, in milliseconds, to wait for the queued
 *                                 message to be delivered before giving up.
 *
 * @return false|null false when the extension is missing, the payload cannot be
 *                    JSON-encoded, or the flush reports an error; null otherwise
 *                    (including when an empty payload is skipped).
 */
function kafkaProducer( $topic, $msg_data, $brokers = '127.0.0.1', $flush_timeout_ms = 10000 ){
    if (!extension_loaded('rdkafka')) {
        echo 'rdkafka extension is not installed!!'.PHP_EOL;
        return false;
    }

    // Validate the payload before building a producer, so that nothing is
    // set up for a message that will never be sent.
    if (empty($msg_data)) {
        return;
    }
    if (is_array($msg_data) || is_object($msg_data)) { // convert objects / arrays to JSON text
        $msg_data = json_encode($msg_data);
        if ($msg_data === false) {
            echo 'failed to json_encode msg for topic: '.$topic.' ('.json_last_error_msg().')'.PHP_EOL;
            return false;
        }
    }

    /********************* producer initialisation: start **************************/
    // (original note: producer for attendance-record analysis results)
    $rk = new RdKafka\Producer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers($brokers);
    $topic_obj = $rk->newTopic($topic);
    /********************* producer initialisation: end **************************/

    $topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, $msg_data);
    $rk->poll(0);

    // produce() only enqueues the message. Without a flush the producer can be
    // destroyed at the end of this function while the message is still queued,
    // which loses it. flush() is absent from old extension versions, hence the guard.
    if (method_exists($rk, 'flush')) {
        $flush_result = $rk->flush((int) $flush_timeout_ms);
        if ($flush_result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            echo 'failed to flush msg to topic: '.$topic.' ('.rd_kafka_err2str($flush_result).')'.PHP_EOL;
            return false;
        }
    }

    echo 'produce a msg to topic: '.$topic. PHP_EOL;
}
-
-
/**
 * Publish a hard-coded sample JSON message to the "gps_location_data" topic
 * by delegating to kafkaProducer().
 */
function mockProduce( ){
    $target_topic = 'gps_location_data';

    // Nowdoc: the text is taken literally, with no interpolation or escaping.
    $sample_json = <<<'JSON'
{
"type":9,
"title":"群推消息",
"content":"这是一个广播"
}
JSON;

    kafkaProducer($target_topic, $sample_json);
}
-
-
/**
 * Echo a timestamped debug line and append the same line to a per-day log file.
 *
 * Does nothing unless the APP_DEBUG constant is truthy. The file written is
 * SOLUTION_LOG_PATH . APP_PREFIX . '/<Ymd>/<filename>.log'; its directory is
 * created (recursively) when missing.
 *
 * @param string $filename Log file name without the ".log" extension.
 * @param mixed  $data     Value to log. Arrays, and objects without __toString(),
 *                         are JSON-encoded; everything else is string-concatenated.
 *
 * @return void
 */
function debug_log( $filename, $data ){
    if (!APP_DEBUG) {
        return;
    }

    // Concatenating an array yields the literal "Array" plus a warning, and an
    // object without __toString() is a fatal error, so serialise those first.
    if (is_array($data) || (is_object($data) && !method_exists($data, '__toString'))) {
        $encoded = json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
        $data = ($encoded === false) ? var_export($data, true) : $encoded;
    }

    // One timestamp for everything, so the echoed line, the file line and the
    // day directory always agree.
    $now = time();
    $line = '[' . date('Y-m-d H:i:s', $now) . ']' . $data . PHP_EOL;

    $file = SOLUTION_LOG_PATH . APP_PREFIX . '/' . date('Ymd', $now) . '/' . $filename . '.log';
    $folder = dirname($file);

    // The trailing is_dir() covers the race where another process creates the
    // directory between our check and our mkdir() call.
    $folder_ready = is_dir($folder) || mkdir($folder, 0777, true) || is_dir($folder);

    echo $line;

    if ($folder_ready) {
        file_put_contents($file, $line, FILE_APPEND);
    }
}
-
|