mysqlToKafuka.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. <?php
  2. namespace catchAdmin\api\service;
  3. use think\facade\Cache;
  4. use think\facade\Db;
  5. //脚本衢州 目前不使用
  6. //mysql 监听器
  /**
   * Forwards queued MySQL change records to Kafka or Redis.
   *
   * sql_monitor() reads the rows of the "test" table whose status is 1
   * (ordered by id), builds a payload for the record each row points at,
   * pushes that payload to the configured sink and then deletes the row.
   */
  class mysqlToKafuka
  {
      /**
       * Sink selector: 1 sends payloads to Kafka, any other value to Redis.
       *
       * @var mixed
       */
      private $type;

      /**
       * @param mixed $type Sink selector (1 = Kafka, anything else = Redis).
       */
      public function __construct($type)
      {
          $this->type = $type;
      }

      /**
       * Drains the queue table once.
       *
       * For every pending row a payload is built through Operation(); a
       * non-empty payload is pushed to the selected sink. The row is deleted
       * afterwards in every case, including when no payload was produced.
       *
       * @return void
       */
      public function sql_monitor()
      {
          $queueTable = "test";
          $pending = Db::name($queueTable)->where("status", 1)->order("id", "asc")->select();

          foreach ($pending as $row) {
              $payload = $this->Operation($row["table_name"], $row["table_id"], $row["type"]);

              if (!empty($payload)) {
                  // Kept loose so that a numeric string such as "1" still selects Kafka.
                  if ($this->type == 1) {
                      $this->toKafuka($payload);
                  } else {
                      $this->toRedis($payload);
                  }
              }

              // The queue row is removed whether or not anything was sent.
              Db::name($queueTable)->where("id", $row["id"])->delete();
          }
      }

      /**
       * Dispatches to the payload builder registered for a table.
       *
       * @param string $table_name Name of the table the queue row refers to.
       * @param mixed  $table_id   Primary key of the referenced record.
       * @param string $type       Change kind taken from the queue row.
       * @return array|false|null  The builder's result, or null when no
       *                           builder exists for $table_name.
       */
      public function Operation($table_name, $table_id, $type)
      {
          switch ($table_name) {
              case "fjd_zcdj":
                  return $this->fjd_zcdj($table_name, $table_id, $type);
              default:
                  return null;
          }
      }

      /**
       * Builds the payload for a record of the "fjd_zcdj" table.
       *
       * Apart from PLATE_NO in the "add" case (read from the record's HPHM
       * column) every field is a fixed placeholder value.
       *
       * @param string $table_name Table to read the record from.
       * @param mixed  $table_id   Primary key of the record.
       * @param string $type       One of "add", "update" or "delete".
       * @return array|false       The payload, or false when the record does
       *                           not exist or $type is not recognised.
       */
      public function fjd_zcdj($table_name, $table_id, $type)
      {
          $record = Db::name($table_name)->where("id", $table_id)->find();
          if (empty($record)) {
              return false;
          }

          // Strict comparison: with "==" an integer 0 matches every branch on PHP 7.
          if ($type === "add") {
              return [
                  "PLATE_NO" => $record["HPHM"],
                  "RFID_SN" => "12345678",
                  "CAR_TYPE" => "1",
                  "CAR_BRAND" => "1",
                  "NAME" => "测试A",
                  "ID_CARD_NUMBER" => "330127199301171835",
                  "MOBILE_NUMBER" => "15706857065",
                  "INSTA_DATE" => "2023-05-24 09:13:16",
                  "INSTALLER" => "超级管理员",
                  "DATA_TYPE" => "vehicle_save",
              ];
          }

          if ($type === "update") {
              return [
                  "PLATE_NO" => "测试AAAAAA",
                  "OLD_NO" => "测试AAAAAA",
                  "RFID_SN" => "77778888",
                  "CAR_TYPE" => "1",
                  "CAR_BRAND" => "1",
                  "NAME" => "测试A",
                  "ID_CARD_NUMBER" => "330127199301171835",
                  "MOBILE_NUMBER" => "15706857065",
                  "DATA_TYPE" => "vehicle_update",
              ];
          }

          if ($type === "delete") {
              return [
                  "PLATE_NO" => "测试AAAAAA",
                  "DATA_TYPE" => "vehicle_delete",
              ];
          }

          return false;
      }

      /**
       * Publishes one payload to the Kafka topic "test" as a JSON string.
       *
       * Delivery reports are appended to ./dr_cb.log and client errors to
       * ./err_cb.log. The call returns once the producer's outbound queue
       * is empty.
       *
       * @param mixed $data Payload to publish; it is JSON-encoded before sending.
       * @return void
       */
      public function toKafuka($data)
      {
          $message = json_encode($data);
          if ($message === false) {
              // Record an unencodable payload instead of sending it.
              file_put_contents("./err_cb.log", sprintf("Kafka error: payload could not be JSON-encoded (reason: %s)", json_last_error_msg()).PHP_EOL, FILE_APPEND);
              return;
          }

          // Leading backslashes are required: this file declares a namespace, so an
          // unqualified RdKafka\Conf would be looked up relative to that namespace.
          $conf = new \RdKafka\Conf();
          $conf->setDrMsgCb(function ($kafka, $message) {
              file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
          });
          $conf->setErrorCb(function ($kafka, $err, $reason) {
              file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
          });

          $producer = new \RdKafka\Producer($conf);
          $producer->setLogLevel(LOG_DEBUG);
          $producer->addBrokers("127.0.0.1");

          $topicConf = new \RdKafka\TopicConf();
          $topicConf->set('request.required.acks', '0');
          $topic = $producer->newTopic("test", $topicConf);

          $messageKey = 'qkl';
          $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message, $messageKey);

          // Serve callbacks until everything queued has been handed off.
          while ($producer->getOutQLen() > 0) {
              $producer->poll(50);
          }
      }

      /**
       * Appends one payload, JSON-encoded, to the Redis list "tutorial-list".
       *
       * @param mixed $data Payload to store.
       * @return void
       */
      public function toRedis($data)
      {
          $listKey = "tutorial-list";
          // handler() presumably returns the raw Redis client behind the cache store — confirm.
          $redis = Cache::store('redis')->handler();
          $redis->rPush($listKey, json_encode($data));
      }
  }