type=$type; $this->redis=$redis; } public function sql_monitor(){ $tables=["fjd_zcdj","fjdc_cbc_zcdj"]; while(true){ foreach($tables as $item){ $this->initData($item); } } } /** * 初始化数据 * * @return void */ public function initData($table){ $time=$this->getUpdateTime($table,1); $list=[]; if(empty($time)){ $time=time(); $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s", $time))->order("ZHGXSJ","desc")->select(); }else{ $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s",$time))->order("ZHGXSJ","desc")->select(); } if(empty($list)){ debug_log("数据库解析","数据库:.$table"."数据无更新的数据休眠1s"); sleep(1); return; } $this->setData($list,$table); $time=strtotime("Y-m-d H:i:s",$list[count($list)-1]["ZHGXSJ"]); $this->getUpdateTime($table,2,$time); } private function setData($list,$table){ foreach($list as $item){ $data=$this->Operation($table, $item); if(!empty($data)){ if($this->type==1){ //存入kafuka $this->toKafuka($data); }else{ //存入redis $this->toRedis($data); } } } } /** * 业务处理 * * @param [type] $table_name * @param [type] $table_id * @param [type] $type * @return void */ public function Operation($table_name,$data){ switch($table_name) { case "fjd_zcdj": return $this->fjd_zcdj($table_name,$data); case "fjdc_cbc_zcdj": return $this->fjdc_cbc_zcdj($table_name,$data); } } public function fjdc_cbc_zcdj($table_name,$data){ $json_data=[]; $time=strtotime("Y-m-d H:i:s",$data["ZHGXSJ"]); $type= $this->getTableIdType($table_name,$data["FJDCXH"],$time); if($type=="add"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "RFID_SN"=>"12345678", "CAR_TYPE"=>$data["CLZL"], "CAR_BRAND"=>$data['ZWPP'], "NAME"=>$data["CZXM"], "ID_CARD_NUMBER"=>$data['CZSFZMHM'], "MOBILE_NUMBER"=>$data['LXDH'], "INSTA_DATE"=>$time, "INSTALLER"=>$data["CZY"], "DATA_TYPE"=>"vehicle_save" ]; } if($type=="update"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "OLD_NO"=>$data["HPHM"], "RFID_SN"=>"77778888", "CAR_TYPE"=>$data["CLZL"], "CAR_BRAND"=>$data['ZWPP'], "NAME"=>$data["CZXM"], "ID_CARD_NUMBER"=>$data['CZSFZMHM'], "MOBILE_NUMBER"=>$data['LXDH'], "DATA_TYPE"=>"vehicle_update" ]; } return $json_data; } public function fjd_zcdj($table_name,$data){ $json_data=[]; $time=$data["ZHGXSJ"]; $type= $this->getTableIdType($table_name,$data["FJDCXH"],strtotime($time)); var_dump($data); if($type=="add"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "RFID_SN"=>"12345678", "CAR_TYPE"=>$data["CLZL"], "CAR_BRAND"=>$data['ZWPP'], "NAME"=>$data["CZXM"], "ID_CARD_NUMBER"=>$data['CZSFZMHM'], "MOBILE_NUMBER"=>$data['LXDH'], "INSTA_DATE"=>$time, "INSTALLER"=>$data["CLQRR"], "DATA_TYPE"=>"vehicle_save" ]; } if($type=="update"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "OLD_NO"=>$data["HPHM"], "RFID_SN"=>"77778888", "CAR_TYPE"=>$data["CLZL"], "CAR_BRAND"=>$data['ZWPP'], "NAME"=>$data["CZXM"], "ID_CARD_NUMBER"=>$data['CZSFZMHM'], "MOBILE_NUMBER"=>$data['LXDH'], "DATA_TYPE"=>"vehicle_update" ]; } return $json_data; } /** * 获取redis缓存的库 id,updateTime时间戳 * * @return void */ private function getTableIdType($table,$id,$time){ $HashKey=$table."_redis"; $key=$id; $data=["time"=>$time]; $old_data= $this->getHash($HashKey,$key); $this->setHash($HashKey,$key,$data); if(empty($old_data)){ return "add"; }else{ return "update"; } } /** * function 获取 更新时间 * * @param [type] $table 数据库表 * @param [type] $type 类型 1 查询 2 存储 * @param [type] $time 时间 * @return void */ private function getUpdateTime($table,$type,$time=null){ $hashKey="car_update_time"; if($type==2){ $this->setHash($hashKey,$table,["time"=>$time]); return null; }else{ $data= $this->getHash($hashKey,$table,$time); if(empty($data)){ return false; }else{ return $data["time"]; } } } public function getHash($hashKey,$key,$time=null){ $res= $this->redis->hGet($hashKey,$key); if(empty($res)){ return false; }else{ return json_decode($res,true); } } public function setHash($hashKey,$key,$data){ $this->redis->hSet($hashKey,$key,json_encode($data)); } public function delHash($hashKey,$key){ $this->redis->hDel($hashKey,$key); } /** * 存入kafuka * * @param [type] $data * @return void */ public function toKafuka($data){ // $conf = new RdKafka\Conf(); // $conf->setDrMsgCb(function ($kafka, $message) { // file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); // }); // $conf->setErrorCb(function ($kafka, $err, $reason) { // file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); // }); // $rk = new RdKafka\Producer($conf); // $rk->setLogLevel(LOG_DEBUG); // $rk->addBrokers("127.0.0.1"); // $cf = new RdKafka\TopicConf(); // $cf->set('request.required.acks', 0); // $topic = $rk->newTopic("test", $cf); // $option = 'qkl'; // for ($i = 0; $i < 20; $i++) { // $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option); // } // $len = $rk->getOutQLen(); // while ($len > 0) { // $len = $rk->getOutQLen(); // $rk->poll(50); // } } /** * 存入redis * * @param [type] $data * @return void */ public function toRedis($data){ $key="tutorial-list"; $redis=Cache::store('redis')->handler(); $redis->Rpush($key,json_encode($data)); } }