setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $start = microtime(true); $topic_obj = $rk->newTopic($topic); /********************* 初始化生产者配置项 end **************************/ if( empty($msg_data) ){ return; } if( is_array($msg_data) || is_object($msg_data) ){ // 对象、数组转换 $msg_data = json_encode($msg_data); } $topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, $msg_data); $rk->poll(0); echo 'produce a msg to topic: '.$topic. PHP_EOL; } function mockProduce( ){ $msg_data = '{ "type":9, "title":"群推消息", "content":"这是一个广播" }'; kafkaProducer('gps_location_data',$msg_data); } function debug_log( $filename, $data ){ if(!APP_DEBUG){ return; } $file = SOLUTION_LOG_PATH .APP_PREFIX .'/'.date("Ymd", time()) ."/".$filename.".log"; $folder=dirname($file); if (!is_dir($folder)){ mkdir($folder,0777,true); } echo '[' . date('Y-m-d H:i:s') . ']' . $data . PHP_EOL; file_put_contents($file, '[' . date('Y-m-d H:i:s') . ']' . $data . PHP_EOL,FILE_APPEND); }