type=$type; $this->redis=$redis; } public function sql_monitor(){ if (!extension_loaded('rdkafka')){ $this->write_log('pushToKafka fail,extension of rdkafka has not installed!!'.PHP_EOL); $this->write_log('请重新安装kafka扩展'); return false; } if (!extension_loaded('redis')){ $this->write_log('redis fail,extension of rdkafka has not installed!!'.PHP_EOL); $this->write_log('请重新安装redis扩展'); return false; } $tables=["fjd_zcdj"]; $this->write_log("开始同步数据库:fjd_zcdj"); while(true){ try{ foreach($tables as $item){ $this->initData($item); } }catch(Exception $e){ $this->write_log("同步数据库抛出异常:".$e->getMessage()); $this->write_log("程序强行暂停"); exit(0); } } } /** * 初始化数据 * * @return void */ public function initData($table){ if($this->type==1){ $this->init_kafka(); } $time=$this->getUpdateTime($table,1); $this->write_log("获取数据库同步最新时间".$time); if(empty($time)){ $this->write_log("获取失败,无最新时间,开始初始化将当前所有数据进行同步"); $limit=100; $count= Db::name($table)->count(); if($count==0){ $this->write_log("当前数据库中无数据,开始监听"); return; } $pageTotal= ceil($count/$limit); $this->write_log("当前共有数据.$count.条,按照.$limit.条数据为一页,共计.$pageTotal.页"); $this->write_log("同步开始"); for($i=1;$i<=$pageTotal;$i++){ $this->write_log("第.$i.页进行同步....."); $list= Db::name($table)->order("ZHGXSJ","asc")->page($i)->limit($limit)->select()->toArray(); $this->setData($table,$list); if($i==$pageTotal){ $time= strtotime($list[count($list)-1]["ZHGXSJ"]); $this->getUpdateTime($table,2,$time); } $this->write_log("第.$i.页进行同步完成"); } $this->write_log("同步全部完成.当前时间:".date("Y-m-d H:i:s",$time)); $this->write_log("开始监听最新数据"); }else{ $this->write_log("开始监听当前时间之后的数据".date("Y-m-d H:i:s",$time)); $list= Db::name($table)->where("ZHGXSJ",">",date("Y-m-d H:i:s",$time))->order("ZHGXSJ","asc")->select()->toArray(); if(empty($list)){ $this->write_log("数据库:.$table"."数据无更新的数据休眠1s"); sleep(1); return; } $this->write_log("同步开始"); $this->setData($table,$list); $this->write_log("同步结束"); $time= strtotime($list[count($list)-1]["ZHGXSJ"]); $this->write_log("存入最新时间:".date("Y-m-d H:i:s",$time)); $this->getUpdateTime($table,2,$time); } } /** * 存入数据 function * * @param [type] $table * @param [type] $list * @return void */ private function setData($table,$list){ foreach($list as $item){ $data=$this->Operation($table, $item); if(!empty($data)){ if($this->type==1){ //存入kafuka $this->toKafka($data); }else{ //存入redis $this->toRedis($data); } } } } /** * 业务处理 * * @param [type] $table_name * @param [type] $table_id * @param [type] $type * @return void */ public function Operation($table_name,$data){ switch($table_name) { case "fjd_zcdj": return $this->fjd_zcdj($table_name,$data); case "fjdc_cbc_zcdj": return $this->fjdc_cbc_zcdj($table_name,$data); default: return false; } } public function fjdc_cbc_zcdj($table_name,$data){ $json_data=[]; $time=$data["ZHGXSJ"]; $type= $this->getTableIdType($table_name,$data["FJDCXH"],strtotime($time)); if($type=="add"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "RFID_SN"=>"12345678", "CAR_BRAND"=>$data['ZWPP'], "NAME"=>$data["CZXM"], "ID_CARD_NUMBER"=>$data['CZSFZMHM'], "MOBILE_NUMBER"=>$data['LXDH'], "INSTA_DATE"=>$time, "INSTALLER"=>$data["CZY"], "DATA_TYPE"=>"vehicle_save" ]; } if($type=="update"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "OLD_NO"=>$data["HPHM"], "RFID_SN"=>"77778888", // "CAR_TYPE"=>$data["CLZL"], "CAR_BRAND"=>$data['ZWPP'], "NAME"=>$data["CZXM"], "ID_CARD_NUMBER"=>$data['CZSFZMHM'], "MOBILE_NUMBER"=>$data['LXDH'], "DATA_TYPE"=>"vehicle_update" ]; } return $json_data; } public function fjd_zcdj($table_name,$data){ $json_data=[]; $time=$data["ZHGXSJ"]; $type= $this->getTableIdType($table_name,$data["FJDCXH"],strtotime($time)); $brand_id=$this->match_brand($data['ZWPP']); $car_type_id=$this->match_car_type($data['CLZL']); if($type=="add"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "RFID_SN"=>$data["GYHM"], "CAR TYPE"=>$car_type_id, "CAR_BRAND"=>$brand_id, "NAME"=>$data["CZXM"], "ID_CARD_NUMBER"=>$data['CZSFZMHM'], "MOBILE_NUMBER"=>$data['LXDH'], "INSTA_DATE"=>$time, "INSTALLER"=>1, "DATA_TYPE"=>"vehicle_save" ]; } if($type=="update"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "RFID_SN"=>$data["GYHM"], "CAR TYPE"=>$car_type_id, "CAR_BRAND"=>$brand_id, "NAME"=>$data["CZXM"], "ID_CARD_NUMBER"=>$data['CZSFZMHM'], "MOBILE_NUMBER"=>$data['LXDH'], "DATA_TYPE"=>"vehicle_update" ]; } return $json_data; } /** * 获取redis缓存的库 id,updateTime时间戳 * * @return void */ private function getTableIdType($table,$id,$time){ $HashKey=$table."_redis"; $key=$id; $data=["time"=>$time]; $old_data= $this->getHash($HashKey,$key); $this->setHash($HashKey,$key,$data); if(empty($old_data)){ return "add"; }else{ return "update"; } } /** * function 获取 更新时间 * * @param [type] $table 数据库表 * @param [type] $type 类型 1 查询 2 存储 * @param [type] $time 时间 * @return void */ private function getUpdateTime($table,$type,$time=null){ $hashKey="car_update_time"; if($type==2){ $this->setHash($hashKey,$table,["time"=>$time]); return null; }else{ $data= $this->getHash($hashKey,$table); if(empty($data)){ return false; }else{ return $data["time"]; } } } public function getHash($hashKey,$key,$time=null){ $res= $this->redis->hGet($hashKey,$key); if(empty($res)){ return false; }else{ return json_decode($res,true); } } public function setHash($hashKey,$key,$data){ $this->redis->hSet($hashKey,$key,json_encode($data)); } public function delHash($hashKey,$key){ $this->redis->hDel($hashKey,$key); } public function init_kafka(){ $this->write_log("初始化kafka"); $conf = new Rdkafka\Conf(); $kafka_ip= Env::get('kafka.ip', ''); $topic=Env::get('kafka.topic', ''); if(empty($kafka_ip)){ throw new Exception("kafka的ip不存在"); } if(empty($topic)){ throw new Exception("kafka的主题不存在"); } $conf->set('metadata.broker.list',$kafka_ip); $conf->setErrorCb(function ($kafka, $err, $reason) { throw new Exception("kafka错误: err:.$err.reason:.$reason"); }); $this->rk= new RdKafka\Producer($conf); $this->topic = $this->rk->newTopic($topic); } /** * 存入kafuka * * @param [type] $data * @return void */ public function toKafka($data){ $this->topic->produce(RD_KAFKA_PARTITION_UA, 0,json_encode($data)); $this->rk->poll(0); while ($this->rk->getOutQLen() >300) { $this->rk->poll(10); } } /** * 存入redis * * @param [type] $data * @return void */ public function toRedis($data){ $key="tutorial-list"; $redis=Cache::store('redis')->handler(); $redis->Rpush($key,json_encode($data)); } public function write_log($text){ DahuaUtil::rlog($text); } /** * function 车牌匹配 * @param [type] brand 车牌名称 * @return void */ private function match_brand($brand){ $brand_list= Config::get("app.CAR_BRAND"); $brand_key=99; if(empty($brand_list)){ throw new Exception("车辆品牌不存在"); } if(empty($brand)){ return $brand_key; } $bool=false; foreach($brand_list as $key=>$val){ if(strlen($val)<=strlen($brand)){ $bool= stristr($brand,$val); }else{ $bool= stristr($val,$brand); } if($bool){ $brand_key=$key; break; } } return $brand_key; } private function match_car_type($car_type){ $type_list= Config::get("app.CAR_TYPE"); $type_key=4; if(empty($type_list)){ throw new Exception("车辆类型列表不存在"); } if(empty($car_type)){ return $type_key; } $bool=false; foreach($type_list as $key=>$val){ if(strlen($val)<=strlen($car_type)){ $bool= stristr($car_type,$val); }else{ $bool= stristr($val,$car_type); } if($bool){ $type_key=$key; break; } } return $type_key; } }