|
@@ -4,10 +4,22 @@ namespace catchAdmin\api\service;
|
|
|
use think\facade\Db;
|
|
|
|
|
|
//脚本衢州 从mysql中存到kafka/redis中
|
|
|
+ //mysql 监听器
|
|
|
class mysqlToKafuka
|
|
|
{
|
|
|
- //mysql 监听器
|
|
|
+ //1 kafka,2 redis
|
|
|
+ private $type;
|
|
|
+
|
|
|
+ public function __construct($type)
|
|
|
+ {
|
|
|
+ $this->type=$type;
|
|
|
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
public function sql_monitor(){
|
|
|
$table="test";
|
|
|
$list=Db::name($table)->where("status",1)->select();
|
|
@@ -15,6 +27,7 @@ class mysqlToKafuka
|
|
|
$table_name=$item["table_name"];
|
|
|
$type=$item["type"];
|
|
|
$table_id=$item["table_id"];
|
|
|
+ $this->Operation($table_name,$table_id,$type);
|
|
|
}
|
|
|
|
|
|
}
|
|
@@ -30,23 +43,108 @@ class mysqlToKafuka
|
|
|
switch($table_name)
|
|
|
{
|
|
|
case "fjd_zcdj":
|
|
|
+ return $this->fjd_zcdj($table_name,$table_id,$type);
|
|
|
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
|
|
|
+ }
|
|
|
+
|
|
|
+ public function fjd_zcdj($table_name,$table_id,$type){
|
|
|
+ $data=Db::name($table_name)->where("id",$table_id)->find();
|
|
|
+ $json_data=false;
|
|
|
|
|
|
+ if(empty($data)){
|
|
|
+ return false;
|
|
|
}
|
|
|
-
|
|
|
+ if($type=="add"){
|
|
|
+ $json_data=[
|
|
|
+ "PLATE_NO"=>"测试AAAAAA",
|
|
|
+ "RFID_SN"=>"12345678",
|
|
|
+ "CAR_TYPE"=>"1",
|
|
|
+ "CAR_BRAND"=>"1",
|
|
|
+ "NAME"=>"测试A",
|
|
|
+ "ID_CARD_NUMBER"=>"330127199301171835",
|
|
|
+ "MOBILE_NUMBER"=>"15706857065",
|
|
|
+ "INSTA_DATE"=>"2023-05-24 09:13:16",
|
|
|
+ "INSTALLER"=>"超级管理员",
|
|
|
+ "DATA_TYPE"=>"vehicle_save"
|
|
|
+ ];
|
|
|
+
|
|
|
|
|
|
+ }
|
|
|
+ if($type=="update"){
|
|
|
+ $json_data=[
|
|
|
+ "PLATE_NO"=>"测试AAAAAA",
|
|
|
+ "OLD_NO"=>"测试AAAAAA",
|
|
|
+ "RFID_SN"=>"77778888",
|
|
|
+ "CAR_TYPE"=>"1",
|
|
|
+ "CAR_BRAND"=>"1",
|
|
|
+ "NAME"=>"测试A",
|
|
|
+ "ID_CARD_NUMBER"=>"330127199301171835",
|
|
|
+ "MOBILE_NUMBER"=>"15706857065",
|
|
|
+ "DATA_TYPE"=>"vehicle_update"
|
|
|
+ ];
|
|
|
+
|
|
|
|
|
|
- }
|
|
|
+ }
|
|
|
+ if($type=="delete"){
|
|
|
|
|
|
+ $json_data=[
|
|
|
+ "PLATE_NO"=>"测试AAAAAA",
|
|
|
+ "DATA_TYPE"=>"vehicle_delete"
|
|
|
+ ];
|
|
|
+
|
|
|
+ }
|
|
|
+ return $json_data;
|
|
|
+
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 存入kafuka
|
|
|
+ *
|
|
|
+ * @param [type] $data
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
public function toKafuka($data){
|
|
|
+
|
|
|
+ // $conf = new RdKafka\Conf();
|
|
|
+ // $conf->setDrMsgCb(function ($kafka, $message) {
|
|
|
+ // file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
|
|
|
+ // });
|
|
|
+ // $conf->setErrorCb(function ($kafka, $err, $reason) {
|
|
|
+ // file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
|
|
|
+ // });
|
|
|
+
|
|
|
+ // $rk = new RdKafka\Producer($conf);
|
|
|
+ // $rk->setLogLevel(LOG_DEBUG);
|
|
|
+ // $rk->addBrokers("127.0.0.1");
|
|
|
+ // $cf = new RdKafka\TopicConf();
|
|
|
+ // $cf->set('request.required.acks', 0);
|
|
|
+ // $topic = $rk->newTopic("test", $cf);
|
|
|
+ // $option = 'qkl';
|
|
|
+ // for ($i = 0; $i < 20; $i++) {
|
|
|
+ // $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
|
|
|
+ // }
|
|
|
+ // $len = $rk->getOutQLen();
|
|
|
+ // while ($len > 0) {
|
|
|
+ // $len = $rk->getOutQLen();
|
|
|
+
|
|
|
+ // $rk->poll(50);
|
|
|
+ // }
|
|
|
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
+ /**
|
|
|
+ * 存入redis
|
|
|
+ *
|
|
|
+ * @param [type] $data
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
public function toRedis($data){
|
|
|
|
|
|
}
|