Parcourir la source

Merge branch 'master' of http://gogs.renlianiot.com:4000/zmcoding/station-test-api

nana_sen il y a 1 an
Parent
commit
84f38e02fc

+ 5 - 3
.env

@@ -8,7 +8,7 @@ DOMAIN = http://app
 [DATABASE]
 TYPE = mysql
 HOSTNAME = 127.0.0.1
-DATABASE = station
+DATABASE = test
 USERNAME = root
 PASSWORD = root
 HOSTPORT = 3306
@@ -19,11 +19,13 @@ DEBUG = true
 [REDIS]
 HOST = 127.0.0.1
 PORT = 6379
-PASSWORD = '123456'
-SELECT = 3
+PASSWORD = 'R!478gH*%23nPn'
+SELECT = 2
 
 [LANG]
 default_lang = zh-cn
 
 [JWT]
 SECRET=2e06b643e36b27b2899ff4fea0c7ee2f
+[KAFUKA]
+

+ 66 - 103
catch/api/controller/Api.php

@@ -57,22 +57,23 @@ class Api extends CatchController
     public function redis_to_mysql(){
 
         $ues_redis=Cache::store('redis')->handler();
+        $dispose = new dispose($ues_redis); 
         $text=null;
-        
+       
       
         while(1){
             $jsonData= $ues_redis->rpop("mqtt_data");
-            debug_log("InAndOUT",'数据redis'.$jsonData);
-            
             if(empty($jsonData)){
-                debug_log("InAndOUT",'没有数据');
                 sleep(1);
                 continue;
-                
             }
+            debug_log("InAndOUT",'数据redis'.$jsonData);
+           
             $data=json_decode($jsonData,true);
             $time=$data['time'];
             $mac=$data['devId'];
+            //存入基站的最新时间
+            $dispose->setStations($mac,$time);
             $list=[];
             $text=$text.$data['cnt'];
             
@@ -103,26 +104,18 @@ class Api extends CatchController
                         'rssi4' => hexdec($rssi4),
                         'report_time'=>$time
                     ];
-                    
-                  
-                   
-                    // $num=dechex($label);
                      $vs=false;
                      $history_status=false;
-                    //  $DA['num']=$num;
+                
                      $DA['other_time']=date("Y-m-d H:i:s",$time);
+                     debug_log("label_log","cssh: ".json_encode($DA));
 
                      if($DA['rssi1']<=85||$DA['rssi2']<=85||$DA['rssi3']<=85||$DA['rssi4']<=85){
                         $history_status=true;
-                    }else{
-                        debug_log("label_log","cache_del:".json_encode($DA));
-                    }
-                    debug_log("label_log","cssh: ".json_encode($DA));
+                     }
+                   
                     if($DA['rssi1']<=72||$DA['rssi2']<=72||$DA['rssi3']<=72||$DA['rssi4']<=72){
                         $vs=true;
-                    }else{
-                        debug_log("label_log","delete:".json_encode($DA));
-
                     }
                     $DA['rssi1']=$DA['rssi1']==255?100:$DA['rssi1'];
                     $DA['rssi2']=$DA['rssi2']==255?100:$DA['rssi2'];
@@ -139,28 +132,20 @@ class Api extends CatchController
                         debug_log("label_log","cache: ".json_encode($DA));
                         $diff= $DA["rssi1"]-$DA["rssi2"];
                        if(abs($diff)>2){
-                          $dispose = new dispose($ues_redis); 
                           $dispose->set_label_history($DA);
                        }
                        debug_log("label_log","cache_diff: ".$diff);
-                      
                     }
-                    
-
-                    $this->temporary_label($ues_redis,$label,1);
+                
                  
                      if($vs&&(substr($da,8,2)=='01')){
-                     
-                        try{
-                            $dispose = new dispose($ues_redis);
+                      try{
                             $DA= $dispose->check_data($DA);
                             debug_log("label_log","xsj:".json_encode($DA));
                             $dispose->computeResout($DA);
-                            
                         }catch(Exception $e){
-                            debug_log("InAndOUT","抛出异常".json_encode($DA,true));
+                            debug_log("InAndOUT","抛出异常:".$e->getMessage());
                         }
-                        
                      }
                    
                      $text=substr($text,24);
@@ -207,50 +192,37 @@ class Api extends CatchController
                 }
                 foreach($keys as $item){
                     if( $json_data=$redis->hget($key,$item)){
-                        debug_log("clear_label",'基站'.$station);
-                        debug_log("clear_label",'标签'.$item);
-                        debug_log("clear_label",'获取的数据'.$json_data);
+                      
+                        debug_log("clear_label",'基站'.$station.'获取的数据'.'标签'.$item.$json_data);
                         $json_data=json_decode($json_data,true);
                         //列表
                         $a=$json_data['a'];
                         //状态
                         $status=$json_data['status'];
-                        //进出
+                        //是否形成进出考勤
                         $in_and_out=$json_data["in_and_out"];
                        
                         //总数
                         $count= count($a);
-                        if((time()-$json_data["time"])<=7){
+                        $time= $dispose->getNowStationTime($station);
+                        
+                        debug_log("clear_label","标签最新最新时间". date('Y-m-d H:i:s', $time));
+                        if(($time-$json_data["time"])<=7){
+
                             debug_log("clear_label", $item.':'. "标签上报间隔没有超过7秒进行跳过");
                             continue;
-                        }
-                    
+                        } 
                         if($count<5){
-                           
                             if($count>2){
                                 debug_log("clear_label", $item.':'. "标签信号个数等于超过2个,参与计算");
                                 $frist=$a[0]['dirt'];
                                 $end=$a[$count-1]['dirt'];
+
                                 if($frist!=$end){
-                                    $data_array=[];
-                                    $data_array[]=[
-                                        "label"=>$item,
-                                        'time'=>$a[$count-1]['time'],
-                                        'dirt'=> $frist==1?1:2 //1是进 2是出
-                                    ];
-                                    $url_data=[
-                                        "mac"=>$station,
-                                        "data"=>$data_array
-                                    ];
                                     debug_log("clear_label", $item.':'. "标签信号个数等于超过2个,参与计算,得出结果:".$frist);
-                                    //设置key
-                                    $dispose->set_time_results($station,$item,$a[$count-1]['time'],$frist==1?1:2);
-                                    $dispose->getRemoteData($url_data);
-                                    // debug_log("InAndOUT","得出计算结果:".json_encode($arr));
-                                    debug_log("clear_label","发送给远程".json_encode($url_data));
-
+                                    $dispose->network_push($station,$item,$a[$count-1]['time'],$frist==1?1:2);
                                 }else{
-                                    debug_log("clear_label", $item.':'. "标签信号个数没有超过4个,全部去清除");
+                                    debug_log("clear_label", $item.':'. "标签信号个数没有超过2个,全部去清除");
                                 }
 
                             }
@@ -283,32 +255,12 @@ class Api extends CatchController
     
                             if($status['dirt']!=$res){
                                 debug_log("clear_label", $item.':'. "最后一个信号,方向相反,得出结果,清除缓存数据");
-                                $redis->hdel($key,$item);
-                                $data_array=[];
-                                $data_array[]=[
-                                    "label"=>$item,
-                                    'time'=>$a[$count-1]['time'],
-                                    'dirt'=> $status['dirt']==1?1:2 //1是进 2是出
-                                ];
-                                $url_data=[
-                                    "mac"=>$station,
-                                    "data"=>$data_array
-                                ];
-                                //设置key
-                                $dispose->set_time_results($station,$item,$a[$count-1]['time'],$status['dirt']==1?1:2);
-                                // debug_log("InAndOUT","得出计算结果:".json_encode($arr));
-                                debug_log("clear_label","发送给远程".json_encode($url_data));
-                                //远程推送
-                               $dispose->getRemoteData($url_data);
-                               //清除缓存数据
-                                $this->temporary_label($redis,$item,2);
-                             
+
+                                $dispose->network_push($station,$item,$a[$count-1]['time'],$status['dirt']==1?1:2);
+
                             }else{
                                 //上午不做二次计算
                                 if(empty($in_and_out)){
-                                    //二次生成结果
-                                    
-                                    
                                     $res=$dispose->second_create_direction($station,$item);
                                     if(!empty($res)){
                                         $res_time= $dispose->get_time_results($station,$item);
@@ -318,24 +270,26 @@ class Api extends CatchController
                                         //判断如果结果超过时间
                                         if($diff_time>10){
                                             debug_log("clear_label", $item.':'. "二次生成进出");
-                                            $data_array=[];
-                                            $data_array[]=[
-                                                "label"=>$item,
-                                                'time'=>$res['time'],
-                                                'dirt'=>$res['dirt'],
-                                            ];
-                                            $url_data=[
-                                                "mac"=>$station,
-                                                "data"=>$data_array
-                                            ];
-                                            debug_log("clear_label","二次生成发送给远程".json_encode($url_data));
-                                            //远程推送时间
-                                            $dispose->set_time_results($station,$item,$res['time'],$res['dirt']);
-                                            //远程推送
-                                            $dispose->getRemoteData($url_data);
 
+                                            $dispose->network_push($station,$item,$res['time'],$res['dirt']);
                                         }
 
+                                    }else{
+                                        //存入没有生成考勤时的初始防线
+                                        //获取朝前的最大值
+                                        //获取朝后的最大值
+                                        $frontMax=  $dispose->get_label_history(4,$station,$item);
+                                        $backMax=  $dispose->get_label_history(5,$station,$item);
+                                        $records=false;
+                                        if($status['dirt']==1){
+                                            $records=$frontMax<65;
+                                        }
+                                        if($status['dirt']==2){
+                                            $records=$backMax<65;
+                                        }
+                                        if($records){
+                                            $dispose->no_check_data($station,$item,$status['dirt'],$a[$count-1]['time'],1);
+                                        }
                                     }
 
                                 }
@@ -375,7 +329,7 @@ class Api extends CatchController
             $redis=Cache::store('redis')->handler();
             $key="push_data";
            $url="http://localhost:8115/api/accessReport";
-          // $url="http://127.0.0.1:8000/api/accessReport";
+         
            sleep(2);
             while($data=$redis->lPop($key)){
                 if(!empty($data)){
@@ -510,18 +464,27 @@ class Api extends CatchController
         
     }
 
-  public function temporary_label($redis,$label,$type){
-        $key="temporary_label";
-        if($type==1){
-            $res= $redis->hGet($key,$label);
-            if($res==false){
-                $redis->hSet($key,$label,1);
-            }
-        }else{
-            $redis->hSet($key,$label,3);
-        }
 
-  }
+
+
+
+
+
+
+
+
+//   public function temporary_label($redis,$label,$type){
+//         $key="temporary_label";
+//         if($type==1){
+//             $res= $redis->hGet($key,$label);
+//             if($res==false){
+//                 $redis->hSet($key,$label,1);
+//             }
+//         }else{
+//             $redis->hSet($key,$label,3);
+//         }
+
+//   }
 
 
     //消息处理--重点区域

+ 35 - 0
catch/api/controller/sqlLister.php

@@ -0,0 +1,35 @@
+<?php
+namespace catchAdmin\api\controller;
+
+use catchAdmin\api\service\mysqlToKafuka01;
+use catcher\base\CatchController;
+use think\facade\Cache;
+
+class sqlLister extends CatchController
+{
+    /**
+     * 监听者 function
+     *
+     * @return void
+     */
+    public function lister(){
+       $redis=Cache::store('redis')->handler();
+       $sqlTokafuka=  new mysqlToKafuka01(2,$redis);
+       
+       $sqlTokafuka->sql_monitor(); 
+
+    }
+
+
+
+
+
+
+
+
+
+
+
+
+
+}

+ 3 - 1
catch/api/route.php

@@ -5,7 +5,7 @@
  * @Author: likang
  * @Date: 2022-06-09 10:11:32
  * @LastEditors: likang 1186820806@qq.com
- * @LastEditTime: 2023-05-19 15:32:33
+ * @LastEditTime: 2023-06-14 17:17:04
  */
 // +----------------------------------------------------------------------
 // | CatchAdmin [Just Like ~ ]
@@ -41,4 +41,6 @@ $router->get('api/redis_indoor', '\catchAdmin\api\controller\Api@redis_to_mysql_
 
 //更新软件版本测试
 //$router->get('api/detectionVersion', '\catchAdmin\api\controller\Api@detectionVersion');
+$router->get('api/redis_parea', '\catchAdmin\api\controller\Api@redis_to_mysql_parea');
 
+$router->get('sql/lister', '\catchAdmin\api\controller\sqlLister@lister');

+ 123 - 172
catch/api/service/dispose.php

@@ -13,78 +13,11 @@ class dispose
 
     }
     
-    // public function computeData($data){
-    //     debug_log("InAndOUT","基站:".$data['mac'] .' label :'.$data['label']);
-    //     debug_log("InAndOUT","新数据:".json_encode($data));
-    //     $hashKey=$data['mac'].'station';
-    //     $key=$data['label'];
-    //     $list=[];
-    //     $length=0;
-    //     $res= $this->selectHash($hashKey,$key);
-       
-    //     if($res==false){
-    //         debug_log("InAndOUT","redis没有数据,存入新数据");
-    //         array_push($list,$data);
-    //         debug_log("InAndOUT","当前数据队列",json_encode($list));
-            
-    //     }else{
-    //         debug_log("InAndOUT","redis初始数据 :".json_encode($res));
-    //         $list=$res;
-    //         debug_log("InAndOUT","上次上报的数据 :".json_encode($list[count($list)-1],true));
-           
-    //         // if(($data['report_time']-$list[count($list)-1]['report_time'])>=120){
-    //         //     debug_log("InAndOUT","上报次数间隔超过120s,清空redis缓存,从新存入数据");
-    //         //     $list=[];
-    //         //     array_push($list,$data);
-    //         // }else{
-    //             $list=$res;
-    //             array_push($list,$data);
-               
-    //             //按照个数
-    //             while(count($list)>5){
-    //                 array_shift($list);
-    //             }
-
-
-    //        // }  
-
-    //     }
-    //     //数据
-    //     $count=count($list);
-    //     debug_log("InAndOUT","当前数据队列 :".json_encode($list));
-    //     debug_log("InAndOUT","新数据长度 :".$count);
-    //     //现在的数据长度
+    
 
-      
-    //     //获取配置
-    //     $config= $this->selectHash("anbang_four_wire",$data['mac']);
+    public function check_data($data){
         
-
-    //     if(empty($config)){
-    //         debug_log("InAndOUT","没有配置");
-    //     }else{
-    //         debug_log("InAndOUT","配置参数为:".json_encode($config));
-
-    //     }
-    //     $sortlist=  $this->sortlist($list);
        
-        
-    //     $data["rssi1"]=$sortlist[0];
-    //     $data["rssi2"]=$sortlist[1];
-    //     $data["rssi3"]=$sortlist[2];
-    //     $data["rssi4"]=$sortlist[3];
-    //     debug_log("InAndOUT","计算出的平均数据 :".json_encode($data));
-    //     //保存到hash中
-    //     debug_log("InAndOUT","存入redis缓存的数据 :".json_encode($list));
-    //     $this->setHash($hashKey,$key,$list);
-    //     debug_log("InAndOUT","=======================================");
-    //     return $data;
-    // }
-
-
-    public function check_data($data){
-        $this->setStations($data['mac']);
-        $res='';
         $hashKey=$data['mac'].'station';
         $key=$data['label'];
         $old_data=$this->selectHash($hashKey,$key);
@@ -92,14 +25,12 @@ class dispose
             if($old_data["rssi1"]<90){  
                 $data["rssi1"]=$old_data["rssi1"]+8;
                 $data["rssi1"]= $data["rssi1"]>90 ? 90:$data["rssi1"];
-                $res=$res."rssi1 ";
             }
         }
         if($data["rssi2"]==90&&!empty($old_data)){
             if($old_data["rssi2"]<90){
                 $data["rssi2"]=$old_data["rssi2"]+8;
                 $data["rssi2"]= $data["rssi2"]>90 ? 90:$data["rssi2"];
-                $res=$res."rssi2";
             }
         }
         $this->setHash($hashKey,$key,$data);
@@ -107,23 +38,12 @@ class dispose
     }
 
 
-
-
-
-
-
-
-
-
-
-
-
     //计算结果设计
     public function computeResout($data){
   
-        debug_log("InAndOUT","=======================================");
+       
         debug_log("InAndOUT",'接收新数据'.json_encode($data));   
-        debug_log("InAndOUT","=======================================");    
+       
        $res=0;
        //进出
     
@@ -220,8 +140,9 @@ class dispose
                 $status['dirt']= $res;
                 $status['time']= $resArray['time'];
                 debug_log("InAndOUT","初始化状态为:".json_encode($status));
-            }else{
 
+            }else{
+                $this->no_check_data($data['mac'],$data['label'],$status['dirt'],$time,2);
                 //判断是否连贯
                $IS_OK=true;
                $res=0;
@@ -232,36 +153,15 @@ class dispose
                       $IS_OK=false;
                    } 
                }
-            //    if(($reverse[0]['dirt']==$reverse[1]['dirt'])&&($reverse[1]['dirt']==$reverse[2]['dirt'])){
-            //       $res=$reverse[0]['dirt'];
-            //       $IS_OK=true;
-            //    }
-
+            
                 //进行保存
                 if($res!=$status['dirt']&&$IS_OK){
-                  
-                    // if(!empty($status['status']['dirt'])){
-
-                    //     if($resArray['time']-$status['status']['time']>5){
 
-                            $data_array=[];
-                            $data_array[]=[
-                                "label"=>$data['label'],
-                                'time'=>$resArray['time'],
-                                'dirt'=> $status['dirt']==1?1:2 //1是进 2是出
-                            ];
                             $inAndOut=$status['dirt']==1?1:2;
-                            $url_data=[
-                                "mac"=>$data['mac'],
-                                "data"=>$data_array
-                            ];
-                            //设定最后结果
-                            $this->set_time_results($data['mac'],$data['label'],$resArray['time'],$inAndOut);
-                            //推送远程数据
-                            $this->getRemoteData($url_data);
-                            //清除缓存数据
+                           //网络推送
+                            $this->network_push($data['mac'],$data['label'],$resArray['time'],$inAndOut);
+                            //清除历史记录
                             $this->get_label_history(3,$data['mac'],$data['label']);
-                            $this->temporary_label($this->redis,$data['label'],2);
                           $status['dirt']=$res;
                           $status['time']=$time;
 
@@ -286,6 +186,67 @@ class dispose
 
     }
 
+    /**
+     * 用于记录 没有生成考勤的数据信息
+     * @param $mac    基站
+     * @param $lable  标签
+     * @param $dir  1 外面  2 里面
+     * @param $type  1 存入 2获取,3 清除
+     * @param $time  时间戳
+     * @return void
+     */
+    public function no_check_data($mac,$lable,$dir=0,$time=null,$type){
+        $HASH_KEY=$mac."_no_check";
+        $KEY=$lable;
+        if($type==1){
+            debug_log("没有生成考勤",$mac." ".$lable."初始方向:$dir.时间.$time");
+            $frontMax=  $this->get_label_history(4,$mac,$lable);
+            $backMax=  $this->get_label_history(5,$mac,$lable);
+            $records=false;
+            debug_log("没有生成考勤",$mac." ".$lable."初始方向:$dir.时间.$time.前面最小值:$frontMax.里面最小值.$backMax");
+            if($dir==1){
+                $records=$frontMax<65;
+            }
+            if($dir==2){
+                $records=$backMax<65;
+            }
+            if($records){
+                debug_log("没有生成考勤",$mac." ".$lable."初始方向:$dir.时间.$time.存入redis");
+                $this->setHash( $HASH_KEY,$KEY,["time"=>$time,"dir"=>$dir]);
+            }
+
+        }
+        if($type==2){
+            $data= $this->selectHash($HASH_KEY,$KEY);
+            if(empty($data)){
+                return false;
+            }else{
+                debug_log("没有生成考勤","$mac.=======.$lable.新初始方向确定.$dir.原来方向确定.".$data["dir"] );
+
+                if($data["dir"]!=$dir){
+                    if($dir==1){
+                        $frontMax=  $this->get_label_history(4,$mac,$lable);
+                        if($frontMax<65){
+                            debug_log("没有生成考勤","$mac.=======.$lable.新初始方向确定.$dir.原来方向确定.".$data["dir"]."生成进出 2");
+                            $this->network_push($mac,$lable,$data["time"],2);
+                        }
+
+                    }else{
+                        $backMax= $this->get_label_history(5,$mac,$lable);
+                        if($backMax<65){
+                            debug_log("没有生成考勤","$mac.=======.$lable.新初始方向确定.$dir.原来方向确定.".$data["dir"]."生成进出 1");
+                            $this->network_push($mac,$lable,$data["time"],1);
+                        }
+
+                    }
+                }
+            }
+        }
+        if($type==3){
+            $this->delHash($HASH_KEY,$KEY);
+        }
+
+    }
 
 
     //获取所有基站
@@ -294,55 +255,24 @@ class dispose
         $list= $this->redis->hKeys($key);
         return $list;
     }
+    //获取基站最新时间
+    public function getNowStationTime($mac){
+        $key=$this->stations;
+        $time= $this->selectHash($key,$mac);
+        if(empty($time))
+        {
+            return time();
+        }
+        return $time;
+    }
     //存入基站号
-    public function setStations($Stations){
+    public function setStations($Stations,$time){
         $key=$this->stations;
-        if(!$this->redis->hExists($key,$Stations)){
-            $this->redis->hSet($key,$Stations,0);
-        }  
+        $this->redis->hSet($key,$Stations,$time);
     }
 
 
-    //排序
-    // public function sortlist($list){
-    //     $list01=[];
-    //     $list02=[];
-    //     $list03=[];
-    //     $list04=[];
-    //     debug_log("InAndOUT","排序的数据:".json_encode($list));
-    //     foreach($list as $item){
-    //        array_push($list01,$item['rssi1']);
-    //        array_push($list02,$item['rssi2']);
-    //        array_push($list03,$item['rssi3']);
-    //        array_push($list04,$item['rssi4']);
-    //     }
-
-    //     if(count($list)>=3){
-    //         sort($list01);
-    //         sort($list02);
-    //         sort($list03);
-    //         sort($list04);
-    //         array_pop($list01);
-    //         array_pop($list02);
-    //         array_pop($list03);
-    //         array_pop($list04);
-    //         array_shift($list01);
-    //         array_shift($list02);
-    //         array_shift($list03);
-    //         array_shift($list04);
-    //     }
-    //     debug_log("InAndOUT","排序后的队列:".json_encode($list01));
-    //     debug_log("InAndOUT","排序后的队列:".json_encode($list02));
-    //     debug_log("InAndOUT","排序后的队列:".json_encode($list03));
-    //     debug_log("InAndOUT","排序后的队列:".json_encode($list04));
-    //     $rssi1= array_sum($list01)/count($list01);
-    //     $rssi2= array_sum($list02)/count($list02);
-    //     $rssi3= array_sum($list03)/count($list03);
-    //     $rssi4= array_sum($list04)/count($list04);
-    //     $res=[$rssi1,$rssi2,$rssi3,$rssi4];
-    //     debug_log("InAndOUT","结算后的结果:".json_encode($res));
-    //     return $res;
-    // }
+
 /**
  * 获取远程推送的数据
  *
@@ -455,10 +385,45 @@ public function get_label_history($type,$mac,$label){
         return $backMax;
     }
 
+}
+
+    /**
+     * @param $mac
+     * @param $label
+     * @param $time
+     * @param $dir  1 进 ,2 出
+     * @return void
+     */
+    public function  network_push($mac,$label,$time,$dir)
+    {
+        $data_array=[];
+        $data_array[]=[
+            "label"=>$label,
+            'time'=>$time,
+            'dirt'=>$dir,
+        ];
+        $url_data=[
+            "mac"=>$mac,
+            "data"=>$data_array
+        ];
+        debug_log("clear_label","二次生成发送给远程".json_encode($url_data));
+        //远程推送时间
+        $this->set_time_results($mac,$label,$time,$dir);
+        $this->no_check_data($mac,$label,null,null,3);
+        //远程推送
+        $this->getRemoteData($url_data);
+
+    }
+
+
+
+
+
+
+
 
 
 
-}
 /**
  * 二次生成进出
  *
@@ -556,9 +521,6 @@ public function second_create_direction($mac,$label){
         debug_log("second_dirt","数据为空,进行生成");
     }
    
-    
-
-
    debug_log("second_dirt","最终结果".json_encode(["dirt"=>$res,"time"=>$time]));
     return ["dirt"=>$res,"time"=>$time];
 
@@ -580,7 +542,9 @@ public function get_station_config($mac){
         "init_data"=>3, //需要三条数据确定初始方向
         "change_data"=>5, //需要5条数据确定变换的方向
         "timeout"=>6,
-        "second_create_res"=>[]//二次根据历史记录生成配置 数组中的参数配置  start_time,end_time,dir 1 前 2后;
+        "second_create_res"=>[
+
+        ]//二次根据历史记录生成配置 数组中的参数配置  start_time,end_time,dir 1 前 2后;
      ];
      $config_data=$this->selectHash($Key,$mackey);
      if(!empty($config_data)){
@@ -657,20 +621,7 @@ public function get_station_config($mac){
 
 
 
-    public function temporary_label($redis,$label,$type){
 
-        return;
-        $key="temporary_label";
-        if($type==1){
-            $res= $redis->hGet($key,$label);
-            if($res==false){
-                $redis->hSet($key,$label,1);
-            }
-        }else{
-            $redis->hSet($key,$label,2);
-        }
-
-    }
     /**
      * 解析日志
      */

+ 19 - 39
catch/api/service/mysqlToKafuka.php

@@ -1,9 +1,10 @@
 <?php
 namespace catchAdmin\api\service;
 
+use think\facade\Cache;
 use think\facade\Db;
 
-//脚本衢州 从mysql中存到kafka/redis中
+//脚本衢州 目前不使用
  //mysql 监听器
 class mysqlToKafuka
 {
@@ -17,17 +18,25 @@ class mysqlToKafuka
         
     }
 
-
-
     
     public function sql_monitor(){
         $table="test";
-        $list=Db::name($table)->where("status",1)->select();
+        $list=Db::name($table)->where("status",1)->order("id","asc")->select();
         foreach($list as $key=>$item){
             $table_name=$item["table_name"];
             $type=$item["type"];
             $table_id=$item["table_id"];
-            $this->Operation($table_name,$table_id,$type);
+            $data=$this->Operation($table_name,$table_id,$type);
+            if(!empty($data)){
+                if($this->type==1){
+                    //存入kafuka
+                    $this->toKafuka($data);
+                }else{
+                    //存入redis
+                    $this->toRedis($data);
+                }
+            }
+            Db::name($table)->where("id",$item["id"])->delete();
         }
 
     }
@@ -54,13 +63,13 @@ class mysqlToKafuka
     public function fjd_zcdj($table_name,$table_id,$type){
         $data=Db::name($table_name)->where("id",$table_id)->find();
         $json_data=false;
-
         if(empty($data)){
             return false;
         }
+        
         if($type=="add"){
             $json_data=[
-                "PLATE_NO"=>"测试AAAAAA",
+                "PLATE_NO"=>$data["HPHM"],
                 "RFID_SN"=>"12345678",
                 "CAR_TYPE"=>"1",
                 "CAR_BRAND"=>"1",
@@ -108,32 +117,6 @@ class mysqlToKafuka
      */
     public function toKafuka($data){
 
-<<<<<<< HEAD
-        // $conf = new RdKafka\Conf();
-        // $conf->setDrMsgCb(function ($kafka, $message) {
-        //     file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
-        // });
-        // $conf->setErrorCb(function ($kafka, $err, $reason) {
-        //         file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
-        // });
-
-        // $rk = new RdKafka\Producer($conf);
-        // $rk->setLogLevel(LOG_DEBUG);
-        // $rk->addBrokers("127.0.0.1");
-        // $cf = new RdKafka\TopicConf();
-        // $cf->set('request.required.acks', 0);
-        // $topic = $rk->newTopic("test", $cf);
-        // $option = 'qkl';
-        // for ($i = 0; $i < 20; $i++) {
-        //      $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
-        // }
-        // $len = $rk->getOutQLen();
-        //     while ($len > 0) {
-        //     $len = $rk->getOutQLen();
-            
-        //     $rk->poll(50);
-        // }
-=======
         $conf = new RdKafka\Conf();
         $conf->setDrMsgCb(function ($kafka, $message) {
             file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
@@ -158,11 +141,6 @@ class mysqlToKafuka
             
             $rk->poll(50);
         }
->>>>>>> d836e7393f81fe9e3002d4b1e17a4169fd485999
-
-        
-
-
 
 
     }
@@ -173,7 +151,9 @@ class mysqlToKafuka
      * @return void
      */
     public function toRedis($data){
-
+        $key="tutorial-list";
+        $redis=Cache::store('redis')->handler();
+        $redis->Rpush($key,json_encode($data));
     }
 
 

+ 311 - 0
catch/api/service/mysqlToKafuka01.php

@@ -0,0 +1,311 @@
+<?php
+namespace catchAdmin\api\service;
+
+use think\facade\Cache;
+use think\facade\Db;
+
+//脚本衢州 目前不使用
+ //mysql 监听器
+class mysqlToKafuka01
+{
+   //1 kafka,2  redis
+    private $type;
+    private $redis;
+    public function __construct($type,$redis)
+    {
+       $this->type=$type;
+       $this->redis=$redis;
+    }
+    
+
+    
+    public function sql_monitor(){
+        $tables=["fjd_zcdj","fjdc_cbc_zcdj"];
+        while(true){
+            foreach($tables as $item){
+                $this->initData($item);
+            }
+        }
+
+    }
+    /**
+     * 初始化数据
+     *
+     * @return void
+     */
+    public function initData($table){
+        $time=$this->getUpdateTime($table,1);
+      
+        $list=[];
+        if(empty($time)){
+            $time=time();
+            
+            $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s", $time))->order("ZHGXSJ","desc")->select();
+           
+        }else{
+            $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s",$time))->order("ZHGXSJ","desc")->select();
+        }
+        
+        if(empty($list)){
+            debug_log("数据库解析","数据库:.$table"."数据无更新的数据休眠1s");
+            sleep(1);
+            return;
+        }
+       
+        $this->setData($list,$table);
+        $time=strtotime("Y-m-d H:i:s",$list[count($list)-1]["ZHGXSJ"]);
+        $this->getUpdateTime($table,2,$time);
+    }
+
+    private function setData($list,$table){
+
+        foreach($list as $item){
+           
+            $data=$this->Operation($table, $item);
+           
+            if(!empty($data)){
+                if($this->type==1){
+                    //存入kafuka
+                    $this->toKafuka($data);
+                }else{
+                    //存入redis
+                    $this->toRedis($data);
+                }
+            }
+        }
+    }
+    /**
+     * 业务处理
+     *
+     * @param [type] $table_name
+     * @param [type] $table_id
+     * @param [type] $type
+     * @return void
+     */
+    public function Operation($table_name,$data){
+      
+        switch($table_name)
+        {
+           case "fjd_zcdj":
+                return $this->fjd_zcdj($table_name,$data);  
+           case "fjdc_cbc_zcdj":
+                return $this->fjdc_cbc_zcdj($table_name,$data);
+    
+        }
+        
+    }
+    public function fjdc_cbc_zcdj($table_name,$data){
+        $json_data=[];
+        $time=strtotime("Y-m-d H:i:s",$data["ZHGXSJ"]);
+        $type= $this->getTableIdType($table_name,$data["FJDCXH"],$time);
+    
+        if($type=="add"){
+            $json_data=[
+                "PLATE_NO"=>$data["HPHM"],
+                "RFID_SN"=>"12345678",
+                "CAR_TYPE"=>$data["CLZL"],
+                "CAR_BRAND"=>$data['ZWPP'],
+                "NAME"=>$data["CZXM"],
+                "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
+                "MOBILE_NUMBER"=>$data['LXDH'],
+                "INSTA_DATE"=>$time,
+                "INSTALLER"=>$data["CZY"],
+                "DATA_TYPE"=>"vehicle_save"
+            ];
+        }
+        if($type=="update"){
+            $json_data=[
+                "PLATE_NO"=>$data["HPHM"],
+                "OLD_NO"=>$data["HPHM"],
+                "RFID_SN"=>"77778888",
+                "CAR_TYPE"=>$data["CLZL"],
+                "CAR_BRAND"=>$data['ZWPP'],
+                "NAME"=>$data["CZXM"],
+                "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
+                "MOBILE_NUMBER"=>$data['LXDH'],
+                "DATA_TYPE"=>"vehicle_update"
+            ];
+            
+        }
+        return $json_data;
+
+
+    }
+
+    public function fjd_zcdj($table_name,$data){
+       
+        $json_data=[];
+        $time=$data["ZHGXSJ"];
+        $type= $this->getTableIdType($table_name,$data["FJDCXH"],strtotime($time));
+        var_dump($data);
+        if($type=="add"){
+            $json_data=[
+                "PLATE_NO"=>$data["HPHM"],
+                "RFID_SN"=>"12345678",
+                "CAR_TYPE"=>$data["CLZL"],
+                "CAR_BRAND"=>$data['ZWPP'],
+                "NAME"=>$data["CZXM"],
+                "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
+                "MOBILE_NUMBER"=>$data['LXDH'],
+                "INSTA_DATE"=>$time,
+                "INSTALLER"=>$data["CLQRR"],
+                "DATA_TYPE"=>"vehicle_save"
+            ];
+                
+
+        }
+        if($type=="update"){
+           
+            $json_data=[
+                "PLATE_NO"=>$data["HPHM"],
+                "OLD_NO"=>$data["HPHM"],
+                "RFID_SN"=>"77778888",
+                "CAR_TYPE"=>$data["CLZL"],
+                "CAR_BRAND"=>$data['ZWPP'],
+                "NAME"=>$data["CZXM"],
+                "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
+                "MOBILE_NUMBER"=>$data['LXDH'],
+                "DATA_TYPE"=>"vehicle_update"
+            ];
+           
+
+        }
+      
+        return $json_data;
+
+    }
+    /**
+     * 获取redis缓存的库 id,updateTime时间戳
+     *
+     * @return void
+     */
+    private function getTableIdType($table,$id,$time){
+        $HashKey=$table."_redis";
+        $key=$id;
+        $data=["time"=>$time];
+        $old_data= $this->getHash($HashKey,$key);
+        $this->setHash($HashKey,$key,$data);
+         
+        if(empty($old_data)){
+            return "add";
+        }else{
+            return "update";
+        }
+      
+
+    }
+
+
+
+
+
+    /**
+     * function  获取 更新时间
+     *
+     * @param [type] $table 数据库表
+     * @param [type] $type 类型 1 查询 2 存储
+     * @param [type] $time 时间
+     * @return void
+     */
+    private function getUpdateTime($table,$type,$time=null){
+        $hashKey="car_update_time";
+        if($type==2){
+           $this->setHash($hashKey,$table,["time"=>$time]);
+           return null;
+        }else{
+           $data= $this->getHash($hashKey,$table,$time); 
+           if(empty($data)){
+             return false;
+           }else{
+             return $data["time"];
+           }
+        }
+        
+    }
+
+    public function getHash($hashKey,$key,$time=null){
+       
+        $res= $this->redis->hGet($hashKey,$key);
+        if(empty($res)){
+            return false;
+        }else{
+            return json_decode($res,true);
+        }
+
+    }
+    public function setHash($hashKey,$key,$data){
+        $this->redis->hSet($hashKey,$key,json_encode($data));
+       
+     } 
+     public function delHash($hashKey,$key){
+         $this->redis->hDel($hashKey,$key);
+     }
+ 
+
+
+    
+    /**
+     * 存入kafuka
+     *
+     * @param [type] $data
+     * @return void
+     */
+    public function toKafuka($data){
+
+        // $conf = new RdKafka\Conf();
+        // $conf->setDrMsgCb(function ($kafka, $message) {
+        //     file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
+        // });
+        // $conf->setErrorCb(function ($kafka, $err, $reason) {
+        //         file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
+        // });
+
+        // $rk = new RdKafka\Producer($conf);
+        // $rk->setLogLevel(LOG_DEBUG);
+        // $rk->addBrokers("127.0.0.1");
+        // $cf = new RdKafka\TopicConf();
+        // $cf->set('request.required.acks', 0);
+        // $topic = $rk->newTopic("test", $cf);
+        // $option = 'qkl';
+        // for ($i = 0; $i < 20; $i++) {
+        //      $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
+        // }
+        // $len = $rk->getOutQLen();
+        //     while ($len > 0) {
+        //     $len = $rk->getOutQLen();
+            
+        //     $rk->poll(50);
+        // }
+
+
+    }
+      /**
+     * 存入redis
+     *
+     * @param [type] $data
+     * @return void
+     */
+    public function toRedis($data){
+        $key="tutorial-list";
+        $redis=Cache::store('redis')->handler();
+        $redis->Rpush($key,json_encode($data));
+    }
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+}

+ 1 - 1
task_script/MN_RL4RSSI_MQTT_CLIENT.php

@@ -7,7 +7,7 @@ date_default_timezone_set("PRC");
 define('HOST', '127.0.0.1');
 define('PORT', '6379');
 define('PASSWORD', '123456');
-define('DATABASE', 3);
+define('DATABASE', 4);
 
 
 function app_redis()