123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- <?php
- /*
- * @Author: 李康
- * @Date: 2022-09-28 16:48:06
- * @LastEditors: ‘likang’ zmcoding
- * @LastEditTime: 2022-09-30 15:06:44
- * @FilePath: /kafka_oracle/app/test/controller/Test.php
- * @Description:
- *
- * Copyright (c) 2022 by ‘likang’ zmcoding, All Rights Reserved.
- */
- namespace app\test\controller;
- use app\BaseController;
- use think\facade\Db;
- class Test extends BaseController
- {
- //public static $brokerList = '116.62.220.88:9092';
- /**
- * @description: 发送kafka记录
- * @return {*}
- */
- public function sent()
- {
- date_default_timezone_set('PRC');
- $config = \Kafka\ProducerConfig::getInstance();
-
- /* Topic的元信息刷新的间隔 */
- $config->setMetadataRefreshIntervalMs(10000);
-
- /* 设置broker的地址 */
- $config->setMetadataBrokerList('192.168.1.106:9092');
- /* 设置broker的代理版本 */
- $config->setBrokerVersion('1.0.0');
- /* 只需要leader确认消息 */
- $config->setRequiredAck(1);
- /* 选择异步 */
- $config->setIsAsyn(false);
- /* 每500毫秒发送消息 */
- $config->setProduceInterval(500);
- /* 创建⼀个⽣产者实例 */
- $producer = new \Kafka\Producer();
-
- for($i = 0; $i < 10; $i++ ) {
-
- $producer->send([
- [
- 'topic' => 'test1',
- 'value' => 'test'.$i,
- ],
- ]);
- }
- echo '发送成功</br>';
- }
- /**
- * @description: 接收记录
- * @return {*}
- */
- public function consumer()
- {
- $config = \Kafka\ConsumerConfig::getInstance();
- $config->setMetadataRefreshIntervalMs(10000);
- $config->setMetadataBrokerList('localhost:9092');
- $config->setGroupId('test');
- $config->setBrokerVersion('1.0.0');
- $config->setTopics(['test']);
-
- $consumer = new \Kafka\Consumer();
- //读取kafka内容并存储在数据库中
- $consumer->start(function($topic, $part, $message) {
- $json_data = json_decode($message,true);
- debug_log('接收记录',$message);
- if(empty($json_data))
- {
- debug_log('接收记录',"解析失败");
- return ;
- }
- $mac = Db::name('ADM_DEV')->where('LOGIN_NAME',$json_data['mac'])->find();
- if(empty($mac))
- {
- debug_log('接收记录',"查找不到该mac地址");
- return;
- }
- if($json_data['methond']=='track')
- {
- $this->track($json_data);
- }
- elseif($json_data['methond']=='login')
- {
- $this->login($json_data);
- }
- elseif($json_data['methond']=='heartbeat')
- {
- $this->heartbeat($json_data);
- }
- else
- {
- debug_log('接收记录',"类型错误");
- return;
- }
- debug_log('接收记录',"上传成功");
- });
- }
- //如果是轨迹则执行该方法
- private function track($data)
- {
- $mac = $data['mac'];
- $content = [];
- if($data['gps']['locationState']=="A")
- {
- $content['GPS_X']=$data['gps']['lat'];
- $content['GPS_Y']=$data['gps']['lng'];
- }
- Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
- foreach($data['labels'] as $item)
- {
- $lable = [];
- $lable['RF_FLAGID']=$item['id'];
- $lable['RF_ID']=$mac;
- $lable['RF_DATE'] = $this->parsings($item['time']);
- if($item['event']['entry']==1)
- {
- $lable['RF_STAT']=1;
- }
- elseif($item['event']['leave']==1)
- {
- $lable['RF_STAT']=2;
- }else
- {
- $lable['RF_STAT'] = 0;
- }
- Db::name('W_DW_RF_RECORD')->insert($lable);
-
- }
- }
- //如果是登录则执行该方法
- private function login($data)
- {
- $mac = $data['mac'];
- $content = [];
- if($data['gps']['locationState']=="A")
- {
- $content['GPS_X']=$data['gps']['lat'];
- $content['GPS_Y']=$data['gps']['lng'];
- }
- $content['IS_ONLINE']=1;
- Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
- }
- //如果是心跳则执行该方法
- private function heartbeat($data)
- {
- $mac = $data['mac'];
- $content = [];
- $content['IS_ONLINE']=1;
- Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
-
- }
- //解析时间
- private function parsings($date)
- {
- $getdate = '20'.$date;
- $getdate = str_split($getdate,2);
- $resout =$getdate[0].$getdate[1]."-".$getdate[2]."-".$getdate[3]." ".$getdate[4].":".$getdate[5].":".$getdate[6];
- return $resout;
- }
-
- }
|