mysqlToKafuka.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. <?php
  2. namespace catchAdmin\api\service;
  3. use think\facade\Db;
  4. //脚本衢州 从mysql中存到kafka/redis中
  5. //mysql 监听器
  6. class mysqlToKafuka
  7. {
  8. //1 kafka,2 redis
  9. private $type;
  10. public function __construct($type)
  11. {
  12. $this->type=$type;
  13. }
  14. public function sql_monitor(){
  15. $table="test";
  16. $list=Db::name($table)->where("status",1)->select();
  17. foreach($list as $key=>$item){
  18. $table_name=$item["table_name"];
  19. $type=$item["type"];
  20. $table_id=$item["table_id"];
  21. $this->Operation($table_name,$table_id,$type);
  22. }
  23. }
  24. /**
  25. * 业务处理
  26. *
  27. * @param [type] $table_name
  28. * @param [type] $table_id
  29. * @param [type] $type
  30. * @return void
  31. */
  32. public function Operation($table_name,$table_id,$type){
  33. switch($table_name)
  34. {
  35. case "fjd_zcdj":
  36. return $this->fjd_zcdj($table_name,$table_id,$type);
  37. }
  38. }
  39. public function fjd_zcdj($table_name,$table_id,$type){
  40. $data=Db::name($table_name)->where("id",$table_id)->find();
  41. $json_data=false;
  42. if(empty($data)){
  43. return false;
  44. }
  45. if($type=="add"){
  46. $json_data=[
  47. "PLATE_NO"=>"测试AAAAAA",
  48. "RFID_SN"=>"12345678",
  49. "CAR_TYPE"=>"1",
  50. "CAR_BRAND"=>"1",
  51. "NAME"=>"测试A",
  52. "ID_CARD_NUMBER"=>"330127199301171835",
  53. "MOBILE_NUMBER"=>"15706857065",
  54. "INSTA_DATE"=>"2023-05-24 09:13:16",
  55. "INSTALLER"=>"超级管理员",
  56. "DATA_TYPE"=>"vehicle_save"
  57. ];
  58. }
  59. if($type=="update"){
  60. $json_data=[
  61. "PLATE_NO"=>"测试AAAAAA",
  62. "OLD_NO"=>"测试AAAAAA",
  63. "RFID_SN"=>"77778888",
  64. "CAR_TYPE"=>"1",
  65. "CAR_BRAND"=>"1",
  66. "NAME"=>"测试A",
  67. "ID_CARD_NUMBER"=>"330127199301171835",
  68. "MOBILE_NUMBER"=>"15706857065",
  69. "DATA_TYPE"=>"vehicle_update"
  70. ];
  71. }
  72. if($type=="delete"){
  73. $json_data=[
  74. "PLATE_NO"=>"测试AAAAAA",
  75. "DATA_TYPE"=>"vehicle_delete"
  76. ];
  77. }
  78. return $json_data;
  79. }
  80. /**
  81. * 存入kafuka
  82. *
  83. * @param [type] $data
  84. * @return void
  85. */
  86. public function toKafuka($data){
  87. <<<<<<< HEAD
  88. // $conf = new RdKafka\Conf();
  89. // $conf->setDrMsgCb(function ($kafka, $message) {
  90. // file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
  91. // });
  92. // $conf->setErrorCb(function ($kafka, $err, $reason) {
  93. // file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
  94. // });
  95. // $rk = new RdKafka\Producer($conf);
  96. // $rk->setLogLevel(LOG_DEBUG);
  97. // $rk->addBrokers("127.0.0.1");
  98. // $cf = new RdKafka\TopicConf();
  99. // $cf->set('request.required.acks', 0);
  100. // $topic = $rk->newTopic("test", $cf);
  101. // $option = 'qkl';
  102. // for ($i = 0; $i < 20; $i++) {
  103. // $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
  104. // }
  105. // $len = $rk->getOutQLen();
  106. // while ($len > 0) {
  107. // $len = $rk->getOutQLen();
  108. // $rk->poll(50);
  109. // }
  110. =======
  111. $conf = new RdKafka\Conf();
  112. $conf->setDrMsgCb(function ($kafka, $message) {
  113. file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
  114. });
  115. $conf->setErrorCb(function ($kafka, $err, $reason) {
  116. file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
  117. });
  118. $rk = new RdKafka\Producer($conf);
  119. $rk->setLogLevel(LOG_DEBUG);
  120. $rk->addBrokers("127.0.0.1");
  121. $cf = new RdKafka\TopicConf();
  122. $cf->set('request.required.acks', 0);
  123. $topic = $rk->newTopic("test", $cf);
  124. $option = 'qkl';
  125. for ($i = 0; $i < 20; $i++) {
  126. $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
  127. }
  128. $len = $rk->getOutQLen();
  129. while ($len > 0) {
  130. $len = $rk->getOutQLen();
  131. $rk->poll(50);
  132. }
  133. >>>>>>> d836e7393f81fe9e3002d4b1e17a4169fd485999
  134. }
  135. /**
  136. * 存入redis
  137. *
  138. * @param [type] $data
  139. * @return void
  140. */
  141. public function toRedis($data){
  142. }
  143. }