type=$type; } public function sql_monitor(){ $table="test"; $list=Db::name($table)->where("status",1)->order("id","asc")->select(); foreach($list as $key=>$item){ $table_name=$item["table_name"]; $type=$item["type"]; $table_id=$item["table_id"]; $data=$this->Operation($table_name,$table_id,$type); if(!empty($data)){ if($this->type==1){ //存入kafuka $this->toKafuka($data); }else{ //存入redis $this->toRedis($data); } } Db::name($table)->where("id",$item["id"])->delete(); } } /** * 业务处理 * * @param [type] $table_name * @param [type] $table_id * @param [type] $type * @return void */ public function Operation($table_name,$table_id,$type){ switch($table_name) { case "fjd_zcdj": return $this->fjd_zcdj($table_name,$table_id,$type); } } public function fjd_zcdj($table_name,$table_id,$type){ $data=Db::name($table_name)->where("id",$table_id)->find(); $json_data=false; if(empty($data)){ return false; } if($type=="add"){ $json_data=[ "PLATE_NO"=>$data["HPHM"], "RFID_SN"=>"12345678", "CAR_TYPE"=>"1", "CAR_BRAND"=>"1", "NAME"=>"测试A", "ID_CARD_NUMBER"=>"330127199301171835", "MOBILE_NUMBER"=>"15706857065", "INSTA_DATE"=>"2023-05-24 09:13:16", "INSTALLER"=>"超级管理员", "DATA_TYPE"=>"vehicle_save" ]; } if($type=="update"){ $json_data=[ "PLATE_NO"=>"测试AAAAAA", "OLD_NO"=>"测试AAAAAA", "RFID_SN"=>"77778888", "CAR_TYPE"=>"1", "CAR_BRAND"=>"1", "NAME"=>"测试A", "ID_CARD_NUMBER"=>"330127199301171835", "MOBILE_NUMBER"=>"15706857065", "DATA_TYPE"=>"vehicle_update" ]; } if($type=="delete"){ $json_data=[ "PLATE_NO"=>"测试AAAAAA", "DATA_TYPE"=>"vehicle_delete" ]; } return $json_data; } /** * 存入kafuka * * @param [type] $data * @return void */ public function toKafuka($data){ $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); $rk = new RdKafka\Producer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $cf = new RdKafka\TopicConf(); $cf->set('request.required.acks', 0); $topic = $rk->newTopic("test", $cf); $option = 'qkl'; for ($i = 0; $i < 20; $i++) { $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option); } $len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); $rk->poll(50); } } /** * 存入redis * * @param [type] $data * @return void */ public function toRedis($data){ $key="tutorial-list"; $redis=Cache::store('redis')->handler(); $redis->Rpush($key,json_encode($data)); } }