123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- <?php
- namespace catchAdmin\kafka\controller;
- use catcher\base\CatchRequest as Request;
- use catcher\CatchResponse;
- use catcher\base\CatchController;
- use think\facade\Env;
- use catchAdmin\device\model\Device as deviceModel;
- use think\facade\Cache;
/**
 * Kafka-backed GPS route endpoints: a sample producer, a blocking consumer that
 * persists incoming GPS fixes, and a small connectivity test action.
 */
class GpsRoute extends CatchController
{
    /**
     * Device model used to look devices up by IMEI and to update their last position.
     */
    protected $deviceModel;

    /**
     * @param DeviceModel $deviceModel Injected device model.
     */
    public function __construct(DeviceModel $deviceModel)
    {
        $this->deviceModel = $deviceModel;
    }

    /**
     * Produce one hard-coded sample GPS message to the configured Kafka topic.
     *
     * Reads `kafka.kafka_broker_list` and `kafka.kafka_topic` from the environment,
     * sends a single JSON payload stamped with the current time, then polls until
     * the producer's outbound queue is empty.
     *
     * @time 2022-02-14 11:11
     */
    public function index()
    {
        $broker_list = $this->kafkaSetting('kafka.kafka_broker_list', 'KAFKA_BROKER_LIST');
        if ($broker_list === null) {
            return;
        }
        $kafka_topic = $this->kafkaSetting('kafka.kafka_topic', 'KAFKA_TOPIC');
        if ($kafka_topic === null) {
            return;
        }

        $producer = new \RdKafka\Producer();
        // Kafka broker address(es); a comma-separated list is accepted for several brokers.
        $producer->addBrokers($broker_list);
        $topic = $producer->newTopic($kafka_topic);

        $payload = [
            'DeviceId' => '869688888888888',
            'DeviceTime' => time(),
            'Latitude' => 30.192289977186,
            'Longitude' => 120.20063757299,
            'Altitude' => 108.951,
        ];
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($payload));
        $producer->poll(0);

        // Keep serving delivery events until nothing is left in the outbound queue.
        while ($producer->getOutQLen() > 0) {
            $producer->poll(50);
        }
    }

    /**
     * Consume GPS route messages from Kafka and persist them.
     *
     * Requires `kafka.kafka_broker_list`, `kafka.kafka_group` and `kafka.kafka_topic`;
     * prints a message and returns when any of them is missing. Otherwise it reads
     * partition 0 of the topic from the stored offset and loops forever, handing every
     * JSON-decodable payload to addGpsDataToTs().
     *
     * @throws \Exception When the consumer reports an error other than EOF / timeout.
     */
    public function consumer()
    {
        $broker_list = $this->kafkaSetting('kafka.kafka_broker_list', 'KAFKA_BROKER_LIST');
        if ($broker_list === null) {
            return;
        }
        $kafka_group = $this->kafkaSetting('kafka.kafka_group', 'KAFKA_GROUP');
        if ($kafka_group === null) {
            return;
        }
        $kafka_topic = $this->kafkaSetting('kafka.kafka_topic', 'KAFKA_TOPIC');
        if ($kafka_topic === null) {
            return;
        }

        $conf = new \RdKafka\Conf();
        // The class name must be fully qualified: a relative `RdKafka\KafkaConsumer`
        // would be resolved inside this file's namespace and never match.
        // NOTE(review): rebalance callbacks belong to the high-level KafkaConsumer API;
        // confirm whether the low-level Consumer used below ever invokes this.
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(null);
                    break;

                default:
                    throw new \Exception('Kafka rebalance error: ' . $err, (int) $err);
            }
        });
        // Consumer group.
        $conf->set('group.id', $kafka_group);

        $consumer = new \RdKafka\Consumer($conf);
        // Cluster address.
        $consumer->addBrokers($broker_list);

        $topicConf = new \RdKafka\TopicConf();
        // Where to start when no offset has been stored yet: 'latest' skips the backlog
        // ('smallest' would start from the beginning of the partition).
        $topicConf->set('auto.offset.reset', 'latest');
        $topic = $consumer->newTopic($kafka_topic, $topicConf);

        // Arguments: partition to consume (0), offset to start from (the stored one).
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        while (true) {
            // Arguments: partition, timeout in milliseconds (two minutes).
            $message = $topic->consume(0, 120 * 1000);
            if ($message === null) {
                // consume() is documented to return null when nothing arrived in time;
                // reading ->err on it would warn and be mistaken for a successful message.
                echo "Timed out\n";
                continue;
            }

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $data = json_decode($message->payload, true);
                    if ($data) {
                        $result = $this->addGpsDataToTs($data);
                        if (Env::get('APP_DEBUG') && isset($result['message'])) {
                            echo $result['message'] . PHP_EOL;
                        }
                    }
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;

                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    /**
     * Store one GPS fix: update the device row and append the point to table storage.
     *
     * Keys read from $info: `DeviceId` (matched against the device `imei` column),
     * `DeviceTime` (Unix timestamp), `Longitude`, `Latitude` and `Altitude`.
     *
     * Steps, in order: validate the payload, look the device up, run the fence check
     * and print its message, drop fixes older than one hour, update the device's
     * `online_time` / `loc_mode` (plus coordinates when usable), then write the point
     * to the `wxt_route_gps` table.
     *
     * @param mixed $info Decoded message payload; anything but an array is rejected.
     *
     * @return array|false `['success' => bool, 'message' => string]`, or false when the
     *                     payload is malformed or the fix is stale / has no usable time.
     */
    public function addGpsDataToTs($info)
    {
        if (Env::get('APP_DEBUG')) {
            var_dump($info);
        }
        if (!is_array($info)) {
            echo 'addGpsDataToTs failed,$info must be array!' . PHP_EOL;
            return false;
        }
        if (empty($info['DeviceId'])) {
            echo 'addGpsDataToTs failed,DeviceId is empty!' . PHP_EOL;
            return false;
        }

        $device_info = $this->deviceModel->where('imei', $info['DeviceId'])->find();
        if (!$device_info) {
            // Device is not registered: nothing to update.
            return ['success' => false, 'message' => $info['DeviceId'] . ' not exist'];
        }

        // Fence check; its message is printed as a log line.
        $fence_res = checkIsCrollFence($info, $device_info);
        echo ($fence_res['message'] ?? '') . PHP_EOL;

        // Skip fixes without a usable timestamp and fixes older than one hour.
        $deviceTime = $info['DeviceTime'] ?? null;
        if (!is_numeric($deviceTime) || $deviceTime < (time() - 3600)) {
            return false;
        }

        $sql_data = [
            'online_time' => date('Y-m-d H:i:s', (int) $deviceTime),
            'loc_mode' => 'gps',
        ];

        // Only coordinates above 0.065 are accepted.
        // NOTE(review): presumably this filters zero / unset fixes; it also rejects
        // negative (western / southern hemisphere) coordinates - confirm that is intended.
        $longitude = $info['Longitude'] ?? null;
        $latitude = $info['Latitude'] ?? null;
        $hasPosition = is_numeric($longitude) && is_numeric($latitude)
            && $longitude > 0.065 && $latitude > 0.065;
        if ($hasPosition) {
            $sql_data['longitude'] = $longitude;
            $sql_data['latitude'] = $latitude;
        }

        $res = $this->deviceModel->updateBy($device_info['id'], $sql_data);
        if ($res === false) {
            // Report the failed update but keep going.
            echo 'update device ' . $device_info['imei'] . ' fail';
        }

        if (!$hasPosition) {
            return ['success' => true, 'message' => $info['DeviceId'] . ' no gps data to add'];
        }

        $rows = [
            [
                'DeviceNumber' => $info['DeviceId'],
                'Timestamp' => $deviceTime,
                'Longitude' => $longitude,
                'Latitude' => $latitude,
                'Altitude' => $info['Altitude'] ?? null,
                'AlarmType' => 0,
            ],
        ];
        $options = [
            'table_name' => 'wxt_route_gps',
            'primary_key' => 'DeviceNumber,Timestamp',
        ];
        $res = addGpsRouteToTs($rows, $options);
        if (empty($res['success'])) {
            return ['success' => false, 'message' => $info['DeviceId'] . ' addGpsToTsError'];
        }

        return ['success' => true, 'message' => 'add ts success'];
    }

    /**
     * Manual smoke test: sends an empty message through \Jiaruan\WxTmp, then writes and
     * reads back one hash field through the redis cache store, echoing progress markers.
     */
    public function test()
    {
        $wx_tmp = new \Jiaruan\WxTmp();
        $wx_tmp->sendMessage([]);

        $redis = Cache::store('redis')->handler();
        echo 1111;
        $redis->hset('test', '1111111', '22222222222');
        echo 222;
        $val = $redis->hget('test', '1111111');
        var_dump($val);
    }

    /**
     * Read a required setting from the environment.
     *
     * @param string $key   Env key, e.g. `kafka.kafka_topic`.
     * @param string $label Name printed in the "<label> must be config!" message.
     *
     * @return mixed The configured value, or null (after printing the message) when it
     *               is missing or empty.
     */
    private function kafkaSetting($key, $label)
    {
        $value = Env::get($key);
        if (!$value) {
            echo $label . ' must be config!';
            return null;
        }

        return $value;
    }
}
|