123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- <?php
- namespace catchAdmin\api\service;
- include "./DahuaUtil.php";
- ini_set('memory_limit', '1024M');
- use Exception;
- use thans\jwt\claim\Expiration;
- use think\facade\Cache;
- use think\facade\Config;
- use think\facade\Db;
- use think\facade\Env;
/**
 * MySQL listener: polls tables for rows whose ZHGXSJ column has advanced and
 * forwards them, as JSON messages, to Kafka or to a Redis list.
 *
 * Original author's note: script for the Quzhou deployment, currently not in use.
 *
 * Bookkeeping lives in Redis hashes:
 *  - "car_update_time"  : per-table timestamp of the last forwarded row;
 *  - "<table>_redis"    : per-row marker used to tell "add" from "update".
 */
class mysqlToKafuka01
{
    /** Transport selector: 1 = Kafka, anything else = Redis. */
    private $type;

    /** Redis client used for bookkeeping; hGet/hSet/hDel are called on it. */
    private $redis;

    /** Kafka producer, created by init_kafka(). */
    private $rk;

    /** Kafka topic handle, created by init_kafka(). */
    private $topic;

    /**
     * @param mixed $type  1 to publish to Kafka, 2 to publish to Redis
     * @param mixed $redis Redis client exposing hGet/hSet/hDel
     */
    public function __construct($type, $redis)
    {
        $this->type = $type;
        $this->redis = $redis;
    }

    /**
     * Run the synchronisation loop forever.
     *
     * Returns false immediately when a required PHP extension is missing.
     * Any exception raised while syncing is logged and terminates the process.
     *
     * @return false|void
     */
    public function sql_monitor()
    {
        // Kafka is only touched when it is the selected transport, so the
        // rdkafka extension is only mandatory in that case.
        if ($this->type == 1 && !extension_loaded('rdkafka')) {
            $this->write_log('pushToKafka fail,extension of rdkafka has not installed!!' . PHP_EOL);
            $this->write_log('请重新安装kafka扩展');
            return false;
        }
        // Redis is always needed: the sync bookkeeping is stored there.
        if (!extension_loaded('redis')) {
            $this->write_log('redis fail,extension of redis has not installed!!' . PHP_EOL);
            $this->write_log('请重新安装redis扩展');
            return false;
        }

        $tables = ["fjd_zcdj"];
        $this->write_log("开始同步数据库:fjd_zcdj");
        while (true) {
            try {
                foreach ($tables as $table) {
                    $this->initData($table);
                }
            } catch (Exception $e) {
                $this->write_log("同步数据库抛出异常:" . $e->getMessage());
                $this->write_log("程序强行暂停");
                exit(0);
            }
        }
    }

    /**
     * Perform one synchronisation pass for a table.
     *
     * Without a stored checkpoint the whole table is forwarded page by page;
     * otherwise only rows with ZHGXSJ later than the checkpoint are forwarded.
     *
     * @param string $table table name (without prefix, as passed to Db::name)
     * @return void
     */
    public function initData($table)
    {
        // This method runs on every loop iteration, so create the producer
        // once and reuse it instead of opening a new one each pass.
        if ($this->type == 1 && ($this->rk === null || $this->topic === null)) {
            $this->init_kafka();
        }

        $time = $this->getUpdateTime($table, 1);
        $this->write_log("获取数据库同步最新时间" . $time);

        if (empty($time)) {
            $this->syncAllRows($table);
        } else {
            $this->syncRowsAfter($table, $time);
        }
    }

    /**
     * Initial load: forward every row of the table in pages of 100, ordered
     * by ZHGXSJ, then store the timestamp of the last forwarded row.
     *
     * @param string $table
     * @return void
     */
    private function syncAllRows($table)
    {
        $this->write_log("获取失败,无最新时间,开始初始化将当前所有数据进行同步");
        $limit = 100;
        $count = Db::name($table)->count();
        if ($count == 0) {
            $this->write_log("当前数据库中无数据,开始监听");
            return;
        }

        $pageTotal = ceil($count / $limit);
        $this->write_log("当前共有数据.$count.条,按照.$limit.条数据为一页,共计.$pageTotal.页");
        $this->write_log("同步开始");

        $lastRow = null;
        for ($i = 1; $i <= $pageTotal; $i++) {
            $this->write_log("第.$i.页进行同步.....");
            $list = Db::name($table)->order("ZHGXSJ", "asc")->page($i)->limit($limit)->select()->toArray();

            $this->setData($table, $list);
            // A page can come back empty if rows vanished after the count;
            // remember the last row actually seen rather than indexing blindly.
            if (!empty($list)) {
                $lastRow = $list[count($list) - 1];
            }
            $this->write_log("第.$i.页进行同步完成");
        }

        if ($lastRow === null) {
            // Nothing was forwarded, so there is no checkpoint to store.
            $this->write_log("当前数据库中无数据,开始监听");
            return;
        }

        $time = strtotime($lastRow["ZHGXSJ"]);
        $this->getUpdateTime($table, 2, $time);
        $this->write_log("同步全部完成.当前时间:" . date("Y-m-d H:i:s", $time));
        $this->write_log("开始监听最新数据");
    }

    /**
     * Incremental pass: forward rows whose ZHGXSJ is strictly later than the
     * checkpoint and advance the checkpoint; sleeps one second when idle.
     *
     * @param string $table
     * @param int    $time  checkpoint as a Unix timestamp
     * @return void
     */
    private function syncRowsAfter($table, $time)
    {
        $since = date("Y-m-d H:i:s", $time);
        $this->write_log("开始监听当前时间之后的数据" . $since);

        $list = Db::name($table)->where("ZHGXSJ", ">", $since)->order("ZHGXSJ", "asc")->select()->toArray();
        if (empty($list)) {
            $this->write_log("数据库:.$table" . "数据无更新的数据休眠1s");
            sleep(1);
            return;
        }

        $this->write_log("同步开始");
        $this->setData($table, $list);
        $this->write_log("同步结束");

        $time = strtotime($list[count($list) - 1]["ZHGXSJ"]);
        $this->write_log("存入最新时间:" . date("Y-m-d H:i:s", $time));
        $this->getUpdateTime($table, 2, $time);
    }

    /**
     * Convert each row to its outgoing message and publish it through the
     * selected transport. Rows that yield an empty message are skipped.
     *
     * @param string $table
     * @param array  $list  rows as associative arrays
     * @return void
     */
    private function setData($table, $list)
    {
        foreach ($list as $row) {
            $message = $this->Operation($table, $row);
            if (empty($message)) {
                continue;
            }

            if ($this->type == 1) {
                // publish to Kafka
                $this->toKafka($message);
            } else {
                // publish to Redis
                $this->toRedis($message);
            }
        }
    }

    /**
     * Dispatch a row to the message builder for its table.
     *
     * @param string $table_name
     * @param array  $data       one table row
     * @return array|false message payload, or false for an unsupported table
     */
    public function Operation($table_name, $data)
    {
        switch ($table_name) {
            case "fjd_zcdj":
                return $this->fjd_zcdj($table_name, $data);
            case "fjdc_cbc_zcdj":
                return $this->fjdc_cbc_zcdj($table_name, $data);
            default:
                return false;
        }
    }

    /**
     * Build the message for a row of table fjdc_cbc_zcdj.
     *
     * NOTE(review): RFID_SN is a hard-coded placeholder in both branches —
     * confirm whether a real column should be used.
     *
     * @param string $table_name
     * @param array  $data       row containing ZHGXSJ, FJDCXH, HPHM, ZWPP,
     *                           CZXM, CZSFZMHM, LXDH (and CZY for new rows)
     * @return array "vehicle_save" or "vehicle_update" payload
     */
    public function fjdc_cbc_zcdj($table_name, $data)
    {
        $time = $data["ZHGXSJ"];
        $type = $this->getTableIdType($table_name, $data["FJDCXH"], strtotime($time));

        if ($type == "add") {
            return [
                "PLATE_NO" => $data["HPHM"],
                "RFID_SN" => "12345678",
                "CAR_BRAND" => $data['ZWPP'],
                "NAME" => $data["CZXM"],
                "ID_CARD_NUMBER" => $data['CZSFZMHM'],
                "MOBILE_NUMBER" => $data['LXDH'],
                "INSTA_DATE" => $time,
                "INSTALLER" => $data["CZY"],
                "DATA_TYPE" => "vehicle_save"
            ];
        }
        if ($type == "update") {
            return [
                "PLATE_NO" => $data["HPHM"],
                "OLD_NO" => $data["HPHM"],
                "RFID_SN" => "77778888",
                "CAR_BRAND" => $data['ZWPP'],
                "NAME" => $data["CZXM"],
                "ID_CARD_NUMBER" => $data['CZSFZMHM'],
                "MOBILE_NUMBER" => $data['LXDH'],
                "DATA_TYPE" => "vehicle_update"
            ];
        }

        return [];
    }

    /**
     * Build the message for a row of table fjd_zcdj. Brand and vehicle type
     * are mapped to configured ids via match_brand()/match_car_type().
     *
     * NOTE(review): the payload key "CAR TYPE" contains a space while every
     * other key uses an underscore; it is kept as-is because consumers may
     * depend on it — confirm whether "CAR_TYPE" was intended.
     *
     * @param string $table_name
     * @param array  $data       row containing ZHGXSJ, FJDCXH, HPHM, GYHM,
     *                           CLZL, ZWPP, CZXM, CZSFZMHM, LXDH
     * @return array "vehicle_save" or "vehicle_update" payload
     * @throws Exception when the brand or vehicle-type configuration is empty
     */
    public function fjd_zcdj($table_name, $data)
    {
        $time = $data["ZHGXSJ"];
        $type = $this->getTableIdType($table_name, $data["FJDCXH"], strtotime($time));
        $brand_id = $this->match_brand($data['ZWPP']);
        $car_type_id = $this->match_car_type($data['CLZL']);

        if ($type == "add") {
            return [
                "PLATE_NO" => $data["HPHM"],
                "RFID_SN" => $data["GYHM"],
                "CAR TYPE" => $car_type_id,
                "CAR_BRAND" => $brand_id,
                "NAME" => $data["CZXM"],
                "ID_CARD_NUMBER" => $data['CZSFZMHM'],
                "MOBILE_NUMBER" => $data['LXDH'],
                "INSTA_DATE" => $time,
                "INSTALLER" => 1,
                "DATA_TYPE" => "vehicle_save"
            ];
        }
        if ($type == "update") {
            return [
                "PLATE_NO" => $data["HPHM"],
                "RFID_SN" => $data["GYHM"],
                "CAR TYPE" => $car_type_id,
                "CAR_BRAND" => $brand_id,
                "NAME" => $data["CZXM"],
                "ID_CARD_NUMBER" => $data['CZSFZMHM'],
                "MOBILE_NUMBER" => $data['LXDH'],
                "DATA_TYPE" => "vehicle_update"
            ];
        }

        return [];
    }

    /**
     * Classify a row as new or already seen, and record it as seen.
     *
     * The marker is written to the "<table>_redis" hash before returning, so
     * the first call for an id yields "add" and later calls yield "update".
     *
     * @param string $table
     * @param mixed  $id    row identifier used as the hash field
     * @param mixed  $time  timestamp stored with the marker
     * @return string "add" or "update"
     */
    private function getTableIdType($table, $id, $time)
    {
        $hashKey = $table . "_redis";
        $previous = $this->getHash($hashKey, $id);
        $this->setHash($hashKey, $id, ["time" => $time]);

        return empty($previous) ? "add" : "update";
    }

    /**
     * Read or store the per-table sync checkpoint in hash "car_update_time".
     *
     * @param string $table table name, used as the hash field
     * @param int    $type  1 = read, 2 = store
     * @param mixed  $time  timestamp to store (only used when $type is 2)
     * @return mixed stored timestamp, false when none exists, null after a store
     */
    private function getUpdateTime($table, $type, $time = null)
    {
        $hashKey = "car_update_time";
        if ($type == 2) {
            $this->setHash($hashKey, $table, ["time" => $time]);
            return null;
        }

        $stored = $this->getHash($hashKey, $table);
        return empty($stored) ? false : $stored["time"];
    }

    /**
     * Fetch a hash field and decode it from JSON.
     *
     * @param string $hashKey
     * @param mixed  $key     hash field
     * @param mixed  $time    unused; kept for signature compatibility
     * @return mixed decoded value (arrays for JSON objects), or false when the field is empty
     */
    public function getHash($hashKey, $key, $time = null)
    {
        $raw = $this->redis->hGet($hashKey, $key);
        if (empty($raw)) {
            return false;
        }
        return json_decode($raw, true);
    }

    /**
     * Store a value in a hash field as JSON.
     *
     * @param string $hashKey
     * @param mixed  $key     hash field
     * @param mixed  $data    value to encode
     * @return void
     */
    public function setHash($hashKey, $key, $data)
    {
        $this->redis->hSet($hashKey, $key, json_encode($data));
    }

    /**
     * Delete a hash field.
     *
     * @param string $hashKey
     * @param mixed  $key     hash field
     * @return void
     */
    public function delHash($hashKey, $key)
    {
        $this->redis->hDel($hashKey, $key);
    }

    /**
     * Create the Kafka producer and topic from env settings "kafka.ip" and
     * "kafka.topic".
     *
     * @return void
     * @throws Exception when either setting is empty
     */
    public function init_kafka()
    {
        $this->write_log("初始化kafka");
        $kafka_ip = Env::get('kafka.ip', '');
        $topic = Env::get('kafka.topic', '');
        if (empty($kafka_ip)) {
            throw new Exception("kafka的ip不存在");
        }
        if (empty($topic)) {
            throw new Exception("kafka的主题不存在");
        }

        // Fully qualified: this file declares a namespace, so an unqualified
        // RdKafka\Conf would be looked up relative to that namespace.
        $conf = new \RdKafka\Conf();
        $conf->set('metadata.broker.list', $kafka_ip);
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            throw new Exception("kafka错误: err:.$err.reason:.$reason");
        });

        // Assign both properties only after both objects exist, so a failure
        // part-way through does not leave a producer without a topic.
        $producer = new \RdKafka\Producer($conf);
        $topicHandle = $producer->newTopic($topic);
        $this->rk = $producer;
        $this->topic = $topicHandle;
    }

    /**
     * Publish a message to Kafka as JSON.
     *
     * After producing, polls until the producer's outbound queue holds at
     * most 300 messages.
     *
     * @param mixed $data payload to encode
     * @return void
     * @throws Exception when the producer has to be created and the Kafka settings are missing
     */
    public function toKafka($data)
    {
        // Allow direct calls that did not go through initData().
        if ($this->rk === null || $this->topic === null) {
            $this->init_kafka();
        }

        $this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($data));
        $this->rk->poll(0);
        while ($this->rk->getOutQLen() > 300) {
            $this->rk->poll(10);
        }
    }

    /**
     * Append a message, as JSON, to the Redis list "tutorial-list" using the
     * framework's "redis" cache store handler.
     *
     * @param mixed $data payload to encode
     * @return void
     */
    public function toRedis($data)
    {
        $key = "tutorial-list";
        $redis = Cache::store('redis')->handler();
        $redis->Rpush($key, json_encode($data));
    }

    /**
     * Write a line to the application log via DahuaUtil::rlog.
     *
     * @param string $text
     * @return void
     */
    public function write_log($text)
    {
        DahuaUtil::rlog($text);
    }

    /**
     * Map a brand name to a key of config "app.CAR_BRAND".
     *
     * @param mixed $brand brand name from the row
     * @return mixed matching config key, or 99 when the name is empty or unmatched
     * @throws Exception when the brand configuration is empty
     */
    private function match_brand($brand)
    {
        $brand_list = Config::get("app.CAR_BRAND");
        if (empty($brand_list)) {
            throw new Exception("车辆品牌不存在");
        }
        if (empty($brand)) {
            return 99;
        }
        return $this->findMatchingKey($brand_list, $brand, 99);
    }

    /**
     * Map a vehicle-type name to a key of config "app.CAR_TYPE".
     *
     * @param mixed $car_type type name from the row
     * @return mixed matching config key, or 4 when the name is empty or unmatched
     * @throws Exception when the vehicle-type configuration is empty
     */
    private function match_car_type($car_type)
    {
        $type_list = Config::get("app.CAR_TYPE");
        if (empty($type_list)) {
            throw new Exception("车辆类型列表不存在");
        }
        if (empty($car_type)) {
            return 4;
        }
        return $this->findMatchingKey($type_list, $car_type, 4);
    }

    /**
     * Return the key of the first candidate label that contains, or is
     * contained in, the given name (ASCII case-insensitive).
     *
     * @param array $candidates key => label
     * @param mixed $name       name to look up
     * @param mixed $default    value returned when nothing matches
     * @return mixed
     */
    private function findMatchingKey($candidates, $name, $default)
    {
        $name = (string) $name;
        foreach ($candidates as $key => $label) {
            // An empty or non-scalar label can never be a meaningful match.
            if (!is_scalar($label) || (string) $label === '') {
                continue;
            }
            $label = (string) $label;

            // Search for the shorter string inside the longer one.
            if (strlen($label) <= strlen($name)) {
                $position = stripos($name, $label);
            } else {
                $position = stripos($label, $name);
            }

            // Compare against false explicitly: testing the matched substring
            // for truthiness would reject a match whose tail is "0".
            if ($position !== false) {
                return $key;
            }
        }

        return $default;
    }
}
|