123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- <?php
- namespace catchAdmin\api\service;
- use think\facade\Cache;
- use think\facade\Db;
/**
 * MySQL change listener.
 *
 * Drains a queue table of pending change records, builds an outbound payload
 * for each record and forwards it to either Kafka or Redis.
 *
 * Original author's note: Quzhou script, currently not in use.
 */
class mysqlToKafuka
{
    /**
     * Delivery target selector: 1 = Kafka, 2 = Redis.
     * Any value that is not loosely equal to 1 is routed to Redis.
     *
     * @var mixed
     */
    private $type;

    /**
     * @param mixed $type Delivery target selector (1 = Kafka, otherwise Redis).
     */
    public function __construct($type)
    {
        $this->type = $type;
    }

    /**
     * Processes every pending row of the queue table.
     *
     * Rows of table "test" with status = 1 are read in ascending id order.
     * For each row a payload is built via Operation(); a non-empty payload is
     * sent to Kafka or Redis depending on the configured target.
     *
     * @return void
     */
    public function sql_monitor()
    {
        $table = "test";
        $list = Db::name($table)->where("status", 1)->order("id", "asc")->select();

        foreach ($list as $item) {
            $data = $this->Operation($item["table_name"], $item["table_id"], $item["type"]);

            if (!empty($data)) {
                if ($this->type == 1) {
                    // Store in Kafka.
                    $this->toKafuka($data);
                } else {
                    // Store in Redis.
                    $this->toRedis($data);
                }
            }

            // The queue row is removed whether or not a payload was produced,
            // so records for unsupported tables or missing rows are dropped.
            Db::name($table)->where("id", $item["id"])->delete();
        }
    }

    /**
     * Dispatches a change record to the handler for its source table.
     *
     * @param mixed $table_name Name of the table the change happened in.
     * @param mixed $table_id   Primary key of the changed row.
     * @param mixed $type       Change kind, passed through to the handler.
     * @return array|false|null The handler's result, or null when no handler
     *                          exists for $table_name.
     */
    public function Operation($table_name, $table_id, $type)
    {
        switch ($table_name) {
            case "fjd_zcdj":
                return $this->fjd_zcdj($table_name, $table_id, $type);
            default:
                // No handler registered for this table.
                return null;
        }
    }

    /**
     * Builds the outbound payload for a change to a "fjd_zcdj" row.
     *
     * NOTE(review): apart from PLATE_NO in the "add" case (taken from the
     * row's HPHM column), every field below is a hard-coded placeholder.
     * Confirm the real column mapping before this is used in production.
     *
     * @param mixed $table_name Table to load the row from.
     * @param mixed $table_id   Primary key of the row.
     * @param mixed $type       "add", "update" or "delete".
     * @return array|false Payload array, or false when the row does not exist
     *                     or $type is not one of the three supported values.
     */
    public function fjd_zcdj($table_name, $table_id, $type)
    {
        $data = Db::name($table_name)->where("id", $table_id)->find();
        if (empty($data)) {
            return false;
        }

        // Strict comparison keeps the three cases mutually exclusive and
        // stops a non-string $type from matching by loose type juggling.
        if ($type === "add") {
            return [
                "PLATE_NO" => $data["HPHM"],
                "RFID_SN" => "12345678",
                "CAR_TYPE" => "1",
                "CAR_BRAND" => "1",
                "NAME" => "测试A",
                "ID_CARD_NUMBER" => "330127199301171835",
                "MOBILE_NUMBER" => "15706857065",
                "INSTA_DATE" => "2023-05-24 09:13:16",
                "INSTALLER" => "超级管理员",
                "DATA_TYPE" => "vehicle_save"
            ];
        }

        if ($type === "update") {
            return [
                "PLATE_NO" => "测试AAAAAA",
                "OLD_NO" => "测试AAAAAA",
                "RFID_SN" => "77778888",
                "CAR_TYPE" => "1",
                "CAR_BRAND" => "1",
                "NAME" => "测试A",
                "ID_CARD_NUMBER" => "330127199301171835",
                "MOBILE_NUMBER" => "15706857065",
                "DATA_TYPE" => "vehicle_update"
            ];
        }

        if ($type === "delete") {
            return [
                "PLATE_NO" => "测试AAAAAA",
                "DATA_TYPE" => "vehicle_delete"
            ];
        }

        return false;
    }

    /**
     * Publishes a payload to Kafka as a single JSON-encoded message.
     *
     * The RdKafka classes are referenced with a leading backslash: this class
     * is meant to live inside a namespace, where a bare "RdKafka\Conf" would
     * be resolved relative to that namespace and fail to load.
     *
     * Delivery reports are appended to ./dr_cb.log and errors to ./err_cb.log.
     * NOTE(review): broker "127.0.0.1", topic "test" and message key "qkl"
     * are fixed values carried over unchanged; confirm them for the target
     * environment.
     *
     * @param mixed $data Payload to publish; must be JSON-encodable.
     * @return void
     */
    public function toKafuka($data)
    {
        $payload = json_encode($data);
        if ($payload === false) {
            // Nothing meaningful can be sent; record the reason and skip.
            file_put_contents(
                "./err_cb.log",
                sprintf("Kafka payload encode error: %s", json_last_error_msg()) . PHP_EOL,
                FILE_APPEND
            );
            return;
        }

        $conf = new \RdKafka\Conf();
        $conf->setDrMsgCb(static function ($kafka, $message) {
            file_put_contents("./dr_cb.log", var_export($message, true) . PHP_EOL, FILE_APPEND);
        });
        $conf->setErrorCb(static function ($kafka, $err, $reason) {
            file_put_contents(
                "./err_cb.log",
                sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason) . PHP_EOL,
                FILE_APPEND
            );
        });

        $rk = new \RdKafka\Producer($conf);
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers("127.0.0.1");

        $cf = new \RdKafka\TopicConf();
        $cf->set('request.required.acks', 0);
        $topic = $rk->newTopic("test", $cf);

        $option = 'qkl';
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $option);

        // Serve callbacks until the producer's outbound queue is empty so the
        // message is handed to the broker before the producer is destroyed.
        while ($rk->getOutQLen() > 0) {
            $rk->poll(50);
        }
    }

    /**
     * Appends a payload, JSON-encoded, to the Redis list "tutorial-list".
     *
     * @param mixed $data Payload to store.
     * @return void
     */
    public function toRedis($data)
    {
        $key = "tutorial-list";
        $redis = Cache::store('redis')->handler();
        $redis->Rpush($key, json_encode($data));
    }
}
|