Parcourir la source

Merge branch 'master' of http://gogs.renlianiot.com:4000/zmcoding/station-test-api

nana_sen il y a 1 an
Parent
commit
d4bb1953ea

+ 234 - 0
catch/api/service/DahuaUtil.php

@@ -0,0 +1,234 @@
+<?php
+
+
+class DahuaUtil
+{
+    /*位操作*/
+    public static function getb($val, $pos)
+    {
+        //获取整数$val第$pos位的值
+        return (($val & 1 << $pos) > 0) ? 1 : 0;
+    }
+
+    public static function setb(&$val, $pos, $newVal)
+    {
+        //设置整数$val第$pos位的值 $newVal为0或1
+        if ($newVal) {//设置为1
+            $val |= (1 << $pos);
+        } else {//设置为0
+            $val &= ~(1 << $pos);
+        }
+    }
+
+    public static function getBit($val, $posArr)
+    {
+        //$val整数  $posArr位置数组[1,2,3]  或  $posArr[0]='1-3'; $posArr = [0, 1, 2, 3]
+        //返回值 整数
+        if (strpos($posArr[0], '-')) {
+            $tmp = explode('-', $posArr[0], 2);
+            $posArr = range($tmp[0], $tmp[1]);
+        }
+        $ret = '';
+        foreach ($posArr as $pos) {
+            $ret = DahuaUtil::getb($val, $pos) . $ret;
+        }
+        return bindec($ret);
+    }
+
+    public static function setBit(&$val, $posArr, $newVal)
+    {
+        //$val 整数 旧值
+        //$posArr 数组 要更改的位 $posArr[0]='1-3'; $posArr = [0, 1, 2, 3]
+        //$newVal 整数 新值
+        if (strpos($posArr[0], '-')) {
+            $tmp = explode('-', $posArr[0], 2);
+            $posArr = range($tmp[0], $tmp[1]);
+        }
+        $ival = strrev(decbin($newVal));//反转下
+        $k = 0;
+        foreach ($posArr as $pos) {
+            if (!isset($ival[$k])) {
+                $ival[$k] = 0;
+            }
+            DahuaUtil::setb($val, $pos, $ival[$k]);
+            $k++;
+        }
+    }
+
+    /*位操作*/
+
+
+    public static function parseLocation($str)
+    {
+        $location_arr = explode(',', $str);
+
+        $location = [];
+        $location['locationState'] = $location_arr[0];
+        $lat_d = substr($location_arr[1], 0, 2);
+        $lat_m = substr($location_arr[1], 2);
+        $location['lat'] = $lat_d + $lat_m * 60;
+        $location['latType'] = $location_arr[2];
+        $lng_d = substr($location_arr[3], 0, 3);
+        $lng_m = substr($location_arr[3], 3);
+        $location['lng'] = $lng_d + $lng_m * 60;
+        $location['lngType'] = $location_arr[4];
+        return $location;
+    }
+
+    public static function rlog(...$args)
+    {
+        //函数参数是可变参数 使用前查下static开始的大写变量配置是否正确
+        //rlog('info', [1,2,3], $a = null, '[]', $this, false);
+        $args = func_get_args();
+//        if (empty($args[0])) {
+//            return;
+//        }
+        static $LOG_CONSOLE = true;//是否输出到控制台 fpm需要为false cli可以为true
+        static $LOG_NAME = "nibo_synch_mysql.log";//值为空时 不写入文件
+        static $LOG_SIZE = 128 * 1024 * 1024;//文件最大尺寸 大于这个尺寸时 会生成个后缀.old的文件
+
+        static $LOG_CACHE = false;//是否缓存日志内容 用于批量写入文件 如需强制刷新第一个参数传sync
+        static $CACHE_DURATION = 10;//缓存最大时间 秒
+        static $CACHE_SIZE = 1024;//缓存大小
+        static $cacheStartTime = 0;
+        static $cacheBuf = '';
+
+        static $LOG_TIMES = 10;//调用这个函数最大次数 超过次数或$logCount==1 会判断下文件大小 决定是否新生成文件
+        static $logCount = 0;
+
+        static $MAX_LEN = 5048;//数据不能超过这个  不然就截断了
+
+        $sync = false;//如果是true 使用$LOG_CACHE时会把数据保存到磁盘
+
+        $implicit0 = ['imsync', 'imtrace'];//这个函数的参数0 隐藏用法
+        $buf = '';
+        if (count($args) == 1 && $args[0] == "\n") {//只有换行时 不写入时间戳了
+            $buf = "\n";
+        } else {
+            $sync = strtolower($args[0]) == $implicit0[0];
+
+            $pid = '';//进程id
+            if (function_exists('posix_getpid')) {
+                $pid = ' ' . posix_getpid() . ' ';
+            }
+            $fileLine = '';//文件名:行号
+            {
+                $debug = debug_backtrace();
+                $backtrace = 0;
+                if (strpos($args[0], $implicit0[1]) === 0) {
+                    $backtrace = intval(str_replace($implicit0[1], '', $args[0]));
+                    unset($args[0]);
+                } else if ($sync) {
+                    unset($args[0]);
+                }
+                $fileLine = ($pid == '' ? ' ' : '') . basename($debug[$backtrace]['file'])
+                    . ':' . $debug[$backtrace]['line'] . '';
+
+            }
+
+            $allPara = '';
+            foreach ($args as $para) {
+                if (is_array($para)) {
+                    $allPara .= json_encode($para) . ' ';
+                } else if (is_object($para)) {
+                    if (method_exists($para, '__toString')) {
+                        $allPara .= $para . ' ';
+                    } else {
+                        $allPara .= get_class($para) . json_encode($para) . ' ';
+                    }
+                } else if (is_bool($para)) {
+                    $allPara .= $para ? 'true ' : 'false ';
+                } else if (is_null($para)) {
+                    $allPara .= 'null ';
+                } else {
+                    $allPara .= $para . ' ';
+                }
+            }
+
+            $len = strlen($allPara);
+            if ($len > $MAX_LEN) {
+                $allPara = substr($allPara, 0, $MAX_LEN) . "({$len})......";
+            }
+
+            $buf = "[" . date("y-m-d H:i:s") . "{$pid}{$fileLine}]" . $allPara . "\n";
+        }
+
+        $logCount++;
+        if (!empty($LOG_NAME)) {
+            if ($LOG_CACHE) {
+                if ($cacheBuf == '') {
+                    $cacheStartTime = time();
+                }
+                $cacheBuf .= $buf;
+                //超过缓存尺寸 或者 超过缓存时长 写缓存到文件
+                if (strlen($cacheBuf) > $CACHE_SIZE || time() - $cacheStartTime > $CACHE_DURATION) {
+                    $cacheStartTime = time();
+                    goto write;
+                } else {
+                    if ($sync) {
+                        goto write;
+                    } else {
+                        goto skipWrite;
+                    }
+                }
+            } else {
+                $cacheBuf = $buf;
+            }
+            write: {
+                if ($logCount > 100) {//会有缓存
+                    clearstatcache();
+                }
+                //超过尺寸后 删除旧文件 把新文件重命名为旧文件
+                if (($logCount == 1 || $logCount > $LOG_TIMES) && filesize($LOG_NAME) > $LOG_SIZE) {
+
+                    //获取独占锁
+                    $fp = fopen($LOG_NAME . '.lock', 'a');
+                    $k = 0;
+                    do {
+                        $isLock = flocK($fp, LOCK_EX);
+                        $k++;
+                        if (!$isLock && $k > 1000) {
+                            echo "lock 1000\n";
+                            goto PUT;
+                        }
+                    } while (!$isLock);
+
+                    //另外一个进程进入时 重新判断下
+                    clearstatcache();
+                    if (filesize($LOG_NAME) <= $LOG_SIZE) {
+                        goto UN;
+                    }
+
+                    //删除旧文件和重命名文件
+                    $oldLogName = $LOG_NAME . '.old';
+                    if (file_exists($oldLogName)) {
+                        if (!unlink($oldLogName)) {
+                            echo "unlink err\n";
+                        }
+                    }
+                    if (!rename($LOG_NAME, $oldLogName)) {
+                        echo "rename err\n";
+                    }
+
+                    //解锁
+                    UN:
+                    flock($fp, LOCK_UN);
+                    fclose($fp);
+
+
+                    $logCount = 0;
+                }
+                PUT:
+                if (!file_put_contents($LOG_NAME, $cacheBuf, FILE_APPEND)) {
+                    echo "file_put_contents err\n";
+                }
+                $cacheBuf = '';
+            }
+            skipWrite:{
+            }
+        }
+        if ($LOG_CONSOLE) {
+            echo $buf;
+        }
+    }
+}

+ 1 - 1
catch/api/service/dispose.php

@@ -541,7 +541,7 @@ public function get_station_config($mac){
         "history_filter_signal"=>82,//历史记录过滤的信号强度
         "init_data"=>3, //需要三条数据确定初始方向
         "change_data"=>5, //需要5条数据确定变换的方向
-        "timeout"=>6,
+        "timeout"=>6,//信号超时服务时间
         "second_create_res"=>[
 
         ]//二次根据历史记录生成配置 数组中的参数配置  start_time,end_time,dir 1 前 2后;

+ 216 - 70
catch/api/service/mysqlToKafuka01.php

@@ -1,8 +1,13 @@
 <?php
 namespace catchAdmin\api\service;
-
+include "./DahuaUtil.php";
+ini_set('memory_limit', '1024M');
+use Exception;
+use thans\jwt\claim\Expiration;
 use think\facade\Cache;
+use think\facade\Config;
 use think\facade\Db;
+use think\facade\Env;
 
 //脚本衢州 目前不使用
  //mysql 监听器
@@ -11,20 +16,47 @@ class mysqlToKafuka01
    //1 kafka,2  redis
     private $type;
     private $redis;
+    private $rk;
+    private $topic;
+  
     public function __construct($type,$redis)
     {
        $this->type=$type;
        $this->redis=$redis;
+       
     }
     
 
-    
+
     public function sql_monitor(){
-        $tables=["fjd_zcdj","fjdc_cbc_zcdj"];
+        if (!extension_loaded('rdkafka')){
+            $this->write_log('pushToKafka fail,extension of rdkafka has not installed!!'.PHP_EOL);
+            $this->write_log('请重新安装kafka扩展');
+		    return false;
+		}
+		if (!extension_loaded('redis')){
+						
+            $this->write_log('redis fail,extension of rdkafka has not installed!!'.PHP_EOL);
+            $this->write_log('请重新安装redis扩展');
+			return false;
+		}
+
+
+        $tables=["fjd_zcdj"];
+        $this->write_log("开始同步数据库:fjd_zcdj");
         while(true){
-            foreach($tables as $item){
-                $this->initData($item);
+            try{
+                foreach($tables as $item){
+                    
+                    $this->initData($item);
+                }
+
+            }catch(Exception $e){
+                $this->write_log("同步数据库抛出异常:".$e->getMessage());
+                $this->write_log("程序强行暂停");
+                exit(0);
             }
+           
         }
 
     }
@@ -34,42 +66,82 @@ class mysqlToKafuka01
      * @return void
      */
     public function initData($table){
+        if($this->type==1){
+            $this->init_kafka();
+        }
+
         $time=$this->getUpdateTime($table,1);
-      
-        $list=[];
+
+        $this->write_log("获取数据库同步最新时间".$time);
         if(empty($time)){
-            $time=time();
+            $this->write_log("获取失败,无最新时间,开始初始化将当前所有数据进行同步");
+            $limit=100;
+            $count= Db::name($table)->count();
+            if($count==0){
+                $this->write_log("当前数据库中无数据,开始监听");
+                return;
+            }
+            $pageTotal= ceil($count/$limit);
+            $this->write_log("当前共有数据.$count.条,按照.$limit.条数据为一页,共计.$pageTotal.页");
+            $this->write_log("同步开始");
+            for($i=1;$i<=$pageTotal;$i++){
+                $this->write_log("第.$i.页进行同步.....");
+                $list= Db::name($table)->order("ZHGXSJ","asc")->page($i)->limit($limit)->select()->toArray();
+                
+                $this->setData($table,$list);
+                if($i==$pageTotal){
+                    $time= strtotime($list[count($list)-1]["ZHGXSJ"]);
+                    $this->getUpdateTime($table,2,$time);
+                }
+                $this->write_log("第.$i.页进行同步完成");
+            }
+            $this->write_log("同步全部完成.当前时间:".date("Y-m-d H:i:s",$time));
+            $this->write_log("开始监听最新数据");
             
-            $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s", $time))->order("ZHGXSJ","desc")->select();
-           
         }else{
-            $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s",$time))->order("ZHGXSJ","desc")->select();
+            $this->write_log("开始监听当前时间之后的数据".date("Y-m-d H:i:s",$time));
+            $list= Db::name($table)->where("ZHGXSJ",">",date("Y-m-d H:i:s",$time))->order("ZHGXSJ","asc")->select()->toArray();
+            if(empty($list)){
+                $this->write_log("数据库:.$table"."数据无更新的数据休眠1s");
+                sleep(1);
+                return;
+            }
+            $this->write_log("同步开始");
+            $this->setData($table,$list);
+            $this->write_log("同步结束");
+           
+            $time= strtotime($list[count($list)-1]["ZHGXSJ"]);
+            $this->write_log("存入最新时间:".date("Y-m-d H:i:s",$time));
+            $this->getUpdateTime($table,2,$time);
         }
         
-        if(empty($list)){
-            debug_log("数据库解析","数据库:.$table"."数据无更新的数据休眠1s");
-            sleep(1);
-            return;
-        }
        
-        $this->setData($list,$table);
-        $time=strtotime("Y-m-d H:i:s",$list[count($list)-1]["ZHGXSJ"]);
-        $this->getUpdateTime($table,2,$time);
+ 
+       
     }
-
-    private function setData($list,$table){
+    /**
+     * 存入数据 function
+     *
+     * @param [type] $table
+     * @param [type] $list
+     * @return void
+     */
+    private function setData($table,$list){
 
         foreach($list as $item){
-           
+          
             $data=$this->Operation($table, $item);
-           
+         
+         
             if(!empty($data)){
                 if($this->type==1){
                     //存入kafuka
-                    $this->toKafuka($data);
+                    $this->toKafka($data);
                 }else{
                     //存入redis
+                 
                     $this->toRedis($data);
+                   
                 }
             }
         }
@@ -83,27 +155,29 @@ class mysqlToKafuka01
      * @return void
      */
     public function Operation($table_name,$data){
-      
+       
+
         switch($table_name)
         {
            case "fjd_zcdj":
                 return $this->fjd_zcdj($table_name,$data);  
            case "fjdc_cbc_zcdj":
                 return $this->fjdc_cbc_zcdj($table_name,$data);
-    
+            default:
+                return false;
+
         }
         
     }
     public function fjdc_cbc_zcdj($table_name,$data){
         $json_data=[];
-        $time=strtotime("Y-m-d H:i:s",$data["ZHGXSJ"]);
-        $type= $this->getTableIdType($table_name,$data["FJDCXH"],$time);
+        $time=$data["ZHGXSJ"];
+        $type= $this->getTableIdType($table_name,$data["FJDCXH"],strtotime($time));
     
         if($type=="add"){
             $json_data=[
                 "PLATE_NO"=>$data["HPHM"],
                 "RFID_SN"=>"12345678",
-                "CAR_TYPE"=>$data["CLZL"],
                 "CAR_BRAND"=>$data['ZWPP'],
                 "NAME"=>$data["CZXM"],
                 "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
@@ -118,7 +192,7 @@ class mysqlToKafuka01
                 "PLATE_NO"=>$data["HPHM"],
                 "OLD_NO"=>$data["HPHM"],
                 "RFID_SN"=>"77778888",
-                "CAR_TYPE"=>$data["CLZL"],
+                // "CAR_TYPE"=>$data["CLZL"],
                 "CAR_BRAND"=>$data['ZWPP'],
                 "NAME"=>$data["CZXM"],
                 "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
@@ -137,18 +211,19 @@ class mysqlToKafuka01
         $json_data=[];
         $time=$data["ZHGXSJ"];
         $type= $this->getTableIdType($table_name,$data["FJDCXH"],strtotime($time));
-        var_dump($data);
+        $brand_id=$this->match_brand($data['ZWPP']);
+        $car_type_id=$this->match_car_type($data['CLZL']);
         if($type=="add"){
             $json_data=[
                 "PLATE_NO"=>$data["HPHM"],
-                "RFID_SN"=>"12345678",
-                "CAR_TYPE"=>$data["CLZL"],
-                "CAR_BRAND"=>$data['ZWPP'],
+                "RFID_SN"=>$data["GYHM"],
+                "CAR TYPE"=>$car_type_id,
+                "CAR_BRAND"=>$brand_id,
                 "NAME"=>$data["CZXM"],
                 "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
                 "MOBILE_NUMBER"=>$data['LXDH'],
                 "INSTA_DATE"=>$time,
-                "INSTALLER"=>$data["CLQRR"],
+                "INSTALLER"=>1,
                 "DATA_TYPE"=>"vehicle_save"
             ];
                 
@@ -158,10 +233,9 @@ class mysqlToKafuka01
            
             $json_data=[
                 "PLATE_NO"=>$data["HPHM"],
-                "OLD_NO"=>$data["HPHM"],
-                "RFID_SN"=>"77778888",
-                "CAR_TYPE"=>$data["CLZL"],
-                "CAR_BRAND"=>$data['ZWPP'],
+                "RFID_SN"=>$data["GYHM"],
+                "CAR TYPE"=>$car_type_id,
+                "CAR_BRAND"=>$brand_id,
                 "NAME"=>$data["CZXM"],
                 "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
                 "MOBILE_NUMBER"=>$data['LXDH'],
@@ -170,7 +244,7 @@ class mysqlToKafuka01
            
 
         }
-      
+     
         return $json_data;
 
     }
@@ -191,7 +265,6 @@ class mysqlToKafuka01
         }else{
             return "update";
         }
-      
 
     }
 
@@ -213,7 +286,7 @@ class mysqlToKafuka01
            $this->setHash($hashKey,$table,["time"=>$time]);
            return null;
         }else{
-           $data= $this->getHash($hashKey,$table,$time); 
+           $data= $this->getHash($hashKey,$table); 
            if(empty($data)){
              return false;
            }else{
@@ -241,7 +314,24 @@ class mysqlToKafuka01
          $this->redis->hDel($hashKey,$key);
      }
  
-
+    public function init_kafka(){
+        $this->write_log("初始化kafka");  
+        $conf = new Rdkafka\Conf();
+        $kafka_ip= Env::get('kafka.ip', '');
+        $topic=Env::get('kafka.topic', '');
+        if(empty($kafka_ip)){
+            throw new Exception("kafka的ip不存在");
+        }
+        if(empty($topic)){
+            throw new Exception("kafka的主题不存在");
+        }
+		$conf->set('metadata.broker.list',$kafka_ip);
+        $conf->setErrorCb(function ($kafka, $err, $reason) {
+            throw new Exception("kafka错误: err:.$err.reason:.$reason");
+        });
+        $this->rk= new RdKafka\Producer($conf);
+		$this->topic = $this->rk->newTopic($topic);
+    }
 
     
     /**
@@ -250,33 +340,14 @@ class mysqlToKafuka01
      * @param [type] $data
      * @return void
      */
-    public function toKafuka($data){
-
-        // $conf = new RdKafka\Conf();
-        // $conf->setDrMsgCb(function ($kafka, $message) {
-        //     file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
-        // });
-        // $conf->setErrorCb(function ($kafka, $err, $reason) {
-        //         file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
-        // });
-
-        // $rk = new RdKafka\Producer($conf);
-        // $rk->setLogLevel(LOG_DEBUG);
-        // $rk->addBrokers("127.0.0.1");
-        // $cf = new RdKafka\TopicConf();
-        // $cf->set('request.required.acks', 0);
-        // $topic = $rk->newTopic("test", $cf);
-        // $option = 'qkl';
-        // for ($i = 0; $i < 20; $i++) {
-        //      $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
-        // }
-        // $len = $rk->getOutQLen();
-        //     while ($len > 0) {
-        //     $len = $rk->getOutQLen();
-            
-        //     $rk->poll(50);
-        // }
-
+    public function toKafka($data){
+       
+        
+        $this->topic->produce(RD_KAFKA_PARTITION_UA, 0,json_encode($data));
+		$this->rk->poll(0);			
+		while ($this->rk->getOutQLen() >300) {
+			$this->rk->poll(10);
+		}	
 
     }
       /**
@@ -291,6 +362,81 @@ class mysqlToKafuka01
         $redis->Rpush($key,json_encode($data));
     }
 
+    public function write_log($text){
+
+        DahuaUtil::rlog($text);
+
+    }
+    /**
+     * function 车牌匹配
+     * @param [type] brand  车牌名称
+     * @return void
+     */
+    private function match_brand($brand){
+
+        $brand_list= Config::get("app.CAR_BRAND");
+        $brand_key=99;
+      
+        
+        if(empty($brand_list)){
+
+            throw new Exception("车辆品牌不存在");
+        }
+        if(empty($brand)){
+
+          return  $brand_key;
+        }
+        
+        $bool=false;
+        foreach($brand_list as $key=>$val){
+           if(strlen($val)<=strlen($brand)){
+                $bool= stristr($brand,$val);
+           }else{
+                 $bool= stristr($val,$brand);    
+           } 
+           if($bool){
+                 $brand_key=$key;
+                 break;
+           }
+        }
+        
+
+        return $brand_key;
+
+    }
+
+    private function match_car_type($car_type){
+
+        $type_list= Config::get("app.CAR_TYPE");
+        $type_key=4;
+        if(empty($type_list)){
+
+            throw new Exception("车辆类型列表不存在");
+        }
+        if(empty($car_type)){
+
+            return $type_key;
+        }
+        
+        $bool=false;
+        foreach($type_list as $key=>$val){
+           if(strlen($val)<=strlen($car_type)){
+                $bool= stristr($car_type,$val);
+           }else{
+                 $bool= stristr($val,$car_type);    
+           } 
+           if($bool){
+                $type_key=$key;
+                 break;
+           }
+        }
+       
+
+        return $type_key;
+
+
+    }
+
 
 
 

+ 4 - 2
composer.json

@@ -36,8 +36,10 @@
         "aliyun/aliyun-tablestore-sdk-php": "~5.0",
         "alibabacloud/sts-20150401": "1.0.1",
         "phpmailer/phpmailer": "^6.6",
-        "php-mqtt/client": "^1.8",
-        "nmred/kafka-php": "0.2.*"
+        "php-mqtt/client": "^1.8"
+      
+   
+       
        
     },
     "require-dev": {

+ 26 - 0
config/app.php

@@ -1,4 +1,5 @@
 <?php
+
 // +----------------------------------------------------------------------
 // | 应用设置
 // +----------------------------------------------------------------------
@@ -33,4 +34,29 @@ return [
     'error_message'    => '页面错误!请稍后再试~',
     // 显示错误信息
     'show_error_msg'   => true,
+    "CAR_BRAND"=>array(
+        1=>"绿源",
+        2=>"绿驹",
+        3=>"爱玛",
+        4=>"新日",
+        5=>"雅马哈",
+        6=>"捷安特",
+        7=>"雅迪",
+        8=>"台铃",
+        9=>"阿玛尼",
+        10=>"邦德富士达",
+        11=>"风爵",
+        12=>"星月神",
+        13=>"莫拉克",
+        99=>"其它",
+    ),
+    "CAR_TYPE"=>array(
+         6=>"轻便电动车",
+         1=>"电动车",
+         2=>"三轮车",
+         3=>"轮椅车",
+         5=>"电摩",
+         4=>"其他"
+    ),
+
 ];