Browse Source

parea second push

nana_sen 1 year ago
parent
commit
f4cfbc55a7
3 changed files with 583 additions and 0 deletions
  1. 204 0
      catch/api/controller/Api.php
  2. 1 0
      catch/api/route.php
  3. 378 0
      task_script/RL4RSSI_PAREA_MQTT.php

+ 204 - 0
catch/api/controller/Api.php

@@ -135,9 +135,213 @@ class Api extends CatchController
 
 
     }
+    public function redis_to_mysql_parea(){
 
+        $ues_redis=Cache::store('redis')->handler();
+        $text="";
+        $dispose = new dispose($ues_redis);
+        while(1){
+            $jsonData= $ues_redis->rpop("mqtt_data_parea4rssi");
+            debug_log("InAndOUT",'数据redis'.$jsonData);
+            
+            if(empty($jsonData)){
+                debug_log("InAndOUT",'没有数据');
+                sleep(1);
+                continue;
+                
+            }
+            // debug_log("InAndOUT",$jsonData);
+            $data=json_decode($jsonData,true);
+            // debug_log("InAndOUT",$data['cnt']);
+            $time=$data['time'];
+            $mac=$data['mac'];
+            $text=$text.$data['cnt'];
+            
+            if(strlen($text)<24){
+                continue;
+            }
+            $text = $this->handleMessage($text, $mac, $time, $ues_redis);
+
+            // foreach ($jsonData as $k => $v) {
+            //     $data = json_decode($v, true);
+            //     $config = $this->selectHash("anbang_four_wire",$data['mac']);
+            //     if(!$config){
+            //         $config = [
+            //             "front" => 70,
+            //             "behind" => 70,
+            //             "left" => 70,
+            //             "right" => 70
+            //         ];
+            //     }else{
+            //         $config = json_decode($config);
+            //     }
+            //     $field = "";
+            //     $infos = $ues_redis->hget("parea_rfidinfos",$field);
+            //     if(!$infos){
+                    
+            //     }
+            //     //判断进出段
+            //     //1。当前时间 - 上次时间 > 设定时间
+            //     if( (time() - $v["time"]) > $setInt ){
+            //         //推送离开
+            //     }
+            // }
+            sleep(2);
+
+        }
+
+
+    }
+    // //http://47.114.185.186:8115/api/areaReport
+// //{“mac”:”013560”,”data”:[{“label”:”258962f”,”time”:1682317268,”dirt”:2},{“label”:”258962f”,”time”:1682317268,”dirt”:1}]
+//             }
+
+    //消息处理
+    public function handleMessage($text, $mac, $time, $ues_redis){
+        while(strlen($text)>=24){
+    
+            $da=substr($text,0,24);
+            $t1 = substr($da,8,2)=='00'||substr($da,8,2)=='10'||substr($da,8,2)=='01' ? true:false;
+            $t2 =substr($da,10,6)=="FFFFFF"||substr($da,10,6)=="000000" ?true:false;
+            
+            if($t1&&$t2){
+                // var_dump($text);
+                $rfid= substr($da,0,8);
+                $rssi1= hexdec(substr($da,16,2));
+                $rssi2= hexdec(substr($da,18,2));
+                $rssi3= hexdec(substr($da,20,2));
+                $rssi4= hexdec(substr($da,22,2));
+                // $rc1 = ($rssi1 == 255 ? 0 : 1);
+                // $rc2 = ($rssi2 == 255 ? 0 : 1);
+                // $rc3 = ($rssi3 == 255 ? 0 : 1);
+                // $rc4 = ($rssi4 == 255 ? 0 : 1);
+                $DA=[
+                    'mac' => $mac,
+                    'label' =>$rfid,
+                    'rssi1' => $rssi1,
+                    'rssi2' => $rssi2,
+                    'rssi3' => $rssi3,
+                    'rssi4' => $rssi4,
+                    'report_time'=>$time
+                ];
+                
+                $field = $mac."_".$rfid;
+                $info = $ues_redis->hget("parea_rfidinfos",$field);
+                debug_log("InAndOUT","数据:".json_encode($DA,true));
+
+                if(!$info){
+                    $info = [
+                        "mac" => $mac,
+                        "id" => $rfid,
+                        "firs_time" => $time,
+                        "last_time" => $time,
+                        "status" => 1,//1出状态,2进状态
+                        "rssi" => [
+                            [
+                                "front" => $rssi1,
+                                "behind" => $rssi2,
+                                "left" => $rssi3,
+                                "right" => $rssi4,
+                                "time" => $time
+                            ]
+                        ]
+                    ];
+                    $ues_redis->hset("parea_rfidinfos",$field, json_encode($info,true));
+                }else{
+                    $info = json_decode($info,true);
+
+                    $info["last_time"] = $time;
+                    $newRssi = [
+                        "front" => $rssi1,
+                        "behind" => $rssi2,
+                        "left" => $rssi3,
+                        "right" => $rssi4,
+                        "time" => $time
+                    ];
+                    $rssisArr = $info["rssi"];
+
+                    foreach ($rssisArr as $k => $v) {
+                        # code...
+                        if( ($time - $v["time"] ) > $EXP_TIME ){
+                            unset($rssisArr[$k]);
+                        }
+                    }
+                    if(count($rssisArr) == $EFF_SIG_NUM){
+                        array_shift($rssisArr);
+                        array_push($rssisArr, $newRssi);
+                    }else{
+                        array_push($rssisArr, $newRssi);
+                    }
+                    $info["rssi"] = $rssisArr;
+
+                    $cacCount = count($rssisArr);
+                    // if($cacCount > $EFF_SIG_NUM){
+                    $sum1 = 0;
+                    $sum2 = 0;
+                    $sum3 = 0;
+                    $sum4 = 0;
+                    foreach ($rssisArr as $k => $v) {
+                        # code...
+                        $sum1 += ($v["front"] == 255 ? 85 : $v["front"]);
+                        $sum2 += ($v["behind"] == 255 ? 85 : $v["behind"]);
+                        $sum3 += ($v["left"] == 255 ? 85 : $v["left"]);
+                        $sum4 += ($v["right"] == 255 ? 85 : $v["right"]);
+                    }
+                    $info["avg"] = [
+                        "front" => number_format( $sum1/$cacCount, 2 ),
+                        "behind" => number_format( $sum2/$cacCount, 2 ),
+                        "left" => number_format( $sum3/$cacCount, 2 ),
+                        "right" => number_format( $sum4/$cacCount, 2 ),
+                    ];
+
+                    if ($cacCount == $EFF_SIG_NUM) {
+                        # code...
+                        $this->compAndPush($info["avg"], $mac, $rfid, $ues_redis);
+                    }
+
+                    $ues_redis->hset("parea_rfidinfos",$field, json_encode($info,true));
+                }
+                $text=substr($text,24);
+            }else{
+                $text=substr($text,1);
+            }
+        }
+        return $text ?$text : "";
+    }
+    //判断进出和推送
+    public function compAndPush($data, $mac, $rfid, $ues_redis){
+        //标签多,数据多时候,判断完成后放入redis,异步推送,当前暂时直接推
+        if (empty($data)) {
+            # code...
+            return false;
+        }
+        $config  = $ues_redis->hget("anbang_four_wire", $mac);
+
+        //{"front_in":44,"front_out":44,"behind_in":50,"behind_out":50,"left_in":55,"left_out":55,"right_out":44,"right_out":44}
+        if (!$config) {
+            # code...
+            $config = [
+                "front_in" => 70,
+                "behind_in" => 70,
+                "left_in" => 70,
+                "right_in" => 70,
+                "front_out" => 80,
+                "behind_out" => 80,
+                "left_out" => 80,
+                "right_out" => 80,
+            ];     
+        }
 
+        $flagIn = $data["front"] < $config["front_in"] || $data["behind"] < $config["behind_in"] || $data["left"] < $config["left_in"] || $data["right"] < $config["right_in"];
+        $flagOut = $data["front"] > $config["front_out"] && $data["behind"] < $config["behind_out"] && $data["left"] < $config["left_out"] && $data["right"] < $config["right_out"];
 
+        if($flagIn){
+            //推送进
+        }else{
+            //推送出
+        }
+        return true;
+    }
 
    
     public function test(){

+ 1 - 0
catch/api/route.php

@@ -28,6 +28,7 @@ $router->post('api/report', '\catchAdmin\api\controller\Api@report');
 $router->get('api/deleteHistory', '\catchAdmin\api\controller\Api@deleteHistory');
 $router->get('api/test', '\catchAdmin\api\controller\Api@test');
 $router->get('api/redis', '\catchAdmin\api\controller\Api@redis_to_mysql');
+$router->get('api/redis_parea', '\catchAdmin\api\controller\Api@redis_to_mysql_parea');
 
 //更新软件版本测试
 //$router->get('api/detectionVersion', '\catchAdmin\api\controller\Api@detectionVersion');

+ 378 - 0
task_script/RL4RSSI_PAREA_MQTT.php

@@ -0,0 +1,378 @@
+<?php
+require('../vendor/autoload.php');
+use \PhpMqtt\Client\MqttClient;
+use \PhpMqtt\Client\ConnectionSettings;
+use think\facade\Cache;
+date_default_timezone_set("PRC");
+define('HOST', '127.0.0.1');
+define('PORT', '6379');
+define('PASSWORD', '123456');
+define('DATABASE', 3);
+
+
+function app_redis()
+{
+    static $redis = null;
+    static $conn = false;
+    if (!$conn) {
+        connect: //定义标签
+        $redis = new Redis();
+        try {
+            //建立的Redis短连接,在请求结束后不会自动关闭,相当于持久连接.
+            $conn = $redis->connect(HOST, PORT);
+            $conn = $redis->auth(PASSWORD);
+            $conn = $redis->select(DATABASE);
+            // 连接成功,返回$redis对象,连接失败,返回false.
+            return ($conn === true) ? $redis : false;
+        } catch (Exception $e) {
+            return false;
+        }
+    } else {
+        // 这里假设PHP-FPM在处理一个请求的时间内,Redis连接都是可用的.
+        // 所以只在PHP-CLI下检查Redis连接的状态,进行断线重连.
+        if (php_sapi_name() === 'cli') {
+            try {
+                // ping用于检查当前连接的状态,成功时返回+PONG,失败时抛出一个RedisException对象.
+                // ping失败时警告:
+                // Warning: Redis::ping(): connect() failed: Connection refused
+                // var_dump('AAAAAAAAA', $redis);
+                echo 'Redis 连接状态' . $redis->ping() . PHP_EOL;
+                @$redis->ping();
+                if (!$redis->ping()) {
+                    goto connect; //跳转到标签出继续执行连接操作
+                }
+            } catch (Exception  $e) {
+                // 信息如 Connection lost 或 Redis server went away
+                echo $e->getMessage();
+                echo 'Redis 连接失败 重新连接:' . PHP_EOL;
+                // 断线重连
+                goto connect;
+            }
+        }
+        return $redis;
+    }
+}
+
+function rlog(...$args)
+{
+    if (empty($args[0])) {
+        return;
+    }
+    static $LOG_CONSOLE = false; //是否输出到控制台
+    static $LOG_NAME = "school_mqtt.log"; //值为空时 不写入文件
+    static $LOG_SIZE = 64 * 1024 * 1024; //文件最大尺寸
+
+    static $LOG_CACHE = false; //是否缓存日志内容 用于批量写入文件
+    static $CACHE_DURATION = 10; //缓存最大时间 秒
+    static $CACHE_SIZE = 1024; //缓存大小
+    static $cacheStartTime = 0;
+    static $cacheBuf = '';
+
+    static $LOG_TIMES = 10; //调用这个函数最大次数 超过次数后判断下文件大小
+    static $logCount = 0;
+
+
+    $buf = '';
+    if (count($args) == 1 && $args[0] == "\n") { //只有换行时 不写入时间戳了
+        $buf = "\n";
+    } else {
+        $pid = ''; //进程id
+        if (function_exists('posix_getpid')) {
+            $pid = ' ' . posix_getpid() . ' ';
+        }
+        $fileLine = ''; //文件名:行号
+        {
+            $debug = debug_backtrace();
+            $fileLine = ($pid == '' ? ' ' : '') . basename($debug[0]['file']) . ':' . $debug[0]['line'] . ' ';
+        }
+        $buf = date("y-m-d H:i:s") . "{$pid}{$fileLine}" . implode(' ', $args) . "\n";
+    }
+
+    $logCount++;
+    if (!empty($LOG_NAME)) {
+        if ($LOG_CACHE) {
+            $cacheBuf .= $buf;
+            //超过缓存尺寸 或者 超过缓存时长 写缓存到文件
+            if (strlen($cacheBuf) > $CACHE_SIZE || time() - $cacheStartTime > $CACHE_DURATION) {
+                $cacheStartTime = time();
+                goto write;
+            } else {
+                goto skipWrite;
+            }
+        } else {
+            $cacheBuf = $buf;
+        }
+        write: {
+            //超过尺寸后 删除旧文件 把新文件重命名为旧文件  多进程同时操作 不加锁问题不大
+            if ($logCount > $LOG_TIMES && filesize($LOG_NAME) > $LOG_SIZE) {
+                $oldLogName = $LOG_NAME . '.old';
+                if (file_exists($oldLogName)) {
+                    if (!unlink($oldLogName)) {
+                        echo "unlink err\n";
+                    }
+                }
+                if (!rename($LOG_NAME, $oldLogName)) {
+                    echo "rename err\n";
+                }
+                $logCount = 0;
+            }
+            if (!file_put_contents($LOG_NAME, $cacheBuf, FILE_APPEND)) {
+                echo "file_put_contents err\n";
+            }
+            $cacheBuf = '';
+        }
+        skipWrite: {
+        }
+    }
+    if ($LOG_CONSOLE) {
+        echo $buf;
+    }
+}
+
+function http($url, $params, $header = [], $method = 'GET', $timeout = 10)
+{
+    // POST $params 字符串形式query=abcd&abc=12345
+    //GET $params 数组['query' => 'abcd', 'abc' => 12345]
+    //  $header[] = "Content-Type: application/x-www-form-urlencoded";
+    //  $header[] = "Content-Type: application/soap+xml; charset=utf-8";
+    //  $header[] = "Content-Type: application/json; charset=utf-8";
+    //  $header[] = "Expect: ";
+    rlog("[HTTP] url:$url,method:$method" . ",header:" . json_encode($header));
+    if (strtoupper($method) == 'POST') {
+        rlog("[POST] send params " . (!is_array($params) ? $params : json_encode($params, JSON_UNESCAPED_UNICODE)));
+    } else {
+        rlog("[GET] send " . json_encode($params));
+    }
+    $header[] = "Expect: ";
+    $opts = array(
+        CURLOPT_TIMEOUT => $timeout,
+        CURLOPT_RETURNTRANSFER => 1,
+        CURLOPT_SSL_VERIFYPEER => false,
+        CURLOPT_SSL_VERIFYHOST => false,
+        CURLOPT_HTTPHEADER => $header
+    );
+
+    /* 根据请求类型设置特定参数 */
+    switch (strtoupper($method)) {
+        case 'GET':
+            $opts[CURLOPT_URL] = $url . (empty($params) ? '' : ('?' . http_build_query($params)));
+            break;
+        case 'POST':
+            //$params = http_build_query($params);
+            $opts[CURLOPT_URL] = $url;
+            $opts[CURLOPT_POST] = 1;
+            $opts[CURLOPT_POSTFIELDS] = json_encode($params);
+            break;
+        default:
+            rlog("[ERR] method " . $method);
+            return false;
+    }
+
+    global $ch; //curl长连接
+    if (empty($ch)) {
+        $ch = curl_init();
+    }
+    if (empty($ch)) {
+        rlog("[ERR] curl_init");
+        return false;
+    }
+
+    $csa = curl_setopt_array($ch, $opts);
+    if (empty($csa)) {
+        rlog("[ERR] curl_setopt_array");
+        return false;
+    }
+
+    $data = curl_exec($ch);
+    if ($data === false) {
+        rlog("[ERR] curl_exec errno:" . curl_errno($ch) . " " . curl_error($ch));
+        return false;
+    }
+    //unicode转中文
+    $data = decodeUnicode($data);
+    rlog("[HTTP] recv " . $data);
+    //curl_close($ch);
+    return $data;
+}
+function decodeUnicode($str)
+{
+    return preg_replace_callback('/\\\\u([0-9a-f]{4})/i', function ($matches) {
+        return iconv("UCS-2BE", "UTF-8", pack("H*", $matches[1]));
+    }, $str);
+}
+function devRegularInfo($topic, $msg)
+{
+    // {"CSQ":26,"REP_INT":60,"GPS":"112.14060,32.06532","IMEI":"867160049332715","IMSI":"460041872816952","CCID":"898607B8101980060659","SYS_VER":"RLSC_V0.1","Time":1676257143,"Status":0,"StatusMsg":"设备正常","Initiative":1}
+    $data = json_decode($msg, true);
+    if (empty($data)) {
+        rlog("ERR", "json_decode");
+        return;
+    }
+    rlog("[I]", $msg);
+    $url = 'http://47.114.185.186:8115/api/trackReport';
+    // $url .= http_build_query($column);
+    http($url, $data, [], 'POST');
+    $loc = explode(',', $data['GPS']);
+    $column = [
+        'csq' => $data['CSQ'] ?: '',
+        'rep_int' => $data['REP_INT'] ?: '',
+        'latitude' => isset($loc[0]) ? $loc[0] : '',
+        'longitude' => isset($loc[1]) ? $loc[1] : '',
+        'imei' => $data['IMEI'] ?: '',
+        'imsi' => $data['IMSI'] ?: '',
+        'iccid' => $data['ICCID'] ?: '',
+        'version' => $data['SYS_VER'] ?: '',
+        'time' => $data['Time'] ?: '',
+        'status' => $data['Status'] ?: '',
+        'status_msg' => $data['StatusMsg'] ?: '',
+        'initiative' => $data['Initiative'] ?: ''
+    ];
+
+    $url = 'http://47.114.185.186:8115/rlapi/busHeartbeatData';
+    // $url .= http_build_query($column);
+    http($url, $column, [], 'POST');
+}
+
+function getUpDevSysMsg($topic, $msg)
+{
+    // {
+    //     "IMEI": "867160049332715",
+    //     "Mqtt_Host": "develop.rltest.cn",
+    //     "Mqtt_Port": 1883,
+    //     "Mqtt_User": " rl517",
+    //     "Mqtt_Password": "rlian2022",
+    //     "TTS_TEXT": "某某科技欢迎您",
+    //     "GPS_EN": "0",
+    //     "Time": 1676257143
+    // }
+    $arr = json_decode($msg, true);
+    if (empty($arr)) {
+        rlog("ERR", "json_decode");
+        return;
+    }
+    $column = [
+        'imei' => $arr['IMEI'],
+        'mqtt_host' => $arr['Mqtt_Host'],
+        'mqtt_port' => $arr['Mqtt_Port'],
+        'mqtt_user' => $arr['Mqtt_User'],
+        'mqtt_password' => $arr['Mqtt_Password'],
+        'tts_text' => $arr['TTS_TEXT'],
+        'gps_en' => $arr['GPS_EN'],
+        'time' => $arr['Time']
+    ];
+    $url = 'http://47.114.185.186:8115/rlapi/busSysMsgData';
+    // $url .= http_build_query($data);
+    http($url, $column, [], 'POST');
+}
+function rcInfoMsg($topic, $msg)
+{
+
+    // {
+    //     "IMEI": "867160049332715",
+    //     "RC_Number": "867160049332715",
+    //     "RC_Type": 1,
+    //     "RC_Pres": 1,
+    //     "RC_Total": 25,
+    //     "GPS_X": "116.25"
+    //     "GPS_Y": "29.36",
+    //     "Msgid": "dkx12-15dss-ad567-1a2ss",
+    //     "Time": 1676257143
+    // }
+    $arr = json_decode($msg, true);
+    if (empty($arr)) {
+        rlog("ERR", "json_decode");
+        return;
+    }
+    $column = [
+        'imei' => $arr['IMEI'],
+        'rc_number' => $arr['RC_Number'],
+        'rc_type' => $arr['RC_Type'],
+        'rc_pres' => $arr['RC_Pres'],
+        'rc_total' => $arr['RC_Total'],
+        'gps_x' => $arr['GPS_X'],
+        'gps_y' => $arr['GPS_Y'],
+        'msgid' => $arr['Msgid'],
+        'time' => $arr['Time']
+    ];
+    $url = 'http://47.114.185.186:8115/rlapi/busRcInfoData';
+    // $url .= http_build_query($data);
+    http($url, $column, [], 'POST');
+}
+function loop()
+{
+    $server   = 'develop.rltest.cn';
+    $port     = 1883;
+    $clientId = 'mqttx_testparea9659';
+    $username = 'rl517';
+    $password = "rlian2022";
+    $clean_session = true;
+
+    $connectionSettings  = new ConnectionSettings();
+    $connectionSettings = $connectionSettings
+        ->setUsername($username)
+        ->setPassword($password)
+        ->setKeepAliveInterval(60)
+        // Last Will 设置
+        //        ->setLastWillTopic('emqx/test/last-will')
+        //        ->setLastWillMessage('client disconnect')
+        //        ->setLastWillQualityOfService(1)
+    ;
+
+    //include "RLog.php";
+    //    $mqtt = new MqttClient($server, $port, $clientId, MqttClient::MQTT_3_1, null, new RLog());
+
+    $mqtt = new MqttClient($server, $port, $clientId);
+
+    $mqtt->connect($connectionSettings, $clean_session);
+    rlog('INFO', "connect OK");
+
+    /*
+消息方向	设备->服务器 
+设备主动上报当前设备公共信息参数:ScBusTem/DevRegularInfo
+服务器获取设备系统信息后设备上传信息,即GetDevSysMsg的回应 ScBusTem/GetUpDevSysMsg 
+服务器设置设备重量信息信息 ScBusTem/RCInfoMsg
+*/
+    //订阅心跳数据
+    $mqtt->subscribe('PArea4RSSI/devOntime', function ($topic, $message) {
+        rlog("INFO", 'recv', $topic, $message);
+        var_dump($message);
+    }, 0);
+
+
+    // $mqtt->subscribe('ScBusTem/GetDevSysMsg/*', function ($topic, $message) {
+    //     rlog("INFO", 'recv', $topic, $message);
+    //     getDevSysMsg($topic, $message);
+    // }, 0);
+    //终端上报系统信息数据
+    $mqtt->subscribe('PArea4RSSI/rfidinfos', function ($topic, $message) {
+        rlog("INFO", 'recv', $topic, $message);
+        if(!empty($message)){
+            mqttToRedis($message);
+        }
+        
+    }, 0);
+    $mqtt->loop(true);
+}
+
+// FFFFFFFFFF000014FF000002200000E2B15AD501FFFFFFFF49FFFF46
+//                               5bc6f8a110FFFFFFFF474d4a49
+function mqttToRedis($text){
+    
+    try{
+        app_redis()->lpush("mqtt_data_parea4rssi",$text);
+    }catch(\Exception $e){
+        rlog("INFO", 'recv',"redis 异常".$e->getMessage());
+    }
+    
+} 
+
+while (1) {
+    try {
+        rlog('INFO', 'connect start');
+        loop();
+    } catch (\Exception $ex) {
+        rlog("ERR", $ex->getTraceAsString());
+        rlog("ERR", $ex->getMessage());
+    }
+    sleep(3);
+}