NingboAction.class.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. <?php
  2. class NingboAction extends Action {
  3. public function dahua( ){
  4. Vendor('Workerman352.Autoloader');
  5. $worker = new \Workerman\Worker("Dahua://0.0.0.0:1234");
  6. $worker->count = 12;
  7. $worker->onWorkerStart = function($worker){
  8. $timeInterval = 60 * 5;//秒
  9. \Workerman\Lib\Timer::add($timeInterval, function()
  10. {
  11. \Jiaruan\DahuaUtil::rlog("timer exec");
  12. foreach(\Workerman\Connection\TcpConnection::$connections as $connection) {
  13. if (time() - $connection->recvTime > 10 * 60) {
  14. \Jiaruan\DahuaUtil::rlog("close idle", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
  15. $connection->close();
  16. }
  17. }
  18. });
  19. \Jiaruan\DahuaUtil::rlog("workstart");
  20. };
  21. $worker->onConnect = function($connection){
  22. $connection->recvTime = time();
  23. \Jiaruan\DahuaUtil::rlog('imsync', "[newConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
  24. };
  25. $worker->onMessage = function($connection, $data){
  26. $fun = 'fun' . $data['cmd'];
  27. if (method_exists($this, $fun)){
  28. $this->$fun($connection, $data);
  29. } else {
  30. \Jiaruan\DahuaUtil::rlog("ERR", "fun " . $fun . " not exists");
  31. }
  32. };
  33. $worker->onClose = function($connection){
  34. \Jiaruan\DahuaUtil::rlog('imsync', "[shutConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
  35. };
  36. $worker->onError = function($connection, $code, $msg)
  37. {
  38. \Jiaruan\DahuaUtil::rlog("[ONERR]", $connection->getRemoteIp() . ':' . $connection->getRemotePort(), $code, $msg);
  39. };
  40. $worker->onBufferFull = function($connection)
  41. {
  42. \Jiaruan\DahuaUtil::rlog("[ONFULL]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
  43. };
  44. $worker->onBufferDrain = function($connection)
  45. {
  46. \Jiaruan\DahuaUtil::rlog("[ONDRAIN]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
  47. };
  48. $worker->reusePort = true;
  49. \Workerman\Worker::runAll();
  50. }
  51. private function fun83( $connection, $data ){
  52. //心跳 AAAAFF04B28FFE830000C984
  53. \Jiaruan\DahuaUtil::rlog("heartbeat", $data['mac']);
  54. $this->pushQueue(['methond' => 'heartbeat', 'mac' => $data['mac'], 'time' => time()]);
  55. $connection->send($data);
  56. }
  57. private function pushQueue( $data ){
  58. $data = json_encode($data);
  59. $startTime = microtime(true);
  60. \Jiaruan\DahuaUtil::rlog('[pushQueue]', $data);
  61. //return;
  62. $brokerlist = C('KAFKA_BROKER_LIST');
  63. if (empty($brokerlist)) {
  64. \Jiaruan\DahuaUtil::rlog("KAFKA_BROKER_LIST must be config!");
  65. }
  66. $topic = C('ROUTE_INDEX_KAFKA_TOPIC');
  67. if (empty($topic)) {
  68. \Jiaruan\DahuaUtil::rlog("error TRAVEL_ROUTE_INDEX_KAFKA_TOPIC empty");
  69. return;
  70. }
  71. static $rk;
  72. if (!extension_loaded('rdkafka')){
  73. \Jiaruan\DahuaUtil::rlog('pushToKafka fail,extension of rdkafka has not installed!!');
  74. return false;
  75. }
  76. if(!$rk){
  77. $conf = new Rdkafka\Conf();
  78. $conf->set('metadata.broker.list', $brokerlist);
  79. //$conf->set('group.id', $group); 这个参数是消费者的
  80. //$conf->set('batch.num.messages', 2);
  81. //$conf->set('linger.ms', 10);
  82. //$conf->set('log_level', (string) LOG_DEBUG);
  83. //$conf->set('debug', 'all');
  84. $conf->setErrorCb(function($producer, $err, $reason) {
  85. \Jiaruan\DahuaUtil::rlog('err', 'setErrorCb:' . rd_kafka_err2str($err) . ';' . $reason);
  86. });
  87. $conf->setDrMsgCb(function($producer, $msg) {
  88. if($msg->err) {
  89. \Jiaruan\DahuaUtil::rlog('err', 'setDrMsgCb:' . $msg->errstr());
  90. } else {
  91. \Jiaruan\DahuaUtil::rlog('info', "kafka sent ok.");
  92. }
  93. });
  94. $rk = new RdKafka\Producer($conf);
  95. }
  96. //$rk->setLogLevel(LOG_DEBUG);
  97. //$rk->addBrokers($brokerlist);
  98. $topic = $rk->newTopic($topic);
  99. $topic->produce(RD_KAFKA_PARTITION_UA, 0, $data);
  100. //$rk->poll(20000);
  101. \Jiaruan\DahuaUtil::rlog('info', "kafka start poll");
  102. $MAX_K = 2;
  103. $rec = 0;
  104. while ($rk->getOutQLen() > 30000) {//让回调成功或失败生效
  105. $rec++;
  106. if ($rec > $MAX_K) {
  107. break;
  108. }
  109. $rk->poll(10);
  110. }
  111. /*
  112. $result = $rk->flush(10000);
  113. if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
  114. \Jiaruan\Plate::log('error', 'push alarm err');
  115. }
  116. */
  117. $runTime = (microtime(true) - $startTime) * 1000;
  118. if ($rec > $MAX_K) {
  119. \Jiaruan\DahuaUtil::rlog('WARN', 'push perhaps failure runTime:' . intval($runTime) . 'ms');
  120. } else {
  121. \Jiaruan\DahuaUtil::rlog('info', 'push OK runTime:' . intval($runTime) . 'ms');
  122. }
  123. //sleep(2);
  124. }
  125. private function fun80( $connection, $data ){
  126. //登录 AAAAFF04B28FFF8000010077F5AAAAFF04B290FF8000010018F7
  127. $staionInfo = ord($data['buf'][0]);
  128. $gps = '';
  129. if (strlen($data['buf']) > 1) {
  130. $gps = \Jiaruan\DahuaUtil::parseLocation(rtrim(substr($data['buf'], 1), "\0"));//会在右边补null
  131. }
  132. $stationType = \Jiaruan\DahuaUtil::getBit($staionInfo, [7]);
  133. $version = \Jiaruan\DahuaUtil::getBit($staionInfo, ['0-5']);
  134. // $gpsStr = json_encode(json_encode);
  135. // \Jiaruan\DahuaUtil::rlog("login", $data['mac'], "stationType:{$stationType}", "version:{$version}", "gps:{$gpsStr}");
  136. $this->pushQueue(['methond' => 'login', 'mac' => $data['mac'], 'time' => time(),
  137. 'gps' => $gps, 'stationType' => $stationType, 'version' => $version]);
  138. //回复
  139. /*
  140. 数据索引 位 取值范围 备注
  141. 0~5 BCD码 6字节,系统时间,BCD码:YYMMDDHHMMSS,年在先
  142. 6 7 0/1 设备类型:0-野外基站,1-出入管理
  143. 6~0 0~126 心跳间隔,单位30S,(取值+1)*30S
  144. 7 7~0 0~254 野外基站模式RFID离开超时,单位2S,(取值+1)*2S,最大510S,8分钟多
  145. 8 0~254 射频触发读卡持续时间:单位秒。大端存储,0-射频常开,取值1~254,为射频触发读卡持续时间.(取值+1)×30S
  146. 9 0~254 白名单控制字节
  147. 位7~1,保留;
  148. 位0,白名单开启; 0000 0000
  149. 10~25 保留
  150. * */
  151. $bit0_5 = date('ymdHis', time()); //220928140554
  152. $bit6 = 0;
  153. $headBeat = 2;// 0 -> (0+1)*30=30秒 2 -> (2+1)*30=90秒
  154. $stationType = 0;
  155. \Jiaruan\DahuaUtil::setBit($bit6, ['0-6'], $headBeat);
  156. \Jiaruan\DahuaUtil::setBit($bit6, [7], $stationType);
  157. $bit7 = 0xf;// f -> (15+1)*2=32秒
  158. $bit8 = 0;
  159. $bit9 = 0;
  160. $bit10 = 0xb;//宁波这样回复的 不知道含义
  161. $bit11_25 = '';
  162. $data['reply']['buf'] = pack('H12C5a15', $bit0_5, $bit6, $bit7, $bit8, $bit9, $bit10, $bit11_25);
  163. $connection->send($data);
  164. }
  165. private function fun82( $connection, $data ){
  166. //确认包 AAAAFF04B28FFF820000F5D4
  167. \Jiaruan\DahuaUtil::rlog("ack", $data['mac']);
  168. }
  169. private function funfe( $connection, $data ){
  170. //上报两个读头的id号 客户端断开后 重新连接后不会发这个包 AAAAFFFFFFFC00FE00090AFF04B28FFF04B2904576
  171. $upk = unpack('H2unknow/H8id1/H8id2', $data['buf']);
  172. \Jiaruan\DahuaUtil::rlog("ID", $data['mac'], "id1:{$upk['id1']}", "id2:{$upk['id2']}");
  173. }
  174. private function fun84( $connection, $data ){
  175. //AAAAFF04B28F008400AA000003410F700096D714220930020104700096D708220930020106700096D714220930020109700096D708220930020111700096D714220930020114B05C
  176. $subpackageLen = 5;//头部5字节是分包
  177. $labelLen = 11;//每个标签
  178. $buf = $data['buf'];
  179. if (isset($data['lenBug'])) {//把有效数据取出来
  180. if (strlen($buf) < $subpackageLen + $labelLen) {//没有轨迹
  181. return;
  182. }
  183. $dropLen = (strlen($buf) - $subpackageLen) % $labelLen;
  184. $buf = substr($buf, 0, strlen($buf) - $dropLen);
  185. return;//先不处理错误的包了 这行反注释 就会处理错误的包
  186. }
  187. \Jiaruan\DahuaUtil::rlog("track", $data['mac']);
  188. $gps = '';
  189. if ((strlen($buf) - $subpackageLen) % $labelLen == 0) {
  190. //不含gps
  191. } else if ((strlen($buf) - $subpackageLen - 28) % $labelLen == 0) {//28 gps
  192. $gps = parseLocation(rtrim(substr($buf, -28), "\0"));//会在右边补null
  193. $buf = substr($buf, 0, strlen($buf) - 28);
  194. } else {
  195. //协议错误
  196. \Jiaruan\DahuaUtil::rlog('fun84 protocolLen error');
  197. return;
  198. }
  199. //轨迹包
  200. $recordSn = unpack('N', substr($buf, 0, $subpackageLen - 1))[1];
  201. $subpackageMax = \Jiaruan\DahuaUtil::getBit(ord($buf[$subpackageLen - 1]), ['4-7']);
  202. $subpackageCrrent = \Jiaruan\DahuaUtil::getBit(ord($buf[$subpackageLen - 1]), ['0-3']);
  203. $buf = substr($buf, $subpackageLen);
  204. $labelCount = strlen($buf) / $labelLen;
  205. $allLabel = [];
  206. for ($i = 0; $i < $labelCount; $i++) {
  207. $upk = unpack('H8rfid/C1event/H12time', $buf);//4+1+6 11 byte
  208. $item = [];
  209. $item['id'] = $upk['rfid'];
  210. /*
  211. 4 0/1 所处位置:0-场外,1-场内
  212. 3 0/1 离开事件:0-无效,1-离开
  213. 2 0/1 进入事件:0-无效,1-进入
  214. 1 0/1 低电事件:0-无效,1-低电
  215. * */
  216. $item['event']['dec'] = $upk['event'];
  217. $item['event']['lowBattery'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [1]);
  218. $item['event']['entry'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [2]);
  219. $item['event']['leave'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [3]);
  220. $item['event']['in'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [4]);
  221. $item['time'] = strtotime(substr(date('Y', time()), 0, 2) . $upk['time']);//221008155600 --> 1665215760
  222. $buf = substr($buf, $labelLen);
  223. $allLabel[] = $item;
  224. }
  225. $this->pushQueue(['methond' => 'track', 'mac' => $data['mac'], 'gps' => $gps, 'labels' => $allLabel,
  226. 'subpackageMax' => $subpackageMax, 'subpackageCrrent' => $subpackageCrrent, 'recordSn' => $recordSn]);
  227. // $gpsStr = json_encode($gps);
  228. // \Jiaruan\DahuaUtil::rlog("gps:{$gpsStr}", "recordSn:{$recordSn}",
  229. // "subpackageMax:{$subpackageMax}", "subpackageCrrent:{$subpackageCrrent}", "labelCount:{$labelCount}");
  230. $connection->send($data);
  231. }
  232. }