tongshanglei 10 달 전
부모
커밋
85c8dc4464
1개의 변경된 파일220개의 추가작업 그리고 0개의 파일을 삭제
  1. 220 0
      task_script/LIVESTOCK_MQTT_REPEAT.php

+ 220 - 0
task_script/LIVESTOCK_MQTT_REPEAT.php

@@ -0,0 +1,220 @@
+<?php
+require('../vendor/autoload.php');
+use \PhpMqtt\Client\MqttClient;
+use \PhpMqtt\Client\ConnectionSettings;
+use think\facade\Cache;
+// date_default_timezone_set("PRC");
+// date_default_timezone_set("America/Bahia");
+// define('HOST', 'r-bp1eebab79320044pd.redis.rds.aliyuncs.com');
+// define('PORT', '6379');
+// define('PASSWORD', '7e2b5c91e438be3c!');
+// define('DATABASE', 4);
+define('HOST', '127.0.0.1');
+define('PORT', '6379');
+define('PASSWORD', '123456');
+define('DATABASE', 2);
+
+
+function rlog(...$args)
+{
+    if (empty($args[0])) {
+        return;
+    }
+    static $LOG_CONSOLE = false; //是否输出到控制台
+    static $LOG_NAME = "livestock_repeat.log"; //值为空时 不写入文件
+    static $LOG_SIZE = 64 * 1024 * 1024; //文件最大尺寸
+
+    static $LOG_CACHE = false; //是否缓存日志内容 用于批量写入文件
+    static $CACHE_DURATION = 10; //缓存最大时间 秒
+    static $CACHE_SIZE = 1024; //缓存大小
+    static $cacheStartTime = 0;
+    static $cacheBuf = '';
+
+    static $LOG_TIMES = 10; //调用这个函数最大次数 超过次数后判断下文件大小
+    static $logCount = 0;
+
+
+    $buf = '';
+    if (count($args) == 1 && $args[0] == "\n") { //只有换行时 不写入时间戳了
+        $buf = "\n";
+    } else {
+        $pid = ''; //进程id
+        if (function_exists('posix_getpid')) {
+            $pid = ' ' . posix_getpid() . ' ';
+        }
+        $fileLine = ''; //文件名:行号
+        {
+            $debug = debug_backtrace();
+            $fileLine = ($pid == '' ? ' ' : '') . basename($debug[0]['file']) . ':' . $debug[0]['line'] . ' ';
+        }
+        $buf = date("y-m-d H:i:s") . "{$pid}{$fileLine}" . implode(' ', $args) . "\n";
+    }
+
+    $logCount++;
+    if (!empty($LOG_NAME)) {
+        if ($LOG_CACHE) {
+            $cacheBuf .= $buf;
+            //超过缓存尺寸 或者 超过缓存时长 写缓存到文件
+            if (strlen($cacheBuf) > $CACHE_SIZE || time() - $cacheStartTime > $CACHE_DURATION) {
+                $cacheStartTime = time();
+                goto write;
+            } else {
+                goto skipWrite;
+            }
+        } else {
+            $cacheBuf = $buf;
+        }
+        write: {
+            //超过尺寸后 删除旧文件 把新文件重命名为旧文件  多进程同时操作 不加锁问题不大
+            if ($logCount > $LOG_TIMES && filesize($LOG_NAME) > $LOG_SIZE) {
+                $oldLogName = $LOG_NAME . '.old';
+                if (file_exists($oldLogName)) {
+                    if (!unlink($oldLogName)) {
+                        echo "unlink err\n";
+                    }
+                }
+                if (!rename($LOG_NAME, $oldLogName)) {
+                    echo "rename err\n";
+                }
+                $logCount = 0;
+            }
+            if (!file_put_contents($LOG_NAME, $cacheBuf, FILE_APPEND)) {
+                echo "file_put_contents err\n";
+            }
+            $cacheBuf = '';
+        }
+        skipWrite: {
+        }
+    }
+    if ($LOG_CONSOLE) {
+        echo $buf;
+    }
+}
+
+function loop()
+{
+    $server   = '116.62.220.88';
+    $port     = 1883;
+    $clientId = 'repeat_mqtt_livestock_develop';
+    $username = 'rl517';
+    $password = "rlian2022";
+    $clean_session = false;
+    $connectionSettings  = new ConnectionSettings();
+    $connectionSettings = $connectionSettings
+        ->setUsername($username)
+        ->setPassword($password)
+        ->setKeepAliveInterval(60)
+        // Last Will 设置
+        //        ->setLastWillTopic('emqx/test/last-will')
+        //        ->setLastWillMessage('client disconnect')
+        //        ->setLastWillQualityOfService(1)
+    ;
+
+    //include "RLog.php";
+    //    $mqtt = new MqttClient($server, $port, $clientId, MqttClient::MQTT_3_1, null, new RLog());
+
+    $mqtt = new MqttClient($server, $port, $clientId);
+
+    $mqtt->connect($connectionSettings, $clean_session);
+    rlog('INFO', "connect OK");
+
+    /*
+    消息方向	设备->服务器 
+    设备主动上报当前设备公共信息参数:ScBusTem/DevRegularInfo
+    服务器获取设备系统信息后设备上传信息,即GetDevSysMsg的回应 ScBusTem/GetUpDevSysMsg 
+    服务器设置设备重量信息信息 ScBusTem/RCInfoMsg
+    */
+
+
+    // $mqtt->subscribe('ScBusTem/GetDevSysMsg/*', function ($topic, $message) {
+    //     rlog("INFO", 'recv', $topic, $message);
+    //     getDevSysMsg($topic, $message);
+    // }, 0);
+    //终端上报系统信息数据
+    $mqtt->subscribe('earings/+/reportData', function ($topic, $message) use($mqtt) {
+        rlog("reportData", 'recv', $topic, $message);
+        $topicArr=explode('/',$topic);
+        $deviceId=$topicArr[1];
+
+        mqttRepeat($topic, $message);
+    }, 1);
+    // $mqtt->subscribe('earings/+/cloudResp', function ($topic, $message) use($mqtt) {
+    //     rlog("cloudResp", 'recv', $topic, $message);
+    //     mqttRepeat($topic, $message);
+    // }, 1);
+    // $mqtt->subscribe('earings/+/cloudControl', function ($topic, $message) use($mqtt) {
+    //     rlog("cloudControl", 'recv', $topic, $message);
+    //     mqttRepeat($topic, $message);
+    // }, 1);
+    // $mqtt->subscribe('$SYS/brokers/+/clients/+/connected', function ($topic, $message) use($mqtt) {
+    //     rlog("connected", 'recv', $topic, $message);
+    //     $data=json_decode($message,true);
+    //     $data['deviceId']=$data['clientid'];
+    //     $data['data_type']='connected';
+    // }, 1);
+    // $mqtt->subscribe('$SYS/brokers/+/clients/+/disconnected', function ($topic, $message) use($mqtt) {
+    //     rlog("connected", 'recv', $topic, $message);
+    //     $data=json_decode($message,true);
+    //     $data['deviceId']=$data['clientid'];
+    //     $data['data_type']='disconnected';
+    // }, 1);
+    // 上线,: $SYS/brokers/+/clients/+/connected
+	// 下线,: $SYS/brokers/+/clients/+/disconnected
+    $mqtt->loop(true);
+}
+
+function mqttRepeat($topic,$message){
+    
+
+
+    $server   = '61.175.203.188';
+    $port     = 10002;
+    $clientId = 'repeat_mqtt_livestock_ningbo';
+    $username = 'rltest';
+    $password = "rlian@24329";
+    $clean_session = false;
+
+    $connectionSettings  = new ConnectionSettings();
+    $connectionSettings = $connectionSettings
+        ->setUsername($username)
+        ->setPassword($password)
+        ->setKeepAliveInterval(60)
+        // Last Will 设置
+        //        ->setLastWillTopic('emqx/test/last-will')
+        //        ->setLastWillMessage('client disconnect')
+        //        ->setLastWillQualityOfService(1)
+    ;
+
+    $mqtt2 = new MqttClient($server, $port, $clientId);
+
+    $mqtt2->connect($connectionSettings, $clean_session);
+    echo 'connect OK'.PHP_EOL;
+    echo 'topic:'.$topic.PHP_EOL;
+    echo 'message:'.$message.PHP_EOL;
+    $res=$mqtt2->publish(
+        $topic,
+        $message,
+        1
+    );
+    echo 'publish end'.PHP_EOL;
+    $mqtt2->loop(true,true);
+    $mqtt2->disconnect();
+    return $res;
+
+    
+} 
+
+
+while (1) {
+    try {
+        rlog('INFO', 'connect start');
+        loop();
+    } catch (\Exception $ex) {
+        rlog("ERR", $ex->getTraceAsString());
+        rlog("ERR", $ex->getMessage());
+    }
+    sleep(3);
+}
+// $text='{"idESim":460046697314223,"stepCount":0,"EnvironmentTemperature":"27.0","earTemperature":"22.2","latitude":0,"longitude":0,"charging":1,"lastCharge":1704328944,"battery-level":99,"measurementTimestamp":1691173083,"agnss-dtime":16170,"agnss-inserttime":28050,"gnss-locatetime":39600,"gnss-satnum":1,"gnss-cn":28,"csq":"0-0","edrxrdp":",,","deviceId":"869154043484299-999202300000012","data_type":"reportData"}';
+// $text='{"clean_start":false,"clientid":"866216066939047-999274877906912","connack":0,"connected_at":1716169665,"expiry_interval":86400,"ipaddress":"39.144.129.107","keepalive":120,"proto_name":"MQTT","proto_ver":4,"sockport":1883,"ts":1716169665478,"username":"rl517","deviceId":"866216066939047-999274877906912","data_type":"connected"}';
+// app_redis()->lpush("mqtt_data_livestock",$text);