mysqlToKafuka01.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. <?php
  2. namespace catchAdmin\api\service;
  3. include "./DahuaUtil.php";
  4. ini_set('memory_limit', '1024M');
  5. use Exception;
  6. use thans\jwt\claim\Expiration;
  7. use think\facade\Cache;
  8. use think\facade\Config;
  9. use think\facade\Db;
  10. use think\facade\Env;
  11. //脚本衢州 目前不使用
  12. //mysql 监听器
  /**
   * Polls MySQL tables and forwards each row, as a JSON message, to Kafka or to a Redis list.
   *
   * Sync state lives in Redis hashes accessed through the injected client:
   *   - "car_update_time": per-table watermark (Unix timestamp of the last forwarded row's ZHGXSJ).
   *   - "<table>_redis":   per-record marker used to tell a first sighting ("add") from a later one ("update").
   *
   * NOTE(review): rows are selected with ZHGXSJ strictly greater than the watermark, at one-second
   * resolution; rows written later with the same second as the watermark would be skipped — confirm
   * whether that is acceptable for the source tables.
   */
  class mysqlToKafuka01
  {
      /** @var mixed Output target: 1 publishes to Kafka, any other value pushes to a Redis list. */
      private $type;

      /** @var object Redis client holding the sync state; must provide hGet/hSet/hDel. */
      private $redis;

      /** @var object|null Kafka producer, created by init_kafka(). */
      private $rk;

      /** @var object|null Kafka topic handle, created by init_kafka(). */
      private $topic;

      /**
       * @param mixed  $type  1 for Kafka output, anything else for Redis output.
       * @param object $redis Redis client used for the sync-state hashes.
       */
      public function __construct($type, $redis)
      {
          $this->type = $type;
          $this->redis = $redis;
      }

      /**
       * Runs the endless polling loop over the monitored tables.
       *
       * Returns false (after logging) when a required PHP extension is missing. Otherwise it never
       * returns: any Exception raised while syncing is logged and the process exits with status 1.
       *
       * @return false|void
       */
      public function sql_monitor()
      {
          // rdkafka is only needed when messages are actually published to Kafka.
          if ($this->type == 1 && !extension_loaded('rdkafka')) {
              $this->write_log('pushToKafka fail,extension of rdkafka has not installed!!' . PHP_EOL);
              $this->write_log('请重新安装kafka扩展');
              return false;
          }
          if (!extension_loaded('redis')) {
              $this->write_log('redis fail,extension of redis has not installed!!' . PHP_EOL);
              $this->write_log('请重新安装redis扩展');
              return false;
          }

          $tables = ["fjd_zcdj"];
          $this->write_log("开始同步数据库:" . implode(',', $tables));

          while (true) {
              try {
                  foreach ($tables as $item) {
                      $this->initData($item);
                  }
              } catch (Exception $e) {
                  $this->write_log("同步数据库抛出异常:" . $e->getMessage());
                  $this->write_log("程序强行暂停");
                  // Non-zero status so a supervisor can tell this abort from a clean shutdown.
                  exit(1);
              }
          }
      }

      /**
       * Performs one sync pass for a table.
       *
       * Without a stored watermark the whole table is forwarded page by page; with one, only rows
       * whose ZHGXSJ is later than the watermark are forwarded.
       *
       * @param string $table Table name without prefix, as passed to Db::name().
       * @return void
       */
      public function initData($table)
      {
          // Create the Kafka producer once; this method is called on every loop iteration.
          if ($this->type == 1 && $this->topic === null) {
              $this->init_kafka();
          }

          $time = $this->getUpdateTime($table, 1);
          $this->write_log("获取数据库同步最新时间" . $time);

          if (empty($time)) {
              $this->fullSync($table);
              return;
          }
          $this->incrementalSync($table, $time);
      }

      /**
       * Forwards every row of the table in pages of 100, ordered by ZHGXSJ ascending, then stores
       * the ZHGXSJ of the last forwarded row as the watermark.
       *
       * @param string $table
       * @return void
       */
      private function fullSync($table)
      {
          $this->write_log("获取失败,无最新时间,开始初始化将当前所有数据进行同步");
          $limit = 100;
          $count = Db::name($table)->count();
          if ($count == 0) {
              $this->write_log("当前数据库中无数据,开始监听");
              // Pause like the idle incremental branch so the caller's loop does not spin on an empty table.
              sleep(1);
              return;
          }

          $pageTotal = (int) ceil($count / $limit);
          $this->write_log("当前共有数据{$count}条,按照{$limit}条数据为一页,共计{$pageTotal}页");
          $this->write_log("同步开始");

          $lastRow = null;
          for ($i = 1; $i <= $pageTotal; $i++) {
              $this->write_log("第{$i}页进行同步.....");
              $list = Db::name($table)->order("ZHGXSJ", "asc")->page($i)->limit($limit)->select()->toArray();
              $this->setData($table, $list);
              // A page can come back empty if rows vanished after count(); keep the last real row.
              if (!empty($list)) {
                  $lastRow = end($list);
              }
              $this->write_log("第{$i}页进行同步完成");
          }

          if ($lastRow === null) {
              $this->write_log("全量同步未读取到任何数据");
              sleep(1);
              return;
          }

          $time = strtotime($lastRow["ZHGXSJ"]);
          $this->getUpdateTime($table, 2, $time);
          $this->write_log("同步全部完成.当前时间:" . date("Y-m-d H:i:s", $time));
          $this->write_log("开始监听最新数据");
      }

      /**
       * Forwards the rows whose ZHGXSJ is later than the watermark and advances the watermark to
       * the last of them. Sleeps one second when nothing is new.
       *
       * @param string $table
       * @param int    $time  Current watermark as a Unix timestamp.
       * @return void
       */
      private function incrementalSync($table, $time)
      {
          $since = date("Y-m-d H:i:s", $time);
          $this->write_log("开始监听当前时间之后的数据" . $since);

          $list = Db::name($table)->where("ZHGXSJ", ">", $since)->order("ZHGXSJ", "asc")->select()->toArray();
          if (empty($list)) {
              $this->write_log("数据库:{$table}数据无更新的数据休眠1s");
              sleep(1);
              return;
          }

          $this->write_log("同步开始");
          $this->setData($table, $list);
          $this->write_log("同步结束");

          $lastRow = end($list);
          $time = strtotime($lastRow["ZHGXSJ"]);
          $this->write_log("存入最新时间:" . date("Y-m-d H:i:s", $time));
          $this->getUpdateTime($table, 2, $time);
      }

      /**
       * Converts each row to a message and sends every non-empty message to the configured target.
       *
       * @param string $table
       * @param array  $list  Rows as associative arrays.
       * @return void
       */
      private function setData($table, $list)
      {
          foreach ($list as $item) {
              $data = $this->Operation($table, $item);
              if (empty($data)) {
                  continue;
              }
              if ($this->type == 1) {
                  $this->toKafka($data);
              } else {
                  $this->toRedis($data);
              }
          }
      }

      /**
       * Dispatches a row to the message builder for its table.
       *
       * @param string $table_name
       * @param array  $data       One table row.
       * @return array|false The message payload (possibly empty), or false for an unknown table.
       */
      public function Operation($table_name, $data)
      {
          switch ($table_name) {
              case "fjd_zcdj":
                  return $this->fjd_zcdj($table_name, $data);
              case "fjdc_cbc_zcdj":
                  return $this->fjdc_cbc_zcdj($table_name, $data);
              default:
                  return false;
          }
      }

      /**
       * Builds the message for a fjdc_cbc_zcdj row.
       *
       * Side effect: records the row's FJDCXH in the "<table>_redis" hash.
       *
       * NOTE(review): RFID_SN is a hard-coded literal here ("12345678" / "77778888") — looks like
       * placeholder data; confirm before relying on this table.
       *
       * @param string $table_name
       * @param array  $data
       * @return array
       */
      public function fjdc_cbc_zcdj($table_name, $data)
      {
          $json_data = [];
          $time = $data["ZHGXSJ"];
          $type = $this->getTableIdType($table_name, $data["FJDCXH"], strtotime($time));

          if ($type == "add") {
              $json_data = [
                  "PLATE_NO" => $data["HPHM"],
                  "RFID_SN" => "12345678",
                  "CAR_BRAND" => $data['ZWPP'],
                  "NAME" => $data["CZXM"],
                  "ID_CARD_NUMBER" => $data['CZSFZMHM'],
                  "MOBILE_NUMBER" => $data['LXDH'],
                  "INSTA_DATE" => $time,
                  "INSTALLER" => $data["CZY"],
                  "DATA_TYPE" => "vehicle_save"
              ];
          }
          if ($type == "update") {
              $json_data = [
                  "PLATE_NO" => $data["HPHM"],
                  "OLD_NO" => $data["HPHM"],
                  "RFID_SN" => "77778888",
                  "CAR_BRAND" => $data['ZWPP'],
                  "NAME" => $data["CZXM"],
                  "ID_CARD_NUMBER" => $data['CZSFZMHM'],
                  "MOBILE_NUMBER" => $data['LXDH'],
                  "DATA_TYPE" => "vehicle_update"
              ];
          }
          return $json_data;
      }

      /**
       * Builds the message for a fjd_zcdj row, mapping the brand (ZWPP) and vehicle kind (CLZL)
       * to the keys configured under app.CAR_BRAND and app.CAR_TYPE.
       *
       * Side effect: records the row's FJDCXH in the "<table>_redis" hash.
       *
       * NOTE(review): the payload key "CAR TYPE" contains a space while every other key uses an
       * underscore; it is kept as-is because consumers may depend on it — confirm and rename if
       * it is a typo.
       *
       * @param string $table_name
       * @param array  $data
       * @return array
       * @throws Exception When a configured lookup list is missing.
       */
      public function fjd_zcdj($table_name, $data)
      {
          $json_data = [];
          $time = $data["ZHGXSJ"];
          $type = $this->getTableIdType($table_name, $data["FJDCXH"], strtotime($time));
          $brand_id = $this->match_brand($data['ZWPP']);
          $car_type_id = $this->match_car_type($data['CLZL']);

          if ($type == "add") {
              $json_data = [
                  "PLATE_NO" => $data["HPHM"],
                  "RFID_SN" => $data["GYHM"],
                  "CAR TYPE" => $car_type_id,
                  "CAR_BRAND" => $brand_id,
                  "NAME" => $data["CZXM"],
                  "ID_CARD_NUMBER" => $data['CZSFZMHM'],
                  "MOBILE_NUMBER" => $data['LXDH'],
                  "INSTA_DATE" => $time,
                  "INSTALLER" => 1,
                  "DATA_TYPE" => "vehicle_save"
              ];
          }
          if ($type == "update") {
              $json_data = [
                  "PLATE_NO" => $data["HPHM"],
                  "RFID_SN" => $data["GYHM"],
                  "CAR TYPE" => $car_type_id,
                  "CAR_BRAND" => $brand_id,
                  "NAME" => $data["CZXM"],
                  "ID_CARD_NUMBER" => $data['CZSFZMHM'],
                  "MOBILE_NUMBER" => $data['LXDH'],
                  "DATA_TYPE" => "vehicle_update"
              ];
          }
          return $json_data;
      }

      /**
       * Classifies a record as "add" (no marker stored yet) or "update", and stores/overwrites
       * its marker with the given timestamp in the "<table>_redis" hash.
       *
       * @param string     $table
       * @param int|string $id    Record identifier used as the hash field.
       * @param int|false  $time  Timestamp stored in the marker.
       * @return string "add" or "update"
       */
      private function getTableIdType($table, $id, $time)
      {
          $hashKey = $table . "_redis";
          $previous = $this->getHash($hashKey, $id);
          $this->setHash($hashKey, $id, ["time" => $time]);

          return empty($previous) ? "add" : "update";
      }

      /**
       * Reads or stores the per-table watermark in the "car_update_time" hash.
       *
       * @param string   $table Table name, used as the hash field.
       * @param int      $type  2 stores $time; any other value reads.
       * @param int|null $time  Timestamp to store (only used when $type is 2).
       * @return mixed null after storing; when reading, the stored time or false if none exists.
       */
      private function getUpdateTime($table, $type, $time = null)
      {
          $hashKey = "car_update_time";
          if ($type == 2) {
              $this->setHash($hashKey, $table, ["time" => $time]);
              return null;
          }

          $data = $this->getHash($hashKey, $table);
          if (empty($data)) {
              return false;
          }
          return $data["time"];
      }

      /**
       * Reads a hash field and JSON-decodes it into an array.
       *
       * @param string $hashKey
       * @param string $key
       * @param mixed  $time    Unused; kept so existing callers keep working.
       * @return mixed false when the field is missing or empty, otherwise the decoded value.
       */
      public function getHash($hashKey, $key, $time = null)
      {
          $res = $this->redis->hGet($hashKey, $key);
          if (empty($res)) {
              return false;
          }
          return json_decode($res, true);
      }

      /**
       * Stores a value, JSON-encoded, in a hash field.
       *
       * @param string $hashKey
       * @param string $key
       * @param mixed  $data
       * @return void
       */
      public function setHash($hashKey, $key, $data)
      {
          $this->redis->hSet($hashKey, $key, json_encode($data));
      }

      /**
       * Deletes a hash field.
       *
       * @param string $hashKey
       * @param string $key
       * @return void
       */
      public function delHash($hashKey, $key)
      {
          $this->redis->hDel($hashKey, $key);
      }

      /**
       * Creates the Kafka producer and topic handle from the kafka.ip and kafka.topic env values.
       *
       * @return void
       * @throws Exception When either env value is empty.
       */
      public function init_kafka()
      {
          $this->write_log("初始化kafka");
          $kafka_ip = Env::get('kafka.ip', '');
          $topic = Env::get('kafka.topic', '');
          if (empty($kafka_ip)) {
              throw new Exception("kafka的ip不存在");
          }
          if (empty($topic)) {
              throw new Exception("kafka的主题不存在");
          }

          // Fully qualified: this file is namespaced, so a relative "RdKafka\Conf" would be
          // looked up under the current namespace and not found.
          $conf = new \RdKafka\Conf();
          $conf->set('metadata.broker.list', $kafka_ip);
          $conf->setErrorCb(function ($kafka, $err, $reason) {
              throw new Exception("kafka错误: err:{$err} reason:{$reason}");
          });

          $this->rk = new \RdKafka\Producer($conf);
          $this->topic = $this->rk->newTopic($topic);
      }

      /**
       * Publishes one message to Kafka as JSON, then polls until the producer's outbound queue
       * holds at most 300 messages.
       *
       * @param mixed $data Payload to JSON-encode.
       * @return void
       * @throws Exception When the producer has to be initialised and its configuration is missing.
       */
      public function toKafka($data)
      {
          // Allow direct calls that did not go through initData().
          if ($this->topic === null) {
              $this->init_kafka();
          }

          $this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($data));
          $this->rk->poll(0);
          while ($this->rk->getOutQLen() > 300) {
              $this->rk->poll(10);
          }
      }

      /**
       * Appends one message, as JSON, to the Redis list "tutorial-list" using the framework's
       * redis cache store (not the client injected in the constructor).
       *
       * @param mixed $data Payload to JSON-encode.
       * @return void
       */
      public function toRedis($data)
      {
          $key = "tutorial-list";
          $redis = Cache::store('redis')->handler();
          $redis->Rpush($key, json_encode($data));
      }

      /**
       * Writes a line to the log through DahuaUtil::rlog().
       *
       * @param string $text
       * @return void
       */
      public function write_log($text)
      {
          DahuaUtil::rlog($text);
      }

      /**
       * Maps a brand name to a key of the app.CAR_BRAND config list; 99 when nothing matches.
       *
       * @param string $brand
       * @return mixed
       * @throws Exception When the config list is empty.
       */
      private function match_brand($brand)
      {
          $brand_list = Config::get("app.CAR_BRAND");
          if (empty($brand_list)) {
              throw new Exception("车辆品牌不存在");
          }
          return $this->matchConfigKey($brand_list, $brand, 99);
      }

      /**
       * Maps a vehicle kind to a key of the app.CAR_TYPE config list; 4 when nothing matches.
       *
       * @param string $car_type
       * @return mixed
       * @throws Exception When the config list is empty.
       */
      private function match_car_type($car_type)
      {
          $type_list = Config::get("app.CAR_TYPE");
          if (empty($type_list)) {
              throw new Exception("车辆类型列表不存在");
          }
          return $this->matchConfigKey($type_list, $car_type, 4);
      }

      /**
       * Returns the key of the first list entry that contains, or is contained in, the needle
       * (ASCII case-insensitive), or the default when the needle is empty or nothing matches.
       *
       * @param array $list    Key => name pairs.
       * @param mixed $needle  Text to look up.
       * @param mixed $default Value returned when there is no match.
       * @return mixed
       */
      private function matchConfigKey($list, $needle, $default)
      {
          if (empty($needle)) {
              return $default;
          }
          $needle = (string) $needle;

          foreach ($list as $key => $val) {
              // An empty or non-scalar entry cannot be meaningfully compared.
              if (!is_scalar($val) || (string) $val === '') {
                  continue;
              }
              $val = (string) $val;

              // The shorter string is searched for inside the longer one.
              if (strlen($val) <= strlen($needle)) {
                  $found = stripos($needle, $val) !== false;
              } else {
                  $found = stripos($val, $needle) !== false;
              }
              if ($found) {
                  return $key;
              }
          }
          return $default;
      }
  }