deviceModel = $deviceModel; } /** * 列表 * @time 2022年02月14日 11:11 */ public function index() { $rk = new \RdKafka\Producer(); $broker_list=Env::get('kafka.kafka_broker_list'); $rk->addBrokers($broker_list); //kafka服务器地址 // $rk->addBrokers("10.0.0.1,10.0.0.2"); //多服务器地址 $kafka_topic=Env::get('kafka.kafka_topic'); $topic = $rk->newTopic($kafka_topic); //topic名称 $arr=array( 'DeviceId'=>'869688888888888', 'DeviceTime'=>time(), 'Latitude'=>30.192289977186, 'Longitude'=>120.20063757299, 'Altitude'=>108.951, ); $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($arr)); $rk->poll(0); while ($rk->getOutQLen() > 0) { $rk->poll(50); } } /** * 轨迹消费 */ public function consumer(){ $broker_list=Env::get('kafka.kafka_broker_list'); if(!$broker_list){ echo 'KAFKA_BROKER_LIST must be config!'; return; } $kafka_group=Env::get('kafka.kafka_group'); if(!$kafka_group){ echo 'KAFKA_GROUP must be config!'; return; } $kafka_topic=Env::get('kafka.kafka_topic'); if(!$kafka_topic){ echo 'KAFKA_TOPIC must be config!'; return; } $conf = new \RdKafka\Conf(); $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); #定义消费组 $conf->set('group.id', $kafka_group); $rk = new \RdKafka\Consumer($conf); #集群地址 $rk->addBrokers($broker_list); $topicConf = new \RdKafka\TopicConf(); #配置没有设置偏移初始量时的消费消息点 smallest表示从最开始消费 $topicConf->set('auto.offset.reset', 'latest'); #跟踪话题 $topic = $rk->newTopic($kafka_topic, $topicConf); #第一个参数 要消耗的分区 , 第二个参数 开始消耗的偏移量 开始使用0分区 $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { #第一个参数 要消耗的分区 , 第二个参数 超时时间 $message = $topic->consume(0, 120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: //没有错误打印信息 $data = json_decode($message->payload,true); if( $data ){ $result = $this->addGpsDataToTs($data); // var_dump($result); if (Env::get('APP_DEBUG') && isset($result['message'])) { echo $result['message'] . PHP_EOL; } } break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } } /** * 添加gps数据到表格存储 * @param $info */ public function addGpsDataToTs($info){ /* 正常输出样例 $info = array( 'DeviceId' => $data['tid'], //'State' => (string)$data['state'], //'Speed' => $data['speed'], 'Longitude' => $row['lng'], 'Latitude' => $row['lat'], 'DeviceTime' => $row['time'], 'Altitude' => $row['altitude'], //'LBS' => $data['lbs'], //'Direction' => $data['direction'], ); */ var_dump($info); if(!is_array($info)){ echo 'addGpsRouteToTs failed,$list must be array!'.PHP_EOL; return false; } if (empty($info['DeviceId'])) { echo 'addGpsHeartBeatToTs failed,GpsDeviceNumber! empty $info'.PHP_EOL; return false; } $device_info=$this->deviceModel->where('imei',$info['DeviceId'])->find(); if(!$device_info){ //设备未入库 直接返回 return array('success'=>false,'message'=>$info['DeviceId'].' not exist'); // return false; } //检测围栏 $fence_res = checkIsCrollFence($info, $device_info); echo $fence_res['message']. PHP_EOL;//输出围栏检测结果日志 //跳过无效定位数据 if($info['DeviceTime'] < (time()-3600) ){ return false; } $rows = []; //更新数据库 $sql_data = array( 'online_time' =>date('Y-m-d H:i:s',$info['DeviceTime']), 'loc_mode'=>'gps', ); if ( ($info['Longitude'] >0.065) && ($info['Latitude'] >0.065)) { $sql_data['longitude'] = $info['Longitude']; $sql_data['latitude'] = $info['Latitude']; $tmp = array( 'DeviceNumber' => $info['DeviceId'], 'Timestamp' =>$info['DeviceTime'], 'Longitude' => $info['Longitude'], 'Latitude' => $info['Latitude'], 'Altitude'=>$info['Altitude'], 'AlarmType'=>0 ); $rows[] = $tmp; } $res=$this->deviceModel->updateBy($device_info['id'],$sql_data); if($res===false){ echo 'update device '.$device_info['imei'].' fail';//修改失败提示 但不终止 } //没数据则返回 if(!$rows){ return array('success'=>true,'message'=>$info['DeviceId'].' no pressure data to add'); } //存入表格存储 $options = array( 'table_name' => 'wxt_route_gps', 'primary_key' => 'DeviceNumber,Timestamp', ); $res = addGpsRouteToTs($rows,$options); if(!$res['success']){ return array('success'=>false,'message'=>$info['DeviceId'].' addGpsToTsError'); } return array('success'=>true,'message'=>'add ts success'); } /** * 测试 */ public function test(){ $wx_tmp = new \Jiaruan\WxTmp(); $res= $wx_tmp->sendMessage(array()); $redis=Cache::store('redis')->handler(); echo 1111; // $p = Redis("get_access_token","hash"); // $val = $p->get($appid); $res=$redis->hset('test', '1111111', '22222222222'); echo 222; $val = $redis->hget('test', '1111111'); var_dump($val); } }