123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- <?php
- class NingboAction extends Action {
-
-
- public function dahua( ){
- Vendor('Workerman352.Autoloader');
- $worker = new \Workerman\Worker("Dahua://0.0.0.0:1234");
- $worker->count = 12;
- $worker->onWorkerStart = function($worker){
- $timeInterval = 60 * 5;//秒
- \Workerman\Lib\Timer::add($timeInterval, function()
- {
- \Jiaruan\DahuaUtil::rlog("timer exec");
- foreach(\Workerman\Connection\TcpConnection::$connections as $connection) {
- if (time() - $connection->recvTime > 10 * 60) {
- \Jiaruan\DahuaUtil::rlog("close idle", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
- $connection->close();
- }
- }
- });
- \Jiaruan\DahuaUtil::rlog("workstart");
- };
-
- $worker->onConnect = function($connection){
- $connection->recvTime = time();
- \Jiaruan\DahuaUtil::rlog('imsync', "[newConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
- };
-
- $worker->onMessage = function($connection, $data){
- $fun = 'fun' . $data['cmd'];
- if (method_exists($this, $fun)){
- $this->$fun($connection, $data);
- } else {
- \Jiaruan\DahuaUtil::rlog("ERR", "fun " . $fun . " not exists");
- }
- };
-
- $worker->onClose = function($connection){
- \Jiaruan\DahuaUtil::rlog('imsync', "[shutConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
- };
-
- $worker->onError = function($connection, $code, $msg)
- {
- \Jiaruan\DahuaUtil::rlog("[ONERR]", $connection->getRemoteIp() . ':' . $connection->getRemotePort(), $code, $msg);
- };
-
- $worker->onBufferFull = function($connection)
- {
- \Jiaruan\DahuaUtil::rlog("[ONFULL]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
- };
- $worker->onBufferDrain = function($connection)
- {
- \Jiaruan\DahuaUtil::rlog("[ONDRAIN]", $connection->getRemoteIp() . ':' . $connection->getRemotePort());
- };
- $worker->reusePort = true;
- \Workerman\Worker::runAll();
- }
-
-
- private function fun83( $connection, $data ){
- //心跳 AAAAFF04B28FFE830000C984
- \Jiaruan\DahuaUtil::rlog("heartbeat", $data['mac']);
- $this->pushQueue(['methond' => 'heartbeat', 'mac' => $data['mac'], 'time' => time()]);
- $connection->send($data);
- }
-
-
- private function pushQueue( $data ){
- $data = json_encode($data);
- $startTime = microtime(true);
- \Jiaruan\DahuaUtil::rlog('[pushQueue]', $data);
- //return;
-
- $brokerlist = C('KAFKA_BROKER_LIST');
- if (empty($brokerlist)) {
- \Jiaruan\DahuaUtil::rlog("KAFKA_BROKER_LIST must be config!");
- }
-
- $topic = C('ROUTE_INDEX_KAFKA_TOPIC');
-
- if (empty($topic)) {
- \Jiaruan\DahuaUtil::rlog("error TRAVEL_ROUTE_INDEX_KAFKA_TOPIC empty");
- return;
- }
- static $rk;
- if (!extension_loaded('rdkafka')){
- \Jiaruan\DahuaUtil::rlog('pushToKafka fail,extension of rdkafka has not installed!!');
- return false;
- }
- if(!$rk){
- $conf = new Rdkafka\Conf();
- $conf->set('metadata.broker.list', $brokerlist);
- //$conf->set('group.id', $group); 这个参数是消费者的
- //$conf->set('batch.num.messages', 2);
- //$conf->set('linger.ms', 10);
- //$conf->set('log_level', (string) LOG_DEBUG);
- //$conf->set('debug', 'all');
- $conf->setErrorCb(function($producer, $err, $reason) {
- \Jiaruan\DahuaUtil::rlog('err', 'setErrorCb:' . rd_kafka_err2str($err) . ';' . $reason);
- });
- $conf->setDrMsgCb(function($producer, $msg) {
- if($msg->err) {
- \Jiaruan\DahuaUtil::rlog('err', 'setDrMsgCb:' . $msg->errstr());
- } else {
- \Jiaruan\DahuaUtil::rlog('info', "kafka sent ok.");
- }
- });
- $rk = new RdKafka\Producer($conf);
- }
-
- //$rk->setLogLevel(LOG_DEBUG);
- //$rk->addBrokers($brokerlist);
-
- $topic = $rk->newTopic($topic);
-
- $topic->produce(RD_KAFKA_PARTITION_UA, 0, $data);
-
-
- //$rk->poll(20000);
- \Jiaruan\DahuaUtil::rlog('info', "kafka start poll");
-
- $MAX_K = 2;
- $rec = 0;
- while ($rk->getOutQLen() > 30000) {//让回调成功或失败生效
- $rec++;
- if ($rec > $MAX_K) {
- break;
- }
- $rk->poll(10);
- }
-
- /*
- $result = $rk->flush(10000);
- if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
- \Jiaruan\Plate::log('error', 'push alarm err');
- }
- */
- $runTime = (microtime(true) - $startTime) * 1000;
- if ($rec > $MAX_K) {
- \Jiaruan\DahuaUtil::rlog('WARN', 'push perhaps failure runTime:' . intval($runTime) . 'ms');
- } else {
- \Jiaruan\DahuaUtil::rlog('info', 'push OK runTime:' . intval($runTime) . 'ms');
- }
-
-
-
- //sleep(2);
- }
-
-
- private function fun80( $connection, $data ){
- //登录 AAAAFF04B28FFF8000010077F5AAAAFF04B290FF8000010018F7
- $staionInfo = ord($data['buf'][0]);
- $gps = '';
- if (strlen($data['buf']) > 1) {
- $gps = \Jiaruan\DahuaUtil::parseLocation(rtrim(substr($data['buf'], 1), "\0"));//会在右边补null
- }
- $stationType = \Jiaruan\DahuaUtil::getBit($staionInfo, [7]);
- $version = \Jiaruan\DahuaUtil::getBit($staionInfo, ['0-5']);
- // $gpsStr = json_encode(json_encode);
- // \Jiaruan\DahuaUtil::rlog("login", $data['mac'], "stationType:{$stationType}", "version:{$version}", "gps:{$gpsStr}");
-
- $this->pushQueue(['methond' => 'login', 'mac' => $data['mac'], 'time' => time(),
- 'gps' => $gps, 'stationType' => $stationType, 'version' => $version]);
-
- //回复
- /*
- 数据索引 位 取值范围 备注
- 0~5 BCD码 6字节,系统时间,BCD码:YYMMDDHHMMSS,年在先
- 6 7 0/1 设备类型:0-野外基站,1-出入管理
- 6~0 0~126 心跳间隔,单位30S,(取值+1)*30S
- 7 7~0 0~254 野外基站模式RFID离开超时,单位2S,(取值+1)*2S,最大510S,8分钟多
- 8 0~254 射频触发读卡持续时间:单位秒。大端存储,0-射频常开,取值1~254,为射频触发读卡持续时间.(取值+1)×30S
- 9 0~254 白名单控制字节
- 位7~1,保留;
- 位0,白名单开启; 0000 0000
- 10~25 保留
- * */
- $bit0_5 = date('ymdHis', time()); //220928140554
-
- $bit6 = 0;
- $headBeat = 2;// 0 -> (0+1)*30=30秒 2 -> (2+1)*30=90秒
- $stationType = 0;
- \Jiaruan\DahuaUtil::setBit($bit6, ['0-6'], $headBeat);
- \Jiaruan\DahuaUtil::setBit($bit6, [7], $stationType);
-
- $bit7 = 0xf;// f -> (15+1)*2=32秒
- $bit8 = 0;
- $bit9 = 0;
- $bit10 = 0xb;//宁波这样回复的 不知道含义
- $bit11_25 = '';
-
- $data['reply']['buf'] = pack('H12C5a15', $bit0_5, $bit6, $bit7, $bit8, $bit9, $bit10, $bit11_25);
- $connection->send($data);
- }
-
-
- private function fun82( $connection, $data ){
- //确认包 AAAAFF04B28FFF820000F5D4
- \Jiaruan\DahuaUtil::rlog("ack", $data['mac']);
- }
-
-
- private function funfe( $connection, $data ){
- //上报两个读头的id号 客户端断开后 重新连接后不会发这个包 AAAAFFFFFFFC00FE00090AFF04B28FFF04B2904576
- $upk = unpack('H2unknow/H8id1/H8id2', $data['buf']);
- \Jiaruan\DahuaUtil::rlog("ID", $data['mac'], "id1:{$upk['id1']}", "id2:{$upk['id2']}");
- }
-
-
- private function fun84( $connection, $data ){
- //AAAAFF04B28F008400AA000003410F700096D714220930020104700096D708220930020106700096D714220930020109700096D708220930020111700096D714220930020114B05C
- $subpackageLen = 5;//头部5字节是分包
- $labelLen = 11;//每个标签
- $buf = $data['buf'];
-
-
- if (isset($data['lenBug'])) {//把有效数据取出来
- if (strlen($buf) < $subpackageLen + $labelLen) {//没有轨迹
- return;
- }
- $dropLen = (strlen($buf) - $subpackageLen) % $labelLen;
- $buf = substr($buf, 0, strlen($buf) - $dropLen);
- return;//先不处理错误的包了 这行反注释 就会处理错误的包
- }
-
-
- \Jiaruan\DahuaUtil::rlog("track", $data['mac']);
- $gps = '';
- if ((strlen($buf) - $subpackageLen) % $labelLen == 0) {
- //不含gps
- } else if ((strlen($buf) - $subpackageLen - 28) % $labelLen == 0) {//28 gps
- $gps = parseLocation(rtrim(substr($buf, -28), "\0"));//会在右边补null
- $buf = substr($buf, 0, strlen($buf) - 28);
- } else {
- //协议错误
- \Jiaruan\DahuaUtil::rlog('fun84 protocolLen error');
- return;
- }
-
- //轨迹包
- $recordSn = unpack('N', substr($buf, 0, $subpackageLen - 1))[1];
- $subpackageMax = \Jiaruan\DahuaUtil::getBit(ord($buf[$subpackageLen - 1]), ['4-7']);
- $subpackageCrrent = \Jiaruan\DahuaUtil::getBit(ord($buf[$subpackageLen - 1]), ['0-3']);
-
- $buf = substr($buf, $subpackageLen);
- $labelCount = strlen($buf) / $labelLen;
- $allLabel = [];
- for ($i = 0; $i < $labelCount; $i++) {
- $upk = unpack('H8rfid/C1event/H12time', $buf);//4+1+6 11 byte
-
- $item = [];
- $item['id'] = $upk['rfid'];
-
- /*
- 4 0/1 所处位置:0-场外,1-场内
- 3 0/1 离开事件:0-无效,1-离开
- 2 0/1 进入事件:0-无效,1-进入
- 1 0/1 低电事件:0-无效,1-低电
- * */
- $item['event']['dec'] = $upk['event'];
- $item['event']['lowBattery'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [1]);
- $item['event']['entry'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [2]);
- $item['event']['leave'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [3]);
- $item['event']['in'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [4]);
-
- $item['time'] = strtotime(substr(date('Y', time()), 0, 2) . $upk['time']);//221008155600 --> 1665215760
-
- $buf = substr($buf, $labelLen);
- $allLabel[] = $item;
- }
-
- $this->pushQueue(['methond' => 'track', 'mac' => $data['mac'], 'gps' => $gps, 'labels' => $allLabel,
- 'subpackageMax' => $subpackageMax, 'subpackageCrrent' => $subpackageCrrent, 'recordSn' => $recordSn]);
-
- // $gpsStr = json_encode($gps);
- // \Jiaruan\DahuaUtil::rlog("gps:{$gpsStr}", "recordSn:{$recordSn}",
- // "subpackageMax:{$subpackageMax}", "subpackageCrrent:{$subpackageCrrent}", "labelCount:{$labelCount}");
- $connection->send($data);
- }
-
- }
|