CatchFdRouteKafkaAction.class.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. <?php
  2. class CatchFdRouteKafkaAction extends Action {
  3. public function gps_kafka( ){
  4. if (empty(C('OTS_DSN'))) {
  5. exit('OTS_DSN not defined!'.PHP_EOL);
  6. }
  7. $broker_list = C('KAFKA_BROKER_LIST');
  8. if (empty($broker_list)) {
  9. exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
  10. }
  11. $group = C('CATCH_FD_GPS_DATA_KAFKA_GROUP');
  12. if (empty($group)) {
  13. exit("CATCH_FD_GPS_DATA_KAFKA_GROUP must be config!".PHP_EOL);
  14. }
  15. $topics = C('CATCH_FD_GPS_ROUTE_KAFKA_TOPIC');
  16. if (empty($topics)) {
  17. exit("CATCH_FD_GPS_ROUTE_KAFKA_TOPIC must be config!".PHP_EOL);
  18. }
  19. $topics = explode(',',$topics);
  20. $this->routeStore = new \Jiaruan\RouteStore();// tablestore
  21. $this->routeStore->reAddTimeoutInfo();
  22. // 从 topic :rlstation_rfid_location 取轨迹
  23. $conf = new RdKafka\Conf();
  24. // Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发)
  25. $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
  26. switch ($err) {
  27. case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
  28. echo "Assign: ";
  29. var_dump($partitions);
  30. $kafka->assign($partitions);
  31. break;
  32. case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
  33. echo "Revoke: ";
  34. var_dump($partitions);
  35. $kafka->assign(NULL);
  36. break;
  37. default:
  38. throw new \Exception($err);
  39. }
  40. });
  41. // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。)
  42. // different partitions.
  43. $conf->set('group.id', $group);
  44. // Initial list of Kafka brokers(添加 kafka集群服务器地址)
  45. $conf->set('metadata.broker.list', $broker_list);
  46. $topicConf = new RdKafka\TopicConf();
  47. // Set where to start consuming messages when there is no initial offset in
  48. // offset store or the desired offset is out of range.
  49. // 'smallest': start from the beginning
  50. $topicConf->set('auto.offset.reset', 'earliest');
  51. // Set the configuration to use for subscribed/assigned topics
  52. $conf->setDefaultTopicConf($topicConf);
  53. $consumer = new RdKafka\KafkaConsumer($conf);
  54. // 订阅轨迹数据topic
  55. $consumer->subscribe($topics);
  56. while (true) {
  57. $message = $consumer->consume(120*1000);
  58. switch ($message->err) {
  59. case RD_KAFKA_RESP_ERR_NO_ERROR:
  60. $data = json_decode($message->payload,true);
  61. if( $data ){
  62. $result = $this->addGpsDataToTs($data);
  63. if (APP_DEBUG && isset($result['message'])) {
  64. echo $result['message'] . PHP_EOL;
  65. }
  66. }
  67. break;
  68. case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  69. echo "No more messages; will wait for more\n";
  70. break;
  71. case RD_KAFKA_RESP_ERR__TIMED_OUT:
  72. echo "Timed out\n";
  73. break;
  74. default:
  75. throw new \Exception($message->errstr(), $message->err);
  76. break;
  77. }
  78. }
  79. }
  80. private function addGpsDataToTs( $info ){
  81. if(!is_array($info)){
  82. echo 'addGpsRouteToTs failed,$list must be array!'.PHP_EOL;
  83. return false;
  84. }
  85. if (empty($info['DeviceId'])) {
  86. echo 'addGpsRouteToTs failed,GpsDeviceNumber! empty $info'.PHP_EOL;
  87. return false;
  88. }
  89. $device_model = M('devices');
  90. if ( !$device_model->where(array('imei'=>$info['DeviceId'],'deleted_at'=>0))->count()) {
  91. echo 'addGpsRouteToTs failed,device not exist '.PHP_EOL;
  92. return false;
  93. }
  94. $rows = [];
  95. // 存入数据库
  96. $cond = array('imei'=>$info['DeviceId']);
  97. if($info['DeviceTime'] < (time()-3600) ){
  98. return false;
  99. }
  100. if($info['WifiMacs']){
  101. //wifimac不需要更新用户表 在wifi定位kafka中更新
  102. $sql_data = array(
  103. 'wifi_online_time' => date('Y-m-d H:i:s',$info['DeviceTime']),
  104. 'loc_mode'=>'wifi',
  105. );
  106. $tmp = array(
  107. 'DeviceNumber' => $info['DeviceId'],
  108. 'Timestamp' =>$info['DeviceTime'],
  109. 'WifiMacs' => $info['WifiMacs'],
  110. );
  111. }else{
  112. $sql_data = array(
  113. 'online_time' => date('Y-m-d H:i:s',$info['DeviceTime']),
  114. 'loc_mode'=>'gps',
  115. );
  116. if ( ($info['Longitude'] >0.065) && ($info['Latitude'] >0.065)) {
  117. $sql_data['longitude'] = $info['Longitude'];
  118. $sql_data['latitude'] = $info['Latitude'];
  119. $sql_data['wifi_macs'] = '';
  120. $tmp = array(
  121. 'DeviceNumber' => $info['DeviceId'],
  122. 'Timestamp' =>$info['DeviceTime'],
  123. 'Longitude' => $info['Longitude'],
  124. 'Latitude' => $info['Latitude'],
  125. 'Altitude'=>$info['Altitude'],
  126. 'AlarmType'=>0
  127. );
  128. }
  129. }
  130. $res = $device_model->createSave($cond,$sql_data);
  131. if($res === false){
  132. echo 'update student status fail'.PHP_EOL;
  133. return false;
  134. }
  135. if (!$tmp['WifiMacs'] && (!$tmp['Longitude'] || !$tmp['Latitude'])) {
  136. create_log($tmp,'wxt_drop_gps_route');// 添加日志
  137. return false;
  138. }
  139. $rows[] = $tmp;
  140. if(!$rows){
  141. return array('success'=>true,'message'=>'no pressure data to add');
  142. }
  143. // 存入表格存储
  144. $options = array(
  145. 'table_name' => 'wxt_route_gps',
  146. 'primary_key' => 'DeviceNumber,Timestamp',
  147. );
  148. $res = $this->routeStore->addData($rows,$options);
  149. //create_log($rows,'add_wxt_rfid_route');// 添加日志
  150. if(!$res['success']){
  151. return array('success'=>false,'message'=>$res['message']);
  152. }
  153. return array('success'=>true,'message'=>'succes');
  154. }
  155. public function wifi_kafka( ){
  156. $broker_list = C('KAFKA_BROKER_LIST');
  157. if (empty($broker_list)) {
  158. exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
  159. }
  160. $group = C('CATCH_FD_DATA_KAFKA_GROUP');
  161. if (empty($group)) {
  162. exit("CATCH_FD_DATA_KAFKA_GROUP must be config!".PHP_EOL);
  163. }
  164. $topics = C('CATCH_FD_WIFI_MACS_TOPIC');
  165. if (empty($topics)) {
  166. exit("CATCH_FD_WIFI_MACS_TOPIC must be config!".PHP_EOL);
  167. }
  168. $topics = explode(',',$topics);
  169. $conf = new RdKafka\Conf();
  170. // Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发)
  171. $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
  172. switch ($err) {
  173. case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
  174. echo "Assign: ";
  175. var_dump($partitions);
  176. $kafka->assign($partitions);
  177. break;
  178. case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
  179. echo "Revoke: ";
  180. var_dump($partitions);
  181. $kafka->assign(NULL);
  182. break;
  183. default:
  184. throw new \Exception($err);
  185. }
  186. });
  187. // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。)
  188. // different partitions.
  189. $conf->set('group.id', $group);
  190. // Initial list of Kafka brokers(添加 kafka集群服务器地址)
  191. $conf->set('metadata.broker.list', $broker_list);
  192. $topicConf = new RdKafka\TopicConf();
  193. // Set where to start consuming messages when there is no initial offset in
  194. // offset store or the desired offset is out of range.
  195. // 'smallest': start from the beginning
  196. $topicConf->set('auto.offset.reset', 'earliest');
  197. // Set the configuration to use for subscribed/assigned topics
  198. $conf->setDefaultTopicConf($topicConf);
  199. $consumer = new RdKafka\KafkaConsumer($conf);
  200. // 订阅轨迹数据topic
  201. $consumer->subscribe($topics);
  202. while (true) {
  203. $message = $consumer->consume(120*1000);
  204. switch ($message->err) {
  205. case RD_KAFKA_RESP_ERR_NO_ERROR:
  206. $data = json_decode($message->payload,true);
  207. if( $data ){
  208. //var_dump($data);
  209. $result = $this->handleWifiMacsData($data);
  210. if (APP_DEBUG && isset($result['message'])) {
  211. echo $result['message'] . PHP_EOL;
  212. }
  213. }
  214. break;
  215. case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  216. echo "No more messages; will wait for more\n";
  217. break;
  218. case RD_KAFKA_RESP_ERR__TIMED_OUT:
  219. echo "Timed out\n";
  220. break;
  221. default:
  222. throw new \Exception($message->errstr(), $message->err);
  223. break;
  224. }
  225. }
  226. }
  227. private function handleWifiMacsData( $data ){
  228. if(!is_array($data)){
  229. return array('success'=>false,'message'=>'failed,data must be array!');
  230. }
  231. $len=count($data);
  232. for($i=$len-1;$i>=0;$i--){
  233. $last_location=$data[$i];
  234. $wifi_macs=$last_location['locations']['0']['wifi_macs'];
  235. if( !M('devices')->where(array('imei'=>$last_location['tid'],'deleted_at'=>0))->count() ){
  236. return array('success'=>false,'message'=>'failed,'.$last_location['tid'].'devices not existed!');
  237. }
  238. echo 'imei: '.$last_location['tid'].' wifi_macs: '.$wifi_macs.PHP_EOL;
  239. $wifi_data=array('WifiMacs'=>$wifi_macs,'DeviceNumber'=>$last_location['tid']);
  240. //$res = requestWifiLBS($wifi_data);
  241. $res=requestWifiLBS_gaode($wifi_data);
  242. if ($res['success']) {
  243. $cond = array('imei'=>$last_location['tid'],'deleted_at'=>0);
  244. $device_info = M('devices')->where($cond)->find();
  245. $save_data=array(
  246. 'loc_mode'=>'wifi',
  247. 'wifi_macs'=>$wifi_macs,
  248. 'wifi_online_time'=>date('Y-m-d H;i:s',$last_location['locations']['0']['time']),
  249. 'address'=>$res['data']['address'],
  250. 'wifi_longitude'=>$res['data']['lon'],
  251. 'wifi_latitude'=>$res['data']['lat'],
  252. );
  253. $cond = array('imei'=>$last_location['tid']);
  254. $res = M('devices')->createSave($cond,$save_data);
  255. return array('success'=>true,'message'=>$last_location['tid'].'handle success');
  256. }
  257. }
  258. return array('success'=>false,'message'=>'handle wifi_macs failed');
  259. }
  260. }