GpsRoute.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. <?php
  2. namespace catchAdmin\kafka\controller;
  3. use catcher\base\CatchRequest as Request;
  4. use catcher\CatchResponse;
  5. use catcher\base\CatchController;
  6. use think\facade\Env;
  7. use catchAdmin\device\model\Device as deviceModel;
  8. use think\facade\Cache;
  9. class GpsRoute extends CatchController
  10. {
  11. protected $deviceModel;
  12. public function __construct(DeviceModel $deviceModel)
  13. {
  14. $this->deviceModel = $deviceModel;
  15. }
  16. /**
  17. * 列表
  18. * @time 2022年02月14日 11:11
  19. */
  20. public function index()
  21. {
  22. $rk = new \RdKafka\Producer();
  23. $broker_list=Env::get('kafka.kafka_broker_list');
  24. $rk->addBrokers($broker_list); //kafka服务器地址
  25. // $rk->addBrokers("10.0.0.1,10.0.0.2"); //多服务器地址
  26. $kafka_topic=Env::get('kafka.kafka_topic');
  27. $topic = $rk->newTopic($kafka_topic); //topic名称
  28. $arr=array(
  29. 'DeviceId'=>'869688888888888',
  30. 'DeviceTime'=>time(),
  31. 'Latitude'=>30.192289977186,
  32. 'Longitude'=>120.20063757299,
  33. 'Altitude'=>108.951,
  34. );
  35. $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($arr));
  36. $rk->poll(0);
  37. while ($rk->getOutQLen() > 0) {
  38. $rk->poll(50);
  39. }
  40. }
  41. /**
  42. * 轨迹消费
  43. */
  44. public function consumer(){
  45. $broker_list=Env::get('kafka.kafka_broker_list');
  46. if(!$broker_list){
  47. echo 'KAFKA_BROKER_LIST must be config!';
  48. return;
  49. }
  50. $kafka_group=Env::get('kafka.kafka_group');
  51. if(!$kafka_group){
  52. echo 'KAFKA_GROUP must be config!';
  53. return;
  54. }
  55. $kafka_topic=Env::get('kafka.kafka_topic');
  56. if(!$kafka_topic){
  57. echo 'KAFKA_TOPIC must be config!';
  58. return;
  59. }
  60. $conf = new \RdKafka\Conf();
  61. $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
  62. switch ($err) {
  63. case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
  64. echo "Assign: ";
  65. var_dump($partitions);
  66. $kafka->assign($partitions);
  67. break;
  68. case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
  69. echo "Revoke: ";
  70. var_dump($partitions);
  71. $kafka->assign(NULL);
  72. break;
  73. default:
  74. throw new \Exception($err);
  75. }
  76. });
  77. #定义消费组
  78. $conf->set('group.id', $kafka_group);
  79. $rk = new \RdKafka\Consumer($conf);
  80. #集群地址
  81. $rk->addBrokers($broker_list);
  82. $topicConf = new \RdKafka\TopicConf();
  83. #配置没有设置偏移初始量时的消费消息点 smallest表示从最开始消费
  84. $topicConf->set('auto.offset.reset', 'latest');
  85. #跟踪话题
  86. $topic = $rk->newTopic($kafka_topic, $topicConf);
  87. #第一个参数 要消耗的分区 , 第二个参数 开始消耗的偏移量 开始使用0分区
  88. $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
  89. while (true) {
  90. #第一个参数 要消耗的分区 , 第二个参数 超时时间
  91. $message = $topic->consume(0, 120*1000);
  92. switch ($message->err) {
  93. case RD_KAFKA_RESP_ERR_NO_ERROR:
  94. //没有错误打印信息
  95. $data = json_decode($message->payload,true);
  96. if( $data ){
  97. $result = $this->addGpsDataToTs($data);
  98. // var_dump($result);
  99. if (Env::get('APP_DEBUG') && isset($result['message'])) {
  100. echo $result['message'] . PHP_EOL;
  101. }
  102. }
  103. break;
  104. case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  105. echo "No more messages; will wait for more\n";
  106. break;
  107. case RD_KAFKA_RESP_ERR__TIMED_OUT:
  108. echo "Timed out\n";
  109. break;
  110. default:
  111. throw new \Exception($message->errstr(), $message->err);
  112. break;
  113. }
  114. }
  115. }
  116. /**
  117. * 添加gps数据到表格存储
  118. * @param $info
  119. */
  120. public function addGpsDataToTs($info){
  121. /* 正常输出样例
  122. $info = array(
  123. 'DeviceId' => $data['tid'],
  124. //'State' => (string)$data['state'],
  125. //'Speed' => $data['speed'],
  126. 'Longitude' => $row['lng'],
  127. 'Latitude' => $row['lat'],
  128. 'DeviceTime' => $row['time'],
  129. 'Altitude' => $row['altitude'],
  130. //'LBS' => $data['lbs'],
  131. //'Direction' => $data['direction'],
  132. );
  133. */
  134. var_dump($info);
  135. if(!is_array($info)){
  136. echo 'addGpsRouteToTs failed,$list must be array!'.PHP_EOL;
  137. return false;
  138. }
  139. if (empty($info['DeviceId'])) {
  140. echo 'addGpsHeartBeatToTs failed,GpsDeviceNumber! empty $info'.PHP_EOL;
  141. return false;
  142. }
  143. $device_info=$this->deviceModel->where('imei',$info['DeviceId'])->find();
  144. if(!$device_info){
  145. //设备未入库 直接返回
  146. return array('success'=>false,'message'=>$info['DeviceId'].' not exist');
  147. // return false;
  148. }
  149. //检测围栏
  150. $fence_res = checkIsCrollFence($info, $device_info);
  151. echo $fence_res['message']. PHP_EOL;//输出围栏检测结果日志
  152. //跳过无效定位数据
  153. if($info['DeviceTime'] < (time()-3600) ){
  154. return false;
  155. }
  156. $rows = [];
  157. //更新数据库
  158. $sql_data = array(
  159. 'online_time' =>date('Y-m-d H:i:s',$info['DeviceTime']),
  160. 'loc_mode'=>'gps',
  161. );
  162. if ( ($info['Longitude'] >0.065) && ($info['Latitude'] >0.065)) {
  163. $sql_data['longitude'] = $info['Longitude'];
  164. $sql_data['latitude'] = $info['Latitude'];
  165. $tmp = array(
  166. 'DeviceNumber' => $info['DeviceId'],
  167. 'Timestamp' =>$info['DeviceTime'],
  168. 'Longitude' => $info['Longitude'],
  169. 'Latitude' => $info['Latitude'],
  170. 'Altitude'=>$info['Altitude'],
  171. 'AlarmType'=>0
  172. );
  173. $rows[] = $tmp;
  174. }
  175. $res=$this->deviceModel->updateBy($device_info['id'],$sql_data);
  176. if($res===false){
  177. echo 'update device '.$device_info['imei'].' fail';//修改失败提示 但不终止
  178. }
  179. //没数据则返回
  180. if(!$rows){
  181. return array('success'=>true,'message'=>$info['DeviceId'].' no pressure data to add');
  182. }
  183. //存入表格存储
  184. $options = array(
  185. 'table_name' => 'wxt_route_gps',
  186. 'primary_key' => 'DeviceNumber,Timestamp',
  187. );
  188. $res = addGpsRouteToTs($rows,$options);
  189. if(!$res['success']){
  190. return array('success'=>false,'message'=>$info['DeviceId'].' addGpsToTsError');
  191. }
  192. return array('success'=>true,'message'=>'add ts success');
  193. }
  194. /**
  195. * 测试
  196. */
  197. public function test(){
  198. $wx_tmp = new \Jiaruan\WxTmp();
  199. $res= $wx_tmp->sendMessage(array());
  200. $redis=Cache::store('redis')->handler();
  201. echo 1111;
  202. // $p = Redis("get_access_token","hash");
  203. // $val = $p->get($appid);
  204. $res=$redis->hset('test', '1111111', '22222222222');
  205. echo 222;
  206. $val = $redis->hget('test', '1111111');
  207. var_dump($val);
  208. }
  209. }