mysqlToKafuka01.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. <?php
  2. namespace catchAdmin\api\service;
  3. use think\facade\Cache;
  4. use think\facade\Db;
  5. //脚本衢州 目前不使用
  6. //mysql 监听器
  7. class mysqlToKafuka01
  8. {
  9. //1 kafka,2 redis
  10. private $type;
  11. private $redis;
  12. public function __construct($type,$redis)
  13. {
  14. $this->type=$type;
  15. $this->redis=$redis;
  16. }
  17. public function sql_monitor(){
  18. $tables=["fjd_zcdj","fjdc_cbc_zcdj"];
  19. while(true){
  20. foreach($tables as $item){
  21. $this->initData($item);
  22. }
  23. }
  24. }
  25. /**
  26. * 初始化数据
  27. *
  28. * @return void
  29. */
  30. public function initData($table){
  31. $time=$this->getUpdateTime($table,1);
  32. $list=[];
  33. if(empty($time)){
  34. $time=time();
  35. $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s", $time))->order("ZHGXSJ","desc")->select();
  36. }else{
  37. $list= Db::name($table)->where("ZHGXSJ","<=",date("Y-m-d H:i:s",$time))->order("ZHGXSJ","desc")->select();
  38. }
  39. if(empty($list)){
  40. debug_log("数据库解析","数据库:.$table"."数据无更新的数据休眠1s");
  41. sleep(1);
  42. return;
  43. }
  44. $this->setData($list,$table);
  45. $time=strtotime("Y-m-d H:i:s",$list[count($list)-1]["ZHGXSJ"]);
  46. $this->getUpdateTime($table,2,$time);
  47. }
  48. private function setData($list,$table){
  49. foreach($list as $item){
  50. $data=$this->Operation($table, $item);
  51. if(!empty($data)){
  52. if($this->type==1){
  53. //存入kafuka
  54. $this->toKafuka($data);
  55. }else{
  56. //存入redis
  57. $this->toRedis($data);
  58. }
  59. }
  60. }
  61. }
  62. /**
  63. * 业务处理
  64. *
  65. * @param [type] $table_name
  66. * @param [type] $table_id
  67. * @param [type] $type
  68. * @return void
  69. */
  70. public function Operation($table_name,$data){
  71. switch($table_name)
  72. {
  73. case "fjd_zcdj":
  74. return $this->fjd_zcdj($table_name,$data);
  75. case "fjdc_cbc_zcdj":
  76. return $this->fjdc_cbc_zcdj($table_name,$data);
  77. }
  78. }
  79. public function fjdc_cbc_zcdj($table_name,$data){
  80. $json_data=[];
  81. $time=strtotime("Y-m-d H:i:s",$data["ZHGXSJ"]);
  82. $type= $this->getTableIdType($table_name,$data["FJDCXH"],$time);
  83. if($type=="add"){
  84. $json_data=[
  85. "PLATE_NO"=>$data["HPHM"],
  86. "RFID_SN"=>"12345678",
  87. "CAR_TYPE"=>$data["CLZL"],
  88. "CAR_BRAND"=>$data['ZWPP'],
  89. "NAME"=>$data["CZXM"],
  90. "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
  91. "MOBILE_NUMBER"=>$data['LXDH'],
  92. "INSTA_DATE"=>$time,
  93. "INSTALLER"=>$data["CZY"],
  94. "DATA_TYPE"=>"vehicle_save"
  95. ];
  96. }
  97. if($type=="update"){
  98. $json_data=[
  99. "PLATE_NO"=>$data["HPHM"],
  100. "OLD_NO"=>$data["HPHM"],
  101. "RFID_SN"=>"77778888",
  102. "CAR_TYPE"=>$data["CLZL"],
  103. "CAR_BRAND"=>$data['ZWPP'],
  104. "NAME"=>$data["CZXM"],
  105. "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
  106. "MOBILE_NUMBER"=>$data['LXDH'],
  107. "DATA_TYPE"=>"vehicle_update"
  108. ];
  109. }
  110. return $json_data;
  111. }
  112. public function fjd_zcdj($table_name,$data){
  113. $json_data=[];
  114. $time=$data["ZHGXSJ"];
  115. $type= $this->getTableIdType($table_name,$data["FJDCXH"],strtotime($time));
  116. var_dump($data);
  117. if($type=="add"){
  118. $json_data=[
  119. "PLATE_NO"=>$data["HPHM"],
  120. "RFID_SN"=>"12345678",
  121. "CAR_TYPE"=>$data["CLZL"],
  122. "CAR_BRAND"=>$data['ZWPP'],
  123. "NAME"=>$data["CZXM"],
  124. "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
  125. "MOBILE_NUMBER"=>$data['LXDH'],
  126. "INSTA_DATE"=>$time,
  127. "INSTALLER"=>$data["CLQRR"],
  128. "DATA_TYPE"=>"vehicle_save"
  129. ];
  130. }
  131. if($type=="update"){
  132. $json_data=[
  133. "PLATE_NO"=>$data["HPHM"],
  134. "OLD_NO"=>$data["HPHM"],
  135. "RFID_SN"=>"77778888",
  136. "CAR_TYPE"=>$data["CLZL"],
  137. "CAR_BRAND"=>$data['ZWPP'],
  138. "NAME"=>$data["CZXM"],
  139. "ID_CARD_NUMBER"=>$data['CZSFZMHM'],
  140. "MOBILE_NUMBER"=>$data['LXDH'],
  141. "DATA_TYPE"=>"vehicle_update"
  142. ];
  143. }
  144. return $json_data;
  145. }
  146. /**
  147. * 获取redis缓存的库 id,updateTime时间戳
  148. *
  149. * @return void
  150. */
  151. private function getTableIdType($table,$id,$time){
  152. $HashKey=$table."_redis";
  153. $key=$id;
  154. $data=["time"=>$time];
  155. $old_data= $this->getHash($HashKey,$key);
  156. $this->setHash($HashKey,$key,$data);
  157. if(empty($old_data)){
  158. return "add";
  159. }else{
  160. return "update";
  161. }
  162. }
  163. /**
  164. * function 获取 更新时间
  165. *
  166. * @param [type] $table 数据库表
  167. * @param [type] $type 类型 1 查询 2 存储
  168. * @param [type] $time 时间
  169. * @return void
  170. */
  171. private function getUpdateTime($table,$type,$time=null){
  172. $hashKey="car_update_time";
  173. if($type==2){
  174. $this->setHash($hashKey,$table,["time"=>$time]);
  175. return null;
  176. }else{
  177. $data= $this->getHash($hashKey,$table,$time);
  178. if(empty($data)){
  179. return false;
  180. }else{
  181. return $data["time"];
  182. }
  183. }
  184. }
  185. public function getHash($hashKey,$key,$time=null){
  186. $res= $this->redis->hGet($hashKey,$key);
  187. if(empty($res)){
  188. return false;
  189. }else{
  190. return json_decode($res,true);
  191. }
  192. }
  193. public function setHash($hashKey,$key,$data){
  194. $this->redis->hSet($hashKey,$key,json_encode($data));
  195. }
  196. public function delHash($hashKey,$key){
  197. $this->redis->hDel($hashKey,$key);
  198. }
  199. /**
  200. * 存入kafuka
  201. *
  202. * @param [type] $data
  203. * @return void
  204. */
  205. public function toKafuka($data){
  206. // $conf = new RdKafka\Conf();
  207. // $conf->setDrMsgCb(function ($kafka, $message) {
  208. // file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
  209. // });
  210. // $conf->setErrorCb(function ($kafka, $err, $reason) {
  211. // file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
  212. // });
  213. // $rk = new RdKafka\Producer($conf);
  214. // $rk->setLogLevel(LOG_DEBUG);
  215. // $rk->addBrokers("127.0.0.1");
  216. // $cf = new RdKafka\TopicConf();
  217. // $cf->set('request.required.acks', 0);
  218. // $topic = $rk->newTopic("test", $cf);
  219. // $option = 'qkl';
  220. // for ($i = 0; $i < 20; $i++) {
  221. // $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
  222. // }
  223. // $len = $rk->getOutQLen();
  224. // while ($len > 0) {
  225. // $len = $rk->getOutQLen();
  226. // $rk->poll(50);
  227. // }
  228. }
  229. /**
  230. * 存入redis
  231. *
  232. * @param [type] $data
  233. * @return void
  234. */
  235. public function toRedis($data){
  236. $key="tutorial-list";
  237. $redis=Cache::store('redis')->handler();
  238. $redis->Rpush($key,json_encode($data));
  239. }
  240. }