setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $start = microtime(true); $topic_obj = $rk->newTopic($topic); /********************* 初始化生产者配置项 end **************************/ if( empty($msg_data) ){ return; } if( is_array($msg_data) || is_object($msg_data) ){ // 对象、数组转换 $msg_data = json_encode($msg_data); } $topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, $msg_data); $rk->poll(0); echo 'produce a msg to topic: '.$topic. PHP_EOL; } function mockProduce( ){ $msg_data = '{ "type":9, "title":"群推消息", "content":"这是一个广播" }'; kafkaProducer('gps_location_data',$msg_data); }