count = 12; $worker->onWorkerStart = function($worker){ $timeInterval = 60 * 5;//秒 \Workerman\Lib\Timer::add($timeInterval, function() { \Jiaruan\DahuaUtil::rlog("timer exec"); foreach(\Workerman\Connection\TcpConnection::$connections as $connection) { if (time() - $connection->recvTime > 10 * 60) { \Jiaruan\DahuaUtil::rlog("close idle", $connection->getRemoteIp() . ':' . $connection->getRemotePort()); $connection->close(); } } }); \Jiaruan\DahuaUtil::rlog("workstart"); }; $worker->onConnect = function($connection){ $connection->recvTime = time(); \Jiaruan\DahuaUtil::rlog('imsync', "[newConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort()); }; $worker->onMessage = function($connection, $data){ $fun = 'fun' . $data['cmd']; if (method_exists($this, $fun)){ $this->$fun($connection, $data); } else { \Jiaruan\DahuaUtil::rlog("ERR", "fun " . $fun . " not exists"); } }; $worker->onClose = function($connection){ \Jiaruan\DahuaUtil::rlog('imsync', "[shutConn]", $connection->getRemoteIp() . ':' . $connection->getRemotePort()); }; $worker->onError = function($connection, $code, $msg) { \Jiaruan\DahuaUtil::rlog("[ONERR]", $connection->getRemoteIp() . ':' . $connection->getRemotePort(), $code, $msg); }; $worker->onBufferFull = function($connection) { \Jiaruan\DahuaUtil::rlog("[ONFULL]", $connection->getRemoteIp() . ':' . $connection->getRemotePort()); }; $worker->onBufferDrain = function($connection) { \Jiaruan\DahuaUtil::rlog("[ONDRAIN]", $connection->getRemoteIp() . ':' . 
$connection->getRemotePort()); };
        $worker->reusePort = true;
        \Workerman\Worker::runAll();
    }

    /**
     * Command 0x83 - heartbeat.
     *
     * Sample frame: AAAAFF04B28FFE830000C984
     *
     * Logs the heartbeat, publishes a 'heartbeat' event to the queue and
     * sends the decoded packet back on the connection.
     *
     * @param object $connection Connection the packet arrived on (must expose send()).
     * @param array  $data       Decoded packet; this method reads $data['mac'].
     */
    private function fun83( $connection, $data ){
        \Jiaruan\DahuaUtil::rlog("heartbeat", $data['mac']);
        $this->pushQueue(['methond' => 'heartbeat', 'mac' => $data['mac'], 'time' => time()]);
        $connection->send($data);
    }

    /**
     * JSON-encodes an event and produces it to the Kafka topic configured
     * under ROUTE_INDEX_KAFKA_TOPIC, using the brokers from KAFKA_BROKER_LIST.
     *
     * The producer is created once per process (function-static) and reused.
     * Every failure is logged; nothing is thrown from the checks in this method.
     *
     * @param array $data Event payload; it is serialised with json_encode().
     *
     * @return false|null false when JSON encoding fails or the rdkafka
     *                    extension is not loaded, null in every other case.
     *                    Callers in this class ignore the value.
     */
    private function pushQueue( $data ){
        $data = json_encode($data);
        $startTime = microtime(true);
        if ($data === false) {
            // Binary fields (e.g. a raw GPS string) can make json_encode() fail;
            // do not publish a bogus payload in that case.
            \Jiaruan\DahuaUtil::rlog('err', 'pushQueue json_encode fail:' . json_last_error_msg());
            return false;
        }
        \Jiaruan\DahuaUtil::rlog('[pushQueue]', $data);

        $brokerlist = C('KAFKA_BROKER_LIST');
        if (empty($brokerlist)) {
            \Jiaruan\DahuaUtil::rlog("KAFKA_BROKER_LIST must be config!");
            // Without brokers the producer could never deliver, and it would be
            // cached in the static below for the lifetime of the process.
            return;
        }
        $topic = C('ROUTE_INDEX_KAFKA_TOPIC');
        if (empty($topic)) {
            // NOTE(review): the message names TRAVEL_ROUTE_INDEX_KAFKA_TOPIC but the
            // key read above is ROUTE_INDEX_KAFKA_TOPIC - confirm which one is intended.
            \Jiaruan\DahuaUtil::rlog("error TRAVEL_ROUTE_INDEX_KAFKA_TOPIC empty");
            return;
        }

        static $rk;
        if (!extension_loaded('rdkafka')){
            \Jiaruan\DahuaUtil::rlog('pushToKafka fail,extension of rdkafka has not installed!!');
            return false;
        }
        if(!$rk){
            $conf = new RdKafka\Conf();
            $conf->set('metadata.broker.list', $brokerlist);
            // 'group.id' is a consumer-side setting and is intentionally not set here.
            $conf->setErrorCb(function($producer, $err, $reason) {
                \Jiaruan\DahuaUtil::rlog('err', 'setErrorCb:' . rd_kafka_err2str($err) . ';' . $reason);
            });
            $conf->setDrMsgCb(function($producer, $msg) {
                if($msg->err) {
                    \Jiaruan\DahuaUtil::rlog('err', 'setDrMsgCb:' . $msg->errstr());
                } else {
                    \Jiaruan\DahuaUtil::rlog('info', "kafka sent ok.");
                }
            });
            $rk = new RdKafka\Producer($conf);
        }

        $topic = $rk->newTopic($topic);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $data);
        // Non-blocking poll so the delivery-report and error callbacks registered
        // above are actually served; otherwise they only run once the out-queue
        // has grown past the threshold below.
        $rk->poll(0);

        \Jiaruan\DahuaUtil::rlog('info', "kafka start poll");
        $MAX_K = 2;
        $rec = 0;
        // Back-pressure: when the out-queue is very long, poll a bounded number
        // of times so callbacks can run, without blocking the worker for long.
        while ($rk->getOutQLen() > 30000) {
            $rec++;
            if ($rec > $MAX_K) {
                break;
            }
            $rk->poll(10);
        }

        $runTime = (microtime(true) - $startTime) * 1000;
        if ($rec > $MAX_K) {
            \Jiaruan\DahuaUtil::rlog('WARN', 'push perhaps failure runTime:' . intval($runTime) . 'ms');
        } else {
            \Jiaruan\DahuaUtil::rlog('info', 'push OK runTime:' . intval($runTime) . 'ms');
        }
    }

    /**
     * Command 0x80 - login.
     *
     * Sample frame: AAAAFF04B28FFF8000010077F5AAAAFF04B290FF8000010018F7
     *
     * Reads the station info byte (and the optional location string that
     * follows it) from $data['buf'], publishes a 'login' event, then fills
     * $data['reply']['buf'] with a 26-byte configuration reply and sends it.
     *
     * @param object $connection Connection the packet arrived on (must expose send()).
     * @param array  $data       Decoded packet; this method reads 'buf' and 'mac'.
     */
    private function fun80( $connection, $data ){
        $staionInfo = ord($data['buf'][0]);
        $gps = '';
        if (strlen($data['buf']) > 1) {
            // The location string is right-padded with NUL bytes.
            $gps = \Jiaruan\DahuaUtil::parseLocation(rtrim(substr($data['buf'], 1), "\0"));
        }
        $stationType = \Jiaruan\DahuaUtil::getBit($staionInfo, [7]);
        $version = \Jiaruan\DahuaUtil::getBit($staionInfo, ['0-5']);
        $this->pushQueue(['methond' => 'login', 'mac' => $data['mac'], 'time' => time(), 'gps' => $gps, 'stationType' => $stationType, 'version' => $version]);

        /*
         * Reply layout (byte index / bits / range / meaning):
         *   0~5          BCD   6 bytes, system time as BCD YYMMDDHHMMSS, year first
         *   6     7      0/1   device type: 0 = field base station, 1 = access management
         *         6~0  0~126   heartbeat interval in units of 30 s: (value + 1) * 30 s
         *   7     7~0  0~254   RFID leave timeout in field base station mode,
         *                      units of 2 s: (value + 1) * 2 s, max 510 s (just over 8 min)
         *   8          0~254   RF-triggered card reading duration, big-endian;
         *                      0 = RF always on, 1~254 = (value + 1) * 30 s
         *   9          0~254   whitelist control byte: bits 7~1 reserved,
         *                      bit 0 = whitelist enabled
         *   10~25              reserved
         */
        $bit0_5 = date('ymdHis', time()); // e.g. 220928140554
        $bit6 = 0;
        $headBeat = 2; // 0 -> (0+1)*30 = 30 s, 2 -> (2+1)*30 = 90 s
        $stationType = 0;
        \Jiaruan\DahuaUtil::setBit($bit6, ['0-6'], $headBeat);
        \Jiaruan\DahuaUtil::setBit($bit6, [7], $stationType);
        $bit7 = 0xf; // 0xf -> (15+1)*2 = 32 s
        $bit8 = 0;
        $bit9 = 0;
        $bit10 = 0xb; // the Ningbo deployment replies with this value; meaning unknown
        $bit11_25 = '';
        // H12 = 6 bytes, C5 = 5 bytes, a15 = 15 NUL-padded bytes -> 26 bytes in total.
        $data['reply']['buf'] = pack('H12C5a15', $bit0_5, $bit6, $bit7, $bit8, $bit9, $bit10, $bit11_25);
        $connection->send($data);
    }

    /**
     * Command 0x82 - acknowledgement packet; it is only logged.
     *
     * Sample frame: AAAAFF04B28FFF820000F5D4
     *
     * @param object $connection Connection the packet arrived on (unused).
     * @param array  $data       Decoded packet; this method reads $data['mac'].
     */
    private function fun82( $connection, $data ){
        \Jiaruan\DahuaUtil::rlog("ack", $data['mac']);
    }

    /**
     * Command 0xfe - reports the IDs of the two reader heads; only logged.
     *
     * A client that disconnects and reconnects does not send this packet again.
     * Sample frame: AAAAFFFFFFFC00FE00090AFF04B28FFF04B2904576
     *
     * @param object $connection Connection the packet arrived on (unused).
     * @param array  $data       Decoded packet; this method reads 'buf' and 'mac'.
     */
    private function funfe( $connection, $data ){
        $upk = unpack('H2unknow/H8id1/H8id2', $data['buf']);
        \Jiaruan\DahuaUtil::rlog("ID", $data['mac'], "id1:{$upk['id1']}", "id2:{$upk['id2']}");
    }

    /**
     * Command 0x84 - track (label record) packet.
     *
     * Sample frame:
     * AAAAFF04B28F008400AA000003410F700096D714220930020104700096D708220930020106700096D714220930020109700096D708220930020111700096D714220930020114B05C
     *
     * Payload layout: a 5-byte sub-package header (4-byte record serial number
     * plus one byte holding max/current sub-package numbers), then N label
     * records of 11 bytes each, optionally followed by a 28-byte location
     * string. The parsed labels are published as a 'track' event and the
     * packet is sent back on the connection.
     *
     * @param object $connection Connection the packet arrived on (must expose send()).
     * @param array  $data       Decoded packet; reads 'buf', 'mac' and the optional 'lenBug' flag.
     */
    private function fun84( $connection, $data ){
        $subpackageLen = 5; // sub-package header length in bytes
        $labelLen = 11;     // one label record: 4 (rfid) + 1 (event) + 6 (time)
        $gpsLen = 28;       // trailing location string, when present
        $buf = $data['buf'];

        if (isset($data['lenBug'])) {
            // Packet flagged as having a wrong length: trim it to the valid data.
            if (strlen($buf) < $subpackageLen + $labelLen) {
                // No track record at all.
                return;
            }
            $dropLen = (strlen($buf) - $subpackageLen) % $labelLen;
            $buf = substr($buf, 0, strlen($buf) - $dropLen);
            // Malformed packets are deliberately not processed for now;
            // remove this return to process them with the trimmed buffer.
            return;
        }

        \Jiaruan\DahuaUtil::rlog("track", $data['mac']);

        $payloadLen = strlen($buf) - $subpackageLen;
        $gps = '';
        if ($payloadLen >= 0 && $payloadLen % $labelLen == 0) {
            // Labels only, no location string.
        } else if ($payloadLen >= $gpsLen && ($payloadLen - $gpsLen) % $labelLen == 0) {
            // Labels followed by the location string, which is right-padded with NUL bytes.
            $gps = \Jiaruan\DahuaUtil::parseLocation(rtrim(substr($buf, -$gpsLen), "\0"));
            $buf = substr($buf, 0, strlen($buf) - $gpsLen);
        } else {
            // Length matches neither layout. The explicit >= checks also reject
            // buffers that are too short, which a bare modulo on a negative
            // length would let through.
            \Jiaruan\DahuaUtil::rlog('fun84 protocolLen error');
            return;
        }

        // Sub-package header.
        $recordSn = unpack('N', substr($buf, 0, $subpackageLen - 1))[1];
        $subpackageByte = ord($buf[$subpackageLen - 1]);
        $subpackageMax = \Jiaruan\DahuaUtil::getBit($subpackageByte, ['4-7']);
        $subpackageCrrent = \Jiaruan\DahuaUtil::getBit($subpackageByte, ['0-3']);

        $buf = substr($buf, $subpackageLen);
        $labelCount = (int) (strlen($buf) / $labelLen);
        // Label times carry a two-digit year; prefix the current century.
        $century = substr(date('Y', time()), 0, 2);
        $allLabel = [];
        for ($i = 0; $i < $labelCount; $i++) {
            $upk = unpack('H8rfid/C1event/H12time', substr($buf, $i * $labelLen, $labelLen));
            $item = [];
            $item['id'] = $upk['rfid'];
            /*
             * Event byte bits:
             *   4  0/1  position:     0 = outside the site, 1 = inside the site
             *   3  0/1  leave event:  0 = not set, 1 = left
             *   2  0/1  entry event:  0 = not set, 1 = entered
             *   1  0/1  low battery:  0 = not set, 1 = low battery
             */
            $item['event']['dec'] = $upk['event'];
            $item['event']['lowBattery'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [1]);
            $item['event']['entry'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [2]);
            $item['event']['leave'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [3]);
            $item['event']['in'] = \Jiaruan\DahuaUtil::getBit($upk['event'], [4]);
            $item['time'] = strtotime($century . $upk['time']); // e.g. 221008155600 -> 1665215760
            $allLabel[] = $item;
        }

        $this->pushQueue(['methond' => 'track', 'mac' => $data['mac'], 'gps' => $gps, 'labels' => $allLabel, 'subpackageMax' => $subpackageMax, 'subpackageCrrent' => $subpackageCrrent, 'recordSn' => $recordSn]);
        $connection->send($data);
    }
}