routeStore = new \Jiaruan\RouteStore();// tablestore $this->routeStore->reAddTimeoutInfo(); // 从 topic :rlstation_rfid_location 取轨迹 $conf = new RdKafka\Conf(); // Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发) $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。) // different partitions. $conf->set('group.id', $group); // Initial list of Kafka brokers(添加 kafka集群服务器地址) $conf->set('metadata.broker.list', $broker_list); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'earliest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // 订阅轨迹数据topic $consumer->subscribe($topics); while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $data = json_decode($message->payload,true); if( $data ){ $result = $this->addGpsDataToTs($data); if (APP_DEBUG && isset($result['message'])) { echo $result['message'] . 
PHP_EOL;
                        }
                    }
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }

    /**
     * Validate one decoded track message, refresh the matching `devices` row
     * and append the point to the `wxt_route_gps` table store.
     *
     * Keys read from $info: DeviceId, DeviceTime, WifiMacs, Longitude,
     * Latitude, Altitude.
     *
     * @param mixed $info Decoded message payload; anything but an array is rejected.
     *
     * @return array|false false when the message is rejected (bad input, unknown
     *                     device, stale timestamp, failed DB update, unusable
     *                     coordinates); otherwise array('success' => bool,
     *                     'message' => string) describing the table-store write.
     */
    private function addGpsDataToTs( $info ){
        if (!is_array($info)) {
            echo 'addGpsRouteToTs failed,$list must be array!'.PHP_EOL;
            return false;
        }
        if (empty($info['DeviceId'])) {
            echo 'addGpsRouteToTs failed,GpsDeviceNumber! empty $info'.PHP_EOL;
            return false;
        }

        $device_model = M('devices');
        $known = $device_model->where(array('imei' => $info['DeviceId'], 'deleted_at' => 0))->count();
        if (!$known) {
            echo 'addGpsRouteToTs failed,device not exist '.PHP_EOL;
            return false;
        }

        // Points without a timestamp, or older than one hour, are ignored.
        if (!isset($info['DeviceTime']) || $info['DeviceTime'] < (time() - 3600)) {
            return false;
        }

        $cond = array('imei' => $info['DeviceId']);
        $device_time = date('Y-m-d H:i:s', $info['DeviceTime']);
        $longitude = isset($info['Longitude']) ? $info['Longitude'] : null;
        $latitude = isset($info['Latitude']) ? $info['Latitude'] : null;

        // $row stays null when the message carries nothing worth storing.
        $row = null;

        if (!empty($info['WifiMacs'])) {
            // The WiFi MAC list itself is not written to the devices table here;
            // that is done by the WiFi-location Kafka consumer.
            $sql_data = array(
                'wifi_online_time' => $device_time,
                'loc_mode' => 'wifi',
            );
            $row = array(
                'DeviceNumber' => $info['DeviceId'],
                'Timestamp' => $info['DeviceTime'],
                'WifiMacs' => $info['WifiMacs'],
            );
        } else {
            $sql_data = array(
                'online_time' => $device_time,
                'loc_mode' => 'gps',
            );
            // Coordinates at or below 0.065 are treated as "no fix".
            if (is_numeric($longitude) && is_numeric($latitude)
                && $longitude > 0.065 && $latitude > 0.065) {
                $sql_data['longitude'] = $longitude;
                $sql_data['latitude'] = $latitude;
                $sql_data['wifi_macs'] = '';
                $row = array(
                    'DeviceNumber' => $info['DeviceId'],
                    'Timestamp' => $info['DeviceTime'],
                    'Longitude' => $longitude,
                    'Latitude' => $latitude,
                    'Altitude' => isset($info['Altitude']) ? $info['Altitude'] : null,
                    'AlarmType' => 0,
                );
            }
        }

        // The device row is refreshed even when the point is dropped below.
        $res = $device_model->createSave($cond, $sql_data);
        if ($res === false) {
            echo 'update student status fail'.PHP_EOL;
            return false;
        }

        if ($row === null) {
            // Record what was rejected so dropped points can be diagnosed.
            create_log(array(
                'DeviceNumber' => $info['DeviceId'],
                'Timestamp' => $info['DeviceTime'],
                'Longitude' => $longitude,
                'Latitude' => $latitude,
            ), 'wxt_drop_gps_route');
            return false;
        }

        // Write the point to the table store.
        $options = array(
            'table_name' => 'wxt_route_gps',
            'primary_key' => 'DeviceNumber,Timestamp',
        );
        $res = $this->routeStore->addData(array($row), $options);
        if (empty($res['success'])) {
            return array(
                'success' => false,
                'message' => isset($res['message']) ? $res['message'] : null,
            );
        }
        return array('success'=>true,'message'=>'succes');
    }

    /**
     * Consume WiFi-MAC location messages from Kafka and hand each decoded
     * payload to handleWifiMacsData().
     *
     * Reads KAFKA_BROKER_LIST, CATCH_FD_DATA_KAFKA_GROUP and
     * CATCH_FD_WIFI_MACS_TOPIC (comma-separated topic list) from configuration
     * and exits the process if any of them is empty. The consume loop has no
     * normal termination; it ends only through exit() or an exception.
     *
     * @return void
     *
     * @throws \Exception On an unexpected rebalance event or consumer error.
     */
    public function wifi_kafka( ){
        $broker_list = C('KAFKA_BROKER_LIST');
        if (empty($broker_list)) {
            exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
        }
        $group = C('CATCH_FD_DATA_KAFKA_GROUP');
        if (empty($group)) {
            exit("CATCH_FD_DATA_KAFKA_GROUP must be config!".PHP_EOL);
        }
        $topics = C('CATCH_FD_WIFI_MACS_TOPIC');
        if (empty($topics)) {
            exit("CATCH_FD_WIFI_MACS_TOPIC must be config!".PHP_EOL);
        }
        // Tolerate spaces around the commas in the configured topic list.
        $topics = array_map('trim', explode(',', $topics));

        $conf = new RdKafka\Conf();

        // Optional rebalance callback that logs partition assignments. When a
        // consumer process joins or leaves the group, Kafka redistributes the
        // partitions among the consumers and this callback is triggered.
        $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(NULL);
                    break;

                default:
                    throw new \Exception($err);
            }
        });

        // Consumers sharing a group.id are given different partitions, so
        // running more consumer processes in a group than the subscribed topic
        // has partitions brings no benefit.
        $conf->set('group.id', $group);

        // Initial list of Kafka brokers (cluster addresses).
        $conf->set('metadata.broker.list', $broker_list);

        $topicConf = new RdKafka\TopicConf();

        // Where to start when there is no stored offset or the stored offset
        // is out of range. 'earliest': start from the beginning.
        $topicConf->set('auto.offset.reset', 'earliest');

        // Use this configuration for subscribed/assigned topics.
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new RdKafka\KafkaConsumer($conf);

        // Subscribe to the WiFi-MAC topics.
        $consumer->subscribe($topics);

        while (true) {
            // Wait up to 120 seconds for the next message (timeout is in ms).
            $message = $consumer->consume(120*1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $data = json_decode($message->payload, true);
                    // Payloads that do not decode to a non-empty value are skipped.
                    if ($data) {
                        $result = $this->handleWifiMacsData($data);
                        if (APP_DEBUG && isset($result['message'])) {
                            echo $result['message'] . PHP_EOL;
                        }
                    }
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;

                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    /**
     * Resolve the most recent usable WiFi-MAC report in a message to a position
     * and store it on the device row.
     *
     * Entries are scanned from last to first. The first entry whose WiFi lookup
     * succeeds is written to the `devices` table and ends the scan. An entry
     * whose device (its `tid`) is not found aborts the whole message.
     *
     * @param mixed $data Decoded message; expected to be a list of entries, each
     *                    with `tid` and `locations[0]` holding `wifi_macs`/`time`.
     *
     * @return array array('success' => bool, 'message' => string)
     */
    private function handleWifiMacsData( $data ){
        if (!is_array($data)) {
            return array('success'=>false,'message'=>'failed,data must be array!');
        }

        for ($i = count($data) - 1; $i >= 0; $i--) {
            // Guarded reads: malformed entries yield nulls instead of notices.
            $entry = (isset($data[$i]) && is_array($data[$i])) ? $data[$i] : array();
            $tid = isset($entry['tid']) ? $entry['tid'] : null;
            $location = (isset($entry['locations'][0]) && is_array($entry['locations'][0]))
                ? $entry['locations'][0]
                : array();
            $wifi_macs = isset($location['wifi_macs']) ? $location['wifi_macs'] : null;
            $reported_at = isset($location['time']) ? $location['time'] : null;

            if (!M('devices')->where(array('imei' => $tid, 'deleted_at' => 0))->count()) {
                return array('success'=>false,'message'=>'failed,'.$tid.'devices not existed!');
            }

            echo 'imei: '.$tid.' wifi_macs: '.$wifi_macs.PHP_EOL;

            $lookup = requestWifiLBS_gaode(array('WifiMacs' => $wifi_macs, 'DeviceNumber' => $tid));
            if (empty($lookup['success'])) {
                // No position for this report; fall back to the previous entry.
                continue;
            }

            $save_data = array(
                'loc_mode' => 'wifi',
                'wifi_macs' => $wifi_macs,
                'wifi_online_time' => date('Y-m-d H:i:s', $reported_at),
                'address' => $lookup['data']['address'],
                'wifi_longitude' => $lookup['data']['lon'],
                'wifi_latitude' => $lookup['data']['lat'],
            );
            $saved = M('devices')->createSave(array('imei' => $tid), $save_data);
            if ($saved === false) {
                return array('success'=>false,'message'=>'failed,'.$tid.'update devices failed!');
            }
            return array('success'=>true,'message'=>$tid.'handle success');
        }

        return array('success'=>false,'message'=>'handle wifi_macs failed');
    }
}