123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- <?php
/**
 * Long-running Kafka consumers for device location data.
 *
 * - gps_kafka():  consumes GPS / Wi-Fi track messages, updates the `devices`
 *                 table and appends a row to the `wxt_route_gps` table store.
 * - wifi_kafka(): consumes Wi-Fi MAC batches, resolves them to a position via
 *                 requestWifiLBS_gaode() and updates the `devices` table.
 *
 * Both entry points block forever; they terminate only through exit() on a
 * missing configuration value or through an uncaught exception.
 */
class CatchFdRouteKafkaAction extends Action {

    /**
     * Table-store client used by addGpsDataToTs(); created in gps_kafka().
     * Declared explicitly so the class does not depend on dynamic property
     * creation.
     *
     * @var \Jiaruan\RouteStore|null
     */
    protected $routeStore;

    /**
     * Consume GPS track messages and persist them.
     *
     * Requires the config keys OTS_DSN, KAFKA_BROKER_LIST,
     * CATCH_FD_GPS_DATA_KAFKA_GROUP and CATCH_FD_GPS_ROUTE_KAFKA_TOPIC
     * (comma-separated topic list). Exits with a message if one is empty.
     *
     * @return void Never returns normally.
     * @throws \Exception On a Kafka rebalance or consume error.
     */
    public function gps_kafka( ){
        $ots_dsn = C('OTS_DSN');
        if (empty($ots_dsn)) {
            exit('OTS_DSN not defined!'.PHP_EOL);
        }
        $broker_list = $this->requireConfig('KAFKA_BROKER_LIST');
        $group       = $this->requireConfig('CATCH_FD_GPS_DATA_KAFKA_GROUP');
        $topics      = explode(',', $this->requireConfig('CATCH_FD_GPS_ROUTE_KAFKA_TOPIC'));

        // Table-store client; reAddTimeoutInfo() is invoked once at start-up,
        // before any message is consumed.
        $this->routeStore = new \Jiaruan\RouteStore();
        $this->routeStore->reAddTimeoutInfo();

        $consumer = $this->createConsumer($broker_list, $group, $topics);
        $this->consumeMessages($consumer, 'addGpsDataToTs');
    }

    /**
     * Consume Wi-Fi MAC messages and update device positions.
     *
     * Requires the config keys KAFKA_BROKER_LIST, CATCH_FD_DATA_KAFKA_GROUP
     * and CATCH_FD_WIFI_MACS_TOPIC (comma-separated topic list). Exits with a
     * message if one is empty.
     *
     * @return void Never returns normally.
     * @throws \Exception On a Kafka rebalance or consume error.
     */
    public function wifi_kafka( ){
        $broker_list = $this->requireConfig('KAFKA_BROKER_LIST');
        $group       = $this->requireConfig('CATCH_FD_DATA_KAFKA_GROUP');
        $topics      = explode(',', $this->requireConfig('CATCH_FD_WIFI_MACS_TOPIC'));

        $consumer = $this->createConsumer($broker_list, $group, $topics);
        $this->consumeMessages($consumer, 'handleWifiMacsData');
    }

    /**
     * Read a configuration value and terminate the process if it is empty.
     *
     * @param string $name Configuration key passed to C().
     * @return mixed The non-empty configuration value.
     */
    private function requireConfig($name){
        $value = C($name);
        if (empty($value)) {
            exit($name." must be config!".PHP_EOL);
        }
        return $value;
    }

    /**
     * Build a high-level Kafka consumer subscribed to the given topics.
     *
     * @param string $broker_list Value for `metadata.broker.list`.
     * @param string $group       Consumer group id.
     * @param array  $topics      Topic names to subscribe to.
     * @return RdKafka\KafkaConsumer
     */
    private function createConsumer($broker_list, $group, array $topics){
        $conf = new RdKafka\Conf();

        // Rebalance callback: when a consumer joins or leaves the group Kafka
        // redistributes partitions; log the change and apply the assignment.
        $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(NULL);
                    break;

                default:
                    // Carry a readable message and keep the numeric code,
                    // instead of using the bare integer as the message.
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });

        // Consumers sharing a group.id split the partitions between them, so
        // running more consumers than partitions for a topic gains nothing.
        $conf->set('group.id', $group);
        // Initial list of Kafka brokers.
        $conf->set('metadata.broker.list', $broker_list);

        // Where to start when there is no stored offset or it is out of
        // range: 'earliest' starts from the oldest available message.
        $topicConf = new RdKafka\TopicConf();
        $topicConf->set('auto.offset.reset', 'earliest');
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new RdKafka\KafkaConsumer($conf);
        $consumer->subscribe($topics);

        return $consumer;
    }

    /**
     * Poll the consumer forever and hand every decoded payload to a handler.
     *
     * Payloads are JSON-decoded to arrays; payloads that decode to a falsy
     * value (invalid JSON, empty array, ...) are skipped silently. When
     * APP_DEBUG is on and the handler result has a 'message' entry, it is
     * echoed.
     *
     * @param RdKafka\KafkaConsumer $consumer Subscribed consumer.
     * @param string                $handler  Name of the method of this class
     *                                        that receives the decoded payload.
     * @return void Never returns normally.
     * @throws \Exception On any consume error other than EOF / timeout.
     */
    private function consumeMessages($consumer, $handler){
        while (true) {
            // Timeout is given in milliseconds (php-rdkafka API): 120 s.
            $message = $consumer->consume(120*1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $data = json_decode($message->payload, true);
                    if ($data) {
                        $result = $this->{$handler}($data);
                        if (APP_DEBUG && isset($result['message'])) {
                            echo $result['message'] . PHP_EOL;
                        }
                    }
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    /**
     * Persist one GPS / Wi-Fi track message.
     *
     * Updates the matching `devices` row (online time and location mode, plus
     * coordinates for a valid GPS fix) and appends the point to the
     * `wxt_route_gps` table store.
     *
     * Keys read from $info: DeviceId, DeviceTime (Unix timestamp), WifiMacs,
     * Longitude, Latitude, Altitude.
     *
     * @param mixed $info Decoded message payload.
     * @return array|false array('success'=>bool,'message'=>string) once the
     *                     table-store write was attempted; false when the
     *                     message is rejected or dropped before that.
     */
    private function addGpsDataToTs( $info ){

        if (!is_array($info)) {
            echo 'addGpsRouteToTs failed,$list must be array!'.PHP_EOL;
            return false;
        }
        if (empty($info['DeviceId'])) {
            echo 'addGpsRouteToTs failed,GpsDeviceNumber! empty $info'.PHP_EOL;
            return false;
        }
        $device_model = M('devices');
        if (!$device_model->where(array('imei'=>$info['DeviceId'],'deleted_at'=>0))->count()) {
            echo 'addGpsRouteToTs failed,device not exist '.PHP_EOL;
            return false;
        }

        // Ignore messages without a timestamp or older than one hour.
        if (!isset($info['DeviceTime']) || $info['DeviceTime'] < (time()-3600)) {
            return false;
        }

        $cond = array('imei'=>$info['DeviceId']);
        $device_time = date('Y-m-d H:i:s', $info['DeviceTime']);

        // Row for the table store. It always carries the primary key so that a
        // dropped point can still be identified in the log.
        $row = array(
            'DeviceNumber' => $info['DeviceId'],
            'Timestamp'    => $info['DeviceTime'],
        );
        $has_position = false;

        if (!empty($info['WifiMacs'])) {
            // The Wi-Fi MACs themselves are not written to the devices table
            // here; that is done by the Wi-Fi location consumer.
            $sql_data = array(
                'wifi_online_time' => $device_time,
                'loc_mode'         => 'wifi',
            );
            $row['WifiMacs'] = $info['WifiMacs'];
            $has_position = true;
        } else {
            $sql_data = array(
                'online_time' => $device_time,
                'loc_mode'    => 'gps',
            );
            $longitude = isset($info['Longitude']) ? $info['Longitude'] : 0;
            $latitude  = isset($info['Latitude']) ? $info['Latitude'] : 0;

            // Only coordinates above 0.065 are accepted.
            // NOTE(review): this also rejects negative (western / southern)
            // coordinates - confirm that is intended.
            if ($longitude > 0.065 && $latitude > 0.065) {
                $sql_data['longitude'] = $longitude;
                $sql_data['latitude']  = $latitude;
                $sql_data['wifi_macs'] = '';

                $row['Longitude'] = $longitude;
                $row['Latitude']  = $latitude;
                $row['Altitude']  = isset($info['Altitude']) ? $info['Altitude'] : null;
                $row['AlarmType'] = 0;
                $has_position = true;
            }
        }

        // Update the devices table. This happens even when the point is
        // dropped below, so the device's online time stays current.
        $res = $device_model->createSave($cond, $sql_data);
        if ($res === false) {
            echo 'update student status fail'.PHP_EOL;
            return false;
        }

        if (!$has_position) {
            // Neither Wi-Fi MACs nor a usable GPS fix: log and drop.
            create_log($row, 'wxt_drop_gps_route');
            return false;
        }

        // Append to the table store.
        $options = array(
            'table_name'  => 'wxt_route_gps',
            'primary_key' => 'DeviceNumber,Timestamp',
        );
        $res = $this->routeStore->addData(array($row), $options);
        if (!$res['success']) {
            return array('success'=>false,'message'=>$res['message']);
        }
        return array('success'=>true,'message'=>'succes');
    }

    /**
     * Resolve the most recent usable Wi-Fi scan of a batch to a position and
     * store it on the device.
     *
     * Entries are tried from last to first. The first entry whose lookup via
     * requestWifiLBS_gaode() reports success is written to the `devices` table
     * and ends processing. Entries lacking `tid` or
     * `locations[0]['wifi_macs']` are skipped.
     *
     * @param mixed $data Decoded message payload; expected to be a 0-indexed
     *                    list of entries with keys `tid` and `locations`.
     * @return array array('success'=>bool,'message'=>string)
     */
    private function handleWifiMacsData( $data ){

        if (!is_array($data)) {
            return array('success'=>false,'message'=>'failed,data must be array!');
        }

        for ($i = count($data) - 1; $i >= 0; $i--) {
            // Skip malformed entries instead of querying with null values.
            if (!isset($data[$i]['tid'], $data[$i]['locations'][0]['wifi_macs'])) {
                continue;
            }
            $imei      = $data[$i]['tid'];
            $location  = $data[$i]['locations'][0];
            $wifi_macs = $location['wifi_macs'];

            if (!M('devices')->where(array('imei'=>$imei,'deleted_at'=>0))->count()) {
                return array('success'=>false,'message'=>'failed,'.$imei.'devices not existed!');
            }
            echo 'imei: '.$imei.' wifi_macs: '.$wifi_macs.PHP_EOL;

            $res = requestWifiLBS_gaode(array('WifiMacs'=>$wifi_macs,'DeviceNumber'=>$imei));
            if (empty($res['success'])) {
                // Lookup failed for this scan; fall back to the previous one.
                continue;
            }

            $save_data = array(
                'loc_mode'         => 'wifi',
                'wifi_macs'        => $wifi_macs,
                // Fixed format string: the original used 'H;i:s'.
                'wifi_online_time' => date('Y-m-d H:i:s', $location['time']),
                'address'          => $res['data']['address'],
                'wifi_longitude'   => $res['data']['lon'],
                'wifi_latitude'    => $res['data']['lat'],
            );
            $saved = M('devices')->createSave(array('imei'=>$imei), $save_data);
            if ($saved === false) {
                // Same failure test as addGpsDataToTs(); do not report success
                // for an update that did not happen.
                return array('success'=>false,'message'=>$imei.'update device failed');
            }
            return array('success'=>true,'message'=>$imei.'handle success');
        }

        return array('success'=>false,'message'=>'handle wifi_macs failed');
    }

}
|