RL4RSSI_MQTT_CLIENT.php 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. <?php
  2. require('vendor/autoload.php');
  3. use \PhpMqtt\Client\MqttClient;
  4. use \PhpMqtt\Client\ConnectionSettings;
  // Run the whole script on China Standard Time. "Asia/Shanghai" is the
  // canonical identifier; the former "PRC" value is only a legacy alias for it.
  date_default_timezone_set('Asia/Shanghai');
  /**
   * Append one log line to the log file and, optionally, echo it.
   *
   * The arguments are joined with single spaces and prefixed with a timestamp,
   * the process id (when the posix extension is available) and the caller's
   * "file:line". A call whose only argument is exactly "\n" emits a bare
   * newline without that prefix.
   *
   * Calls with no arguments, or whose first argument is null, false, '' or [],
   * are ignored. Numeric zero (0 / "0") is logged like any other value.
   *
   * Behaviour is configured through the static variables at the top of the
   * body. When the log file grows past $LOG_SIZE it is renamed to
   * "<name>.old" (replacing any previous .old file) and a fresh file is begun.
   *
   * @param mixed ...$args Values to log; each must be convertible to string.
   * @return void
   */
  function rlog(...$args)
  {
      if (!isset($args[0]) || $args[0] === '' || $args[0] === false || $args[0] === []) {
          return;
      }

      static $LOG_CONSOLE = false; // also echo every line to the console
      static $LOG_NAME = "school_mqtt.log"; // log file path; empty disables file output
      static $LOG_SIZE = 64 * 1024 * 1024; // rotate once the file exceeds this many bytes
      static $LOG_CACHE = false; // buffer lines in memory and write them in batches
      static $CACHE_DURATION = 10; // maximum seconds between two batch flushes
      static $CACHE_SIZE = 1024; // flush once the buffer exceeds this many bytes
      static $cacheStartTime = 0;
      static $cacheBuf = '';
      static $LOG_TIMES = 10; // number of calls between two file-size checks
      static $logCount = 0;

      if (count($args) === 1 && $args[0] === "\n") {
          // A lone newline is written as-is, without the timestamp prefix.
          $buf = "\n";
      } else {
          $pid = function_exists('posix_getpid') ? ' ' . posix_getpid() . ' ' : '';
          // Only the immediate caller frame is needed; skip argument capture.
          $frames = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1);
          $file = isset($frames[0]['file']) ? basename($frames[0]['file']) : '';
          $line = isset($frames[0]['line']) ? $frames[0]['line'] : '';
          $fileLine = ($pid == '' ? ' ' : '') . $file . ':' . $line . ' ';
          $buf = date("y-m-d H:i:s") . "{$pid}{$fileLine}" . implode(' ', $args) . "\n";
      }

      $logCount++;
      if (!empty($LOG_NAME)) {
          $flushNow = true;
          if ($LOG_CACHE) {
              $cacheBuf .= $buf;
              // Flush when the buffer is too large or too old.
              $flushNow = strlen($cacheBuf) > $CACHE_SIZE
                  || time() - $cacheStartTime > $CACHE_DURATION;
              if ($flushNow) {
                  $cacheStartTime = time();
              }
          } else {
              $cacheBuf = $buf;
          }

          if ($flushNow) {
              // Check the size only once every $LOG_TIMES calls to avoid a
              // stat() per log line.
              if ($logCount > $LOG_TIMES) {
                  $logCount = 0;
                  // filesize() results are cached by PHP; without this a
                  // long-running process may keep seeing a stale size.
                  clearstatcache(true, $LOG_NAME);
                  if (is_file($LOG_NAME) && filesize($LOG_NAME) > $LOG_SIZE) {
                      // Rotation is not locked: several processes may race
                      // here, which is tolerated for a best-effort log.
                      $oldLogName = $LOG_NAME . '.old';
                      if (file_exists($oldLogName) && !unlink($oldLogName)) {
                          echo "unlink err\n";
                      }
                      if (!rename($LOG_NAME, $oldLogName)) {
                          echo "rename err\n";
                      }
                  }
              }
              if (file_put_contents($LOG_NAME, $cacheBuf, FILE_APPEND) === false) {
                  echo "file_put_contents err\n";
              }
              // The buffer is dropped even when the write failed so that it
              // cannot grow without bound.
              $cacheBuf = '';
          }
      }

      if ($LOG_CONSOLE) {
          echo $buf;
      }
  }
  /**
   * Perform an HTTP GET or POST through a shared, reusable cURL handle.
   *
   * GET:  $params is an array and is appended to $url as a query string.
   * POST: a string $params is sent verbatim as the request body; any other
   *       value is JSON-encoded first.
   *
   * The handle is kept in the global $ch so that connections can be reused
   * between calls; it is reset before every request so that options from a
   * previous call (for example POST fields) cannot leak into the next one.
   *
   * The response body is passed through decodeUnicode() before being returned.
   *
   * @param string       $url     Target URL.
   * @param array|string $params  Query parameters (GET) or request body (POST).
   * @param array        $header  Extra header lines, e.g.
   *                              "Content-Type: application/json; charset=utf-8".
   * @param string       $method  'GET' or 'POST' (case-insensitive).
   * @param int          $timeout Total request timeout in seconds.
   * @return string|false Response body, or false on an unsupported method or
   *                      any cURL failure (the reason is logged).
   */
  function http($url, $params, $header = [], $method = 'GET', $timeout = 10)
  {
      $verb = strtoupper($method);

      rlog("[HTTP] url:$url,method:$method" . ",header:" . json_encode($header));
      if ($verb === 'POST') {
          rlog("[POST] send params " . (!is_array($params) ? $params : json_encode($params, JSON_UNESCAPED_UNICODE)));
      } else {
          rlog("[GET] send " . json_encode($params));
      }

      // An empty "Expect:" header stops cURL from sending "Expect: 100-continue".
      $header[] = "Expect: ";
      $opts = [
          CURLOPT_TIMEOUT => $timeout,
          CURLOPT_RETURNTRANSFER => 1,
          // NOTE(review): certificate and host verification are disabled, so
          // HTTPS targets are not authenticated. Kept as-is for compatibility.
          CURLOPT_SSL_VERIFYPEER => false,
          CURLOPT_SSL_VERIFYHOST => false,
          CURLOPT_HTTPHEADER => $header,
      ];

      // Method-specific options.
      switch ($verb) {
          case 'GET':
              $opts[CURLOPT_URL] = $url . (empty($params) ? '' : ('?' . http_build_query($params)));
              break;
          case 'POST':
              $opts[CURLOPT_URL] = $url;
              $opts[CURLOPT_POST] = 1;
              // A pre-built string body must not be JSON-encoded a second time.
              $opts[CURLOPT_POSTFIELDS] = is_string($params) ? $params : json_encode($params);
              break;
          default:
              rlog("[ERR] method " . $method);
              return false;
      }

      global $ch; // long-lived cURL handle shared by all calls
      if (empty($ch)) {
          $ch = curl_init();
          if (empty($ch)) {
              rlog("[ERR] curl_init");
              return false;
          }
      } else {
          // Drop the options of the previous request but keep the handle (and
          // its connections) alive.
          curl_reset($ch);
      }

      if (!curl_setopt_array($ch, $opts)) {
          rlog("[ERR] curl_setopt_array");
          return false;
      }

      $data = curl_exec($ch);
      if ($data === false) {
          rlog("[ERR] curl_exec errno:" . curl_errno($ch) . " " . curl_error($ch));
          return false;
      }

      // Turn \uXXXX escapes in the response into readable UTF-8 text.
      $data = decodeUnicode($data);
      rlog("[HTTP] recv " . $data);
      return $data;
  }
  /**
   * Replace JSON-style \uXXXX escape sequences in a string with UTF-8 text.
   *
   * A high surrogate immediately followed by a low surrogate
   * (e.g. \ud83d\ude00) is decoded as one code point. A surrogate that is not
   * part of such a pair, or any sequence that cannot be converted, is left in
   * the string unchanged.
   *
   * @param string $str Text that may contain \uXXXX escapes.
   * @return string|null The decoded text (null if the regex engine fails).
   */
  function decodeUnicode($str)
  {
      // First alternative: surrogate pair. Second alternative: single unit.
      $pattern = '/\\\\u(d[89ab][0-9a-f]{2})\\\\u(d[c-f][0-9a-f]{2})|\\\\u([0-9a-f]{4})/i';

      return preg_replace_callback($pattern, function ($matches) {
          if (isset($matches[3])) {
              $hex = $matches[3];
              if (preg_match('/^d[89a-f]/i', $hex)) {
                  // Lone surrogate: not a valid character on its own.
                  return $matches[0];
              }
          } else {
              $hex = $matches[1] . $matches[2];
          }

          $utf8 = iconv("UTF-16BE", "UTF-8", pack("H*", $hex));
          return $utf8 === false ? $matches[0] : $utf8;
      }, $str);
  }
  /**
   * Handle a periodic device status report and forward it over HTTP.
   *
   * The decoded message is first POSTed unchanged to /api/trackReport, then a
   * normalised subset of its fields is POSTed to /rlapi/busHeartbeatData.
   * Fields missing from the message are sent as ''.
   *
   * Example message (StatusMsg is free text from the device):
   * {"CSQ":26,"REP_INT":60,"GPS":"112.14060,32.06532","IMEI":"867160049332715",
   *  "IMSI":"460041872816952","CCID":"898607B8101980060659",
   *  "SYS_VER":"RLSC_V0.1","Time":1676257143,"Status":0,
   *  "StatusMsg":"device OK","Initiative":1}
   *
   * @param string $topic MQTT topic the message arrived on (unused).
   * @param string $msg   JSON-encoded message body.
   * @return void
   */
  function devRegularInfo($topic, $msg)
  {
      $data = json_decode($msg, true);
      // Reject invalid JSON, empty payloads and JSON scalars alike.
      if (empty($data) || !is_array($data)) {
          rlog("ERR", "json_decode");
          return;
      }
      rlog("[I]", $msg);

      http('http://47.114.185.186:8115/api/trackReport', $data, [], 'POST');

      $gps = isset($data['GPS']) && is_string($data['GPS']) ? $data['GPS'] : '';
      $loc = explode(',', $gps);

      // "??" (rather than "?:") keeps legitimate zero values such as
      // Status 0 or Initiative 0 instead of turning them into ''.
      $column = [
          'csq' => $data['CSQ'] ?? '',
          'rep_int' => $data['REP_INT'] ?? '',
          // NOTE(review): the sample GPS value "112.14060,32.06532" looks like
          // "longitude,latitude" (112 is not a valid latitude), so these two
          // may be swapped. Mapping kept as-is; confirm with the receiving API.
          'latitude' => $loc[0] ?? '',
          'longitude' => $loc[1] ?? '',
          'imei' => $data['IMEI'] ?? '',
          'imsi' => $data['IMSI'] ?? '',
          // The sample payload carries the SIM id under "CCID"; accept both.
          'iccid' => $data['ICCID'] ?? $data['CCID'] ?? '',
          'version' => $data['SYS_VER'] ?? '',
          'time' => $data['Time'] ?? '',
          'status' => $data['Status'] ?? '',
          'status_msg' => $data['StatusMsg'] ?? '',
          'initiative' => $data['Initiative'] ?? '',
      ];

      http('http://47.114.185.186:8115/rlapi/busHeartbeatData', $column, [], 'POST');
  }
  /**
   * Handle a device's system-information message and forward it over HTTP.
   *
   * The listed fields are copied under lower-case names and POSTed to
   * /rlapi/busSysMsgData. A field missing from the message is sent as null.
   *
   * Example message (TTS_TEXT is free text):
   * {
   *   "IMEI": "867160049332715",
   *   "Mqtt_Host": "develop.rltest.cn",
   *   "Mqtt_Port": 1883,
   *   "Mqtt_User": " rl517",
   *   "Mqtt_Password": "rlian2022",
   *   "TTS_TEXT": "welcome text",
   *   "GPS_EN": "0",
   *   "Time": 1676257143
   * }
   *
   * @param string $topic MQTT topic the message arrived on (unused).
   * @param string $msg   JSON-encoded message body.
   * @return void
   */
  function getUpDevSysMsg($topic, $msg)
  {
      $arr = json_decode($msg, true);
      // Reject invalid JSON, empty payloads and JSON scalars alike.
      if (empty($arr) || !is_array($arr)) {
          rlog("ERR", "json_decode");
          return;
      }

      // Outgoing field name => key in the device message.
      $fieldMap = [
          'imei' => 'IMEI',
          'mqtt_host' => 'Mqtt_Host',
          'mqtt_port' => 'Mqtt_Port',
          'mqtt_user' => 'Mqtt_User',
          'mqtt_password' => 'Mqtt_Password',
          'tts_text' => 'TTS_TEXT',
          'gps_en' => 'GPS_EN',
          'time' => 'Time',
      ];

      $column = [];
      foreach ($fieldMap as $field => $key) {
          // Missing keys become null without raising an undefined-index warning.
          $column[$field] = $arr[$key] ?? null;
      }

      http('http://47.114.185.186:8115/rlapi/busSysMsgData', $column, [], 'POST');
  }
  /**
   * Handle an "RC info" message from a device and forward it over HTTP.
   *
   * The listed fields are copied under lower-case names and POSTed to
   * /rlapi/busRcInfoData. A field missing from the message is sent as null.
   *
   * Example message:
   * {
   *   "IMEI": "867160049332715",
   *   "RC_Number": "867160049332715",
   *   "RC_Type": 1,
   *   "RC_Pres": 1,
   *   "RC_Total": 25,
   *   "GPS_X": "116.25",
   *   "GPS_Y": "29.36",
   *   "Msgid": "dkx12-15dss-ad567-1a2ss",
   *   "Time": 1676257143
   * }
   *
   * @param string $topic MQTT topic the message arrived on (unused).
   * @param string $msg   JSON-encoded message body.
   * @return void
   */
  function rcInfoMsg($topic, $msg)
  {
      $arr = json_decode($msg, true);
      // Reject invalid JSON, empty payloads and JSON scalars alike.
      if (empty($arr) || !is_array($arr)) {
          rlog("ERR", "json_decode");
          return;
      }

      // Outgoing field name => key in the device message.
      $fieldMap = [
          'imei' => 'IMEI',
          'rc_number' => 'RC_Number',
          'rc_type' => 'RC_Type',
          'rc_pres' => 'RC_Pres',
          'rc_total' => 'RC_Total',
          'gps_x' => 'GPS_X',
          'gps_y' => 'GPS_Y',
          'msgid' => 'Msgid',
          'time' => 'Time',
      ];

      $column = [];
      foreach ($fieldMap as $field => $key) {
          // Missing keys become null without raising an undefined-index warning.
          $column[$field] = $arr[$key] ?? null;
      }

      http('http://47.114.185.186:8115/rlapi/busRcInfoData', $column, [], 'POST');
  }
  /**
   * Open one MQTT session: connect to the broker, subscribe to the two
   * RL4RSSI topics and then hand control to the client's loop.
   *
   * Every received message is logged through rlog() and dumped with
   * var_dump(). Any exception raised by the client is left for the caller.
   *
   * @return void
   */
  function loop()
  {
      $host = 'develop.rltest.cn';
      $tcpPort = 1883;
      $id = 'mqttx_f845f9b0';
      $useCleanSession = true;

      // Credentials and keep-alive; no Last Will is configured.
      $settings = (new ConnectionSettings())
          ->setUsername('rl517')
          ->setPassword("rlian2022")
          ->setKeepAliveInterval(60);

      $client = new MqttClient($host, $tcpPort, $id);
      $client->connect($settings, $useCleanSession);
      rlog('INFO', "connect OK");

      // Both topics carry device-to-server traffic and share one handler.
      $logAndDump = function ($topic, $message) {
          rlog("INFO", 'recv', $topic, $message);
          var_dump($message);
      };

      $topics = [
          'RL4RSSI/devOntime',  // heartbeat data
          'RL4RSSI/rfidinfos',  // system information reported by the terminal
      ];
      foreach ($topics as $topicFilter) {
          // Third argument 0: presumably the QoS level; confirm against php-mqtt/client.
          $client->subscribe($topicFilter, $logAndDump, 0);
      }

      $client->loop(true);
  }
  // Supervisor: run one MQTT session after another, forever. When a session
  // ends or throws, log the failure, wait three seconds and reconnect.
  do {
      try {
          rlog('INFO', 'connect start');
          loop();
      } catch (\Exception $failure) {
          rlog("ERR", $failure->getTraceAsString());
          rlog("ERR", $failure->getMessage());
      }
      sleep(3);
  } while (true);