likang hai 1 ano
pai
achega
41c5ab8383
Modificáronse 2 ficheiros con 346 adicións e 0 borrados
  1. 35 0
      catch/api/controller/sqlLister.php
  2. 311 0
      catch/api/service/mysqlToKafuka01.php

+ 35 - 0
catch/api/controller/sqlLister.php

@@ -0,0 +1,35 @@
+<?php
+namespace catchAdmin\api\controller;
+
+use catchAdmin\api\service\mysqlToKafuka01;
+use catcher\base\CatchController;
+use think\facade\Cache;
+
+class sqlLister extends CatchController
+{
+    /**
+     * 监听者 function
+     *
+     * @return void
+     */
+    public function lister(){
+       $redis=Cache::store('redis')->handler();
+       $sqlTokafuka=  new mysqlToKafuka01(2,$redis);
+       
+       $sqlTokafuka->sql_monitor(); 
+
+    }
+
+
+
+
+
+
+
+
+
+
+
+
+
+}

+ 311 - 0
catch/api/service/mysqlToKafuka01.php

@@ -0,0 +1,311 @@
+<?php
+namespace catchAdmin\api\service;
+
+use think\facade\Cache;
+use think\facade\Db;
+
+//脚本衢州 目前不使用
+ //mysql 监听器
+class mysqlToKafuka01
+{
+   //1 kafka,2  redis
+    private $type;
+    private $redis;
+    public function __construct($type,$redis)
+    {
+       $this->type=$type;
+       $this->redis=$redis;
+    }
+    
+
+    
+    public function sql_monitor(){
+        $tables=["fjd_zcdj","fjdc_cbc_zcdj"];
+        while(true){
+            foreach($tables as $item){
+                $this->initData($item);
+            }
+        }
+
+    }
+    /**
+     * 初始化数据
+     *
+     * @return void
+     */
+    public function initData($table){
+        $time=$this->getUpdateTime($table,1);
+      
+        $list=[];
+        if(empty($time)){
+            $time=time();
+            
+            $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s", $time))->order("ZHGXSJ","desc")->select();
+           
+        }else{
+            $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s",$time))->order("ZHGXSJ","desc")->select();
+        }
+        
+        if(empty($list)){
+            debug_log("数据库解析","数据库:.$table"."数据无更新的数据休眠1s");
+            sleep(1);
+            return;
+        }
+       
+        $this->setData($list,$table);
+        $time=strtotime("Y-m-d H:i:s",$list[count($list)-1]["ZHGXSJ"]);
+        $this->getUpdateTime($table,2,$time);
+    }
+
+    private function setData($list,$table){
+
+        foreach($list as $item){
+           
+            $data=$this->Operation($table, $item);
+           
+            if(!empty($data)){
+                if($this->type==1){
+                    //存入kafuka
+                    $this->toKafuka($data);
+                }else{
+                    //存入redis
+                    $this->toRedis($data);
+                }
+            }
+        }
+    }
+    /**
+     * 业务处理
+     *
+     * @param [type] $table_name
+     * @param [type] $table_id
+     * @param [type] $type
+     * @return void
+     */
+    public function Operation($table_name,$data){
+      
+        switch($table_name)
+        {
+           case "fjd_zcdj":
+                return $this->fjd_zcdj($table_name,$data);  
+           case "fjdc_cbc_zcdj":
+                return $this->fjdc_cbc_zcdj($table_name,$data);
+    
+        }
+        
+    }
+    public function fjdc_cbc_zcdj($table_name,$data){
+        $json_data=[];
+        $time=strtotime("Y-m-d H:i:s",$data["ZHGXSJ"]);
+        $type= $this->getTableIdType($table_name,$data["FJDCXH"],$time);
+    
+        if($type=="add"){
+            $json_data=[
+                "PLATE_NO"=>$data["HPHM"],
+                "RFID_SN"=>"12345678",
+                "CAR_TYPE"=>$data["CLZL"],
+                "CAR_BRAND"=>$data['ZWPP'],
+                "NAME"=>$data["CZXM"],
+                "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
+                "MOBILE_NUMBER"=>$data['LXDH'],
+                "INSTA_DATE"=>$time,
+                "INSTALLER"=>$data["CZY"],
+                "DATA_TYPE"=>"vehicle_save"
+            ];
+        }
+        if($type=="update"){
+            $json_data=[
+                "PLATE_NO"=>$data["HPHM"],
+                "OLD_NO"=>$data["HPHM"],
+                "RFID_SN"=>"77778888",
+                "CAR_TYPE"=>$data["CLZL"],
+                "CAR_BRAND"=>$data['ZWPP'],
+                "NAME"=>$data["CZXM"],
+                "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
+                "MOBILE_NUMBER"=>$data['LXDH'],
+                "DATA_TYPE"=>"vehicle_update"
+            ];
+            
+        }
+        return $json_data;
+
+
+    }
+
+    public function fjd_zcdj($table_name,$data){
+       
+        $json_data=[];
+        $time=$data["ZHGXSJ"];
+        $type= $this->getTableIdType($table_name,$data["FJDCXH"],strtotime($time));
+        var_dump($data);
+        if($type=="add"){
+            $json_data=[
+                "PLATE_NO"=>$data["HPHM"],
+                "RFID_SN"=>"12345678",
+                "CAR_TYPE"=>$data["CLZL"],
+                "CAR_BRAND"=>$data['ZWPP'],
+                "NAME"=>$data["CZXM"],
+                "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
+                "MOBILE_NUMBER"=>$data['LXDH'],
+                "INSTA_DATE"=>$time,
+                "INSTALLER"=>$data["CLQRR"],
+                "DATA_TYPE"=>"vehicle_save"
+            ];
+                
+
+        }
+        if($type=="update"){
+           
+            $json_data=[
+                "PLATE_NO"=>$data["HPHM"],
+                "OLD_NO"=>$data["HPHM"],
+                "RFID_SN"=>"77778888",
+                "CAR_TYPE"=>$data["CLZL"],
+                "CAR_BRAND"=>$data['ZWPP'],
+                "NAME"=>$data["CZXM"],
+                "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
+                "MOBILE_NUMBER"=>$data['LXDH'],
+                "DATA_TYPE"=>"vehicle_update"
+            ];
+           
+
+        }
+      
+        return $json_data;
+
+    }
+    /**
+     * 获取redis缓存的库 id,updateTime时间戳
+     *
+     * @return void
+     */
+    private function getTableIdType($table,$id,$time){
+        $HashKey=$table."_redis";
+        $key=$id;
+        $data=["time"=>$time];
+        $old_data= $this->getHash($HashKey,$key);
+        $this->setHash($HashKey,$key,$data);
+         
+        if(empty($old_data)){
+            return "add";
+        }else{
+            return "update";
+        }
+      
+
+    }
+
+
+
+
+
+    /**
+     * function  获取 更新时间
+     *
+     * @param [type] $table 数据库表
+     * @param [type] $type 类型 1 查询 2 存储
+     * @param [type] $time 时间
+     * @return void
+     */
+    private function getUpdateTime($table,$type,$time=null){
+        $hashKey="car_update_time";
+        if($type==2){
+           $this->setHash($hashKey,$table,["time"=>$time]);
+           return null;
+        }else{
+           $data= $this->getHash($hashKey,$table,$time); 
+           if(empty($data)){
+             return false;
+           }else{
+             return $data["time"];
+           }
+        }
+        
+    }
+
+    public function getHash($hashKey,$key,$time=null){
+       
+        $res= $this->redis->hGet($hashKey,$key);
+        if(empty($res)){
+            return false;
+        }else{
+            return json_decode($res,true);
+        }
+
+    }
+    public function setHash($hashKey,$key,$data){
+        $this->redis->hSet($hashKey,$key,json_encode($data));
+       
+     } 
+     public function delHash($hashKey,$key){
+         $this->redis->hDel($hashKey,$key);
+     }
+ 
+
+
+    
+    /**
+     * 存入kafuka
+     *
+     * @param [type] $data
+     * @return void
+     */
+    public function toKafuka($data){
+
+        // $conf = new RdKafka\Conf();
+        // $conf->setDrMsgCb(function ($kafka, $message) {
+        //     file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
+        // });
+        // $conf->setErrorCb(function ($kafka, $err, $reason) {
+        //         file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
+        // });
+
+        // $rk = new RdKafka\Producer($conf);
+        // $rk->setLogLevel(LOG_DEBUG);
+        // $rk->addBrokers("127.0.0.1");
+        // $cf = new RdKafka\TopicConf();
+        // $cf->set('request.required.acks', 0);
+        // $topic = $rk->newTopic("test", $cf);
+        // $option = 'qkl';
+        // for ($i = 0; $i < 20; $i++) {
+        //      $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
+        // }
+        // $len = $rk->getOutQLen();
+        //     while ($len > 0) {
+        //     $len = $rk->getOutQLen();
+            
+        //     $rk->poll(50);
+        // }
+
+
+    }
+      /**
+     * 存入redis
+     *
+     * @param [type] $data
+     * @return void
+     */
+    public function toRedis($data){
+        $key="tutorial-list";
+        $redis=Cache::store('redis')->handler();
+        $redis->Rpush($key,json_encode($data));
+    }
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+}