Test.php 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. <?php
  2. /*
  3. * @Author: 李康
  4. * @Date: 2022-09-28 16:48:06
  5. * @LastEditors: ‘likang’ zmcoding
  6. * @LastEditTime: 2022-09-30 15:06:44
  7. * @FilePath: /kafka_oracle/app/test/controller/Test.php
  8. * @Description:
  9. *
  10. * Copyright (c) 2022 by ‘likang’ zmcoding, All Rights Reserved.
  11. */
  12. namespace app\test\controller;
  13. use app\BaseController;
  14. use think\facade\Db;
  15. class Test extends BaseController
  16. {
  17. //public static $brokerList = '116.62.220.88:9092';
  18. /**
  19. * @description: 发送kafka记录
  20. * @return {*}
  21. */
  22. public function sent()
  23. {
  24. date_default_timezone_set('PRC');
  25. $config = \Kafka\ProducerConfig::getInstance();
  26. /* Topic的元信息刷新的间隔 */
  27. $config->setMetadataRefreshIntervalMs(10000);
  28. /* 设置broker的地址 */
  29. $config->setMetadataBrokerList('192.168.1.106:9092');
  30. /* 设置broker的代理版本 */
  31. $config->setBrokerVersion('1.0.0');
  32. /* 只需要leader确认消息 */
  33. $config->setRequiredAck(1);
  34. /* 选择异步 */
  35. $config->setIsAsyn(false);
  36. /* 每500毫秒发送消息 */
  37. $config->setProduceInterval(500);
  38. /* 创建⼀个⽣产者实例 */
  39. $producer = new \Kafka\Producer();
  40. for($i = 0; $i < 10; $i++ ) {
  41. $producer->send([
  42. [
  43. 'topic' => 'test1',
  44. 'value' => 'test'.$i,
  45. ],
  46. ]);
  47. }
  48. echo '发送成功</br>';
  49. }
  50. /**
  51. * @description: 接收记录
  52. * @return {*}
  53. */
  54. public function consumer()
  55. {
  56. $config = \Kafka\ConsumerConfig::getInstance();
  57. $config->setMetadataRefreshIntervalMs(10000);
  58. $config->setMetadataBrokerList('localhost:9092');
  59. $config->setGroupId('test');
  60. $config->setBrokerVersion('1.0.0');
  61. $config->setTopics(['test']);
  62. $consumer = new \Kafka\Consumer();
  63. //读取kafka内容并存储在数据库中
  64. $consumer->start(function($topic, $part, $message) {
  65. $json_data = json_decode($message,true);
  66. debug_log('接收记录',$message);
  67. if(empty($json_data))
  68. {
  69. debug_log('接收记录',"解析失败");
  70. return ;
  71. }
  72. $mac = Db::name('ADM_DEV')->where('LOGIN_NAME',$json_data['mac'])->find();
  73. if(empty($mac))
  74. {
  75. debug_log('接收记录',"查找不到该mac地址");
  76. return;
  77. }
  78. if($json_data['methond']=='track')
  79. {
  80. $this->track($json_data);
  81. }
  82. elseif($json_data['methond']=='login')
  83. {
  84. $this->login($json_data);
  85. }
  86. elseif($json_data['methond']=='heartbeat')
  87. {
  88. $this->heartbeat($json_data);
  89. }
  90. else
  91. {
  92. debug_log('接收记录',"类型错误");
  93. return;
  94. }
  95. debug_log('接收记录',"上传成功");
  96. });
  97. }
  98. //如果是轨迹则执行该方法
  99. private function track($data)
  100. {
  101. $mac = $data['mac'];
  102. $content = [];
  103. if($data['gps']['locationState']=="A")
  104. {
  105. $content['GPS_X']=$data['gps']['lat'];
  106. $content['GPS_Y']=$data['gps']['lng'];
  107. }
  108. Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
  109. foreach($data['labels'] as $item)
  110. {
  111. $lable = [];
  112. $lable['RF_FLAGID']=$item['id'];
  113. $lable['RF_ID']=$mac;
  114. $lable['RF_DATE'] = $this->parsings($item['time']);
  115. if($item['event']['entry']==1)
  116. {
  117. $lable['RF_STAT']=1;
  118. }
  119. elseif($item['event']['leave']==1)
  120. {
  121. $lable['RF_STAT']=2;
  122. }else
  123. {
  124. $lable['RF_STAT'] = 0;
  125. }
  126. Db::name('W_DW_RF_RECORD')->insert($lable);
  127. }
  128. }
  129. //如果是登录则执行该方法
  130. private function login($data)
  131. {
  132. $mac = $data['mac'];
  133. $content = [];
  134. if($data['gps']['locationState']=="A")
  135. {
  136. $content['GPS_X']=$data['gps']['lat'];
  137. $content['GPS_Y']=$data['gps']['lng'];
  138. }
  139. $content['IS_ONLINE']=1;
  140. Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
  141. }
  142. //如果是心跳则执行该方法
  143. private function heartbeat($data)
  144. {
  145. $mac = $data['mac'];
  146. $content = [];
  147. $content['IS_ONLINE']=1;
  148. Db::name('ADM_DEV')->where('LOGIN_NAME',$mac)->update($content);
  149. }
  150. //解析时间
  151. private function parsings($date)
  152. {
  153. $getdate = '20'.$date;
  154. $getdate = str_split($getdate,2);
  155. $resout =$getdate[0].$getdate[1]."-".$getdate[2]."-".$getdate[3]." ".$getdate[4].":".$getdate[5].":".$getdate[6];
  156. return $resout;
  157. }
  158. }