12345678910111213141516171819202122232425262728293031323334353637383940 |
- <?php
-
/**
 * Publish a single message to a Kafka topic through the rdkafka extension.
 *
 * Arrays and objects are JSON-encoded before being sent; any other non-empty
 * value is handed to produce() unchanged. The broker address is hard-coded to
 * 127.0.0.1. Progress and error messages are written to standard output.
 *
 * @param string $topic    Name of the topic to publish to.
 * @param mixed  $msg_data Payload: a string, or an array/object to be JSON-encoded.
 *
 * @return false|null false when the rdkafka extension is missing, the payload
 *                    cannot be JSON-encoded, or the message could not be
 *                    delivered within the flush timeout; null otherwise
 *                    (including when $msg_data is empty and nothing is sent).
 */
function kafkaProducer($topic, $msg_data)
{
    if (!extension_loaded('rdkafka')) {
        echo 'rdkafka extension is not installed!!' . PHP_EOL;
        return false;
    }

    // Nothing to send: bail out before creating a producer or touching the broker.
    if (empty($msg_data)) {
        return;
    }

    // Convert arrays and objects to a JSON string.
    if (is_array($msg_data) || is_object($msg_data)) {
        $msg_data = json_encode($msg_data);
        if ($msg_data === false) {
            // json_encode() signals failure with false; never publish that as a payload.
            echo 'failed to json_encode message: ' . json_last_error_msg() . PHP_EOL;
            return false;
        }
    }

    /********************* initialize producer configuration: start **************************/
    $rk = new RdKafka\Producer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers('127.0.0.1');
    $topic_obj = $rk->newTopic($topic);
    /********************* initialize producer configuration: end **************************/

    // RD_KAFKA_PARTITION_UA lets the library choose the partition.
    $topic_obj->produce(RD_KAFKA_PARTITION_UA, 0, $msg_data);
    $rk->poll(0);

    // produce() only enqueues locally and poll(0) does not block, so the queue
    // must be drained before $rk goes out of scope or the message may be lost.
    $timeout_ms = 10000;
    if (method_exists($rk, 'flush')) {
        $delivered = ($rk->flush($timeout_ms) === RD_KAFKA_RESP_ERR_NO_ERROR);
    } else {
        // Older php-rdkafka releases without flush(): poll until the queue is empty,
        // bounded so an unreachable broker cannot hang the caller forever.
        $step_ms = 50;
        for ($waited = 0; $waited < $timeout_ms && $rk->getOutQLen() > 0; $waited += $step_ms) {
            $rk->poll($step_ms);
        }
        $delivered = ($rk->getOutQLen() === 0);
    }

    if (!$delivered) {
        echo 'failed to deliver msg to topic: ' . $topic . PHP_EOL;
        return false;
    }

    echo 'produce a msg to topic: ' . $topic . PHP_EOL;
}
-
-
/**
 * Send one hard-coded sample message to the 'gps_location_data' topic.
 *
 * The payload is a fixed JSON string and is passed to kafkaProducer()
 * unchanged; whatever kafkaProducer() returns is discarded.
 *
 * @return void
 */
function mockProduce()
{
    // Fixed sample payload, kept as a raw string so it is sent as written.
    $sample_payload = '{
"type":9,
"title":"群推消息",
"content":"这是一个广播"
}';

    kafkaProducer('gps_location_data', $sample_payload);
}
-
|