Quellcode durchsuchen

Merge branch 'master' of http://gogs.renlianiot.com:4000/zmcoding/station-test-api

nana_sen vor 1 Jahr
Ursprung
Commit
92795321fa

Datei-Diff unterdrückt, da er zu groß ist
+ 120 - 75
catch/api/controller/Api.php


+ 194 - 53
catch/api/service/dispose.php

@@ -255,6 +255,9 @@ class dispose
                                 "mac"=>$data['mac'],
                                 "data"=>$data_array
                             ];
+                            //设定最后结果
+                            $this->set_time_results($data['mac'],$data['label'],$resArray['time'],$inAndOut);
+                            //推送远程数据
                             $this->getRemoteData($url_data);
                             //清除缓存数据
                             $this->get_label_history(3,$data['mac'],$data['label']);
@@ -301,45 +304,45 @@ class dispose
 
 
     //排序
-    public function sortlist($list){
-        $list01=[];
-        $list02=[];
-        $list03=[];
-        $list04=[];
-        debug_log("InAndOUT","排序的数据:".json_encode($list));
-        foreach($list as $item){
-           array_push($list01,$item['rssi1']);
-           array_push($list02,$item['rssi2']);
-           array_push($list03,$item['rssi3']);
-           array_push($list04,$item['rssi4']);
-        }
+    // public function sortlist($list){
+    //     $list01=[];
+    //     $list02=[];
+    //     $list03=[];
+    //     $list04=[];
+    //     debug_log("InAndOUT","排序的数据:".json_encode($list));
+    //     foreach($list as $item){
+    //        array_push($list01,$item['rssi1']);
+    //        array_push($list02,$item['rssi2']);
+    //        array_push($list03,$item['rssi3']);
+    //        array_push($list04,$item['rssi4']);
+    //     }
 
-        if(count($list)>=3){
-            sort($list01);
-            sort($list02);
-            sort($list03);
-            sort($list04);
-            array_pop($list01);
-            array_pop($list02);
-            array_pop($list03);
-            array_pop($list04);
-            array_shift($list01);
-            array_shift($list02);
-            array_shift($list03);
-            array_shift($list04);
-        }
-        debug_log("InAndOUT","排序后的队列:".json_encode($list01));
-        debug_log("InAndOUT","排序后的队列:".json_encode($list02));
-        debug_log("InAndOUT","排序后的队列:".json_encode($list03));
-        debug_log("InAndOUT","排序后的队列:".json_encode($list04));
-        $rssi1= array_sum($list01)/count($list01);
-        $rssi2= array_sum($list02)/count($list02);
-        $rssi3= array_sum($list03)/count($list03);
-        $rssi4= array_sum($list04)/count($list04);
-        $res=[$rssi1,$rssi2,$rssi3,$rssi4];
-        debug_log("InAndOUT","结算后的结果:".json_encode($res));
-        return $res;
-    }
+    //     if(count($list)>=3){
+    //         sort($list01);
+    //         sort($list02);
+    //         sort($list03);
+    //         sort($list04);
+    //         array_pop($list01);
+    //         array_pop($list02);
+    //         array_pop($list03);
+    //         array_pop($list04);
+    //         array_shift($list01);
+    //         array_shift($list02);
+    //         array_shift($list03);
+    //         array_shift($list04);
+    //     }
+    //     debug_log("InAndOUT","排序后的队列:".json_encode($list01));
+    //     debug_log("InAndOUT","排序后的队列:".json_encode($list02));
+    //     debug_log("InAndOUT","排序后的队列:".json_encode($list03));
+    //     debug_log("InAndOUT","排序后的队列:".json_encode($list04));
+    //     $rssi1= array_sum($list01)/count($list01);
+    //     $rssi2= array_sum($list02)/count($list02);
+    //     $rssi3= array_sum($list03)/count($list03);
+    //     $rssi4= array_sum($list04)/count($list04);
+    //     $res=[$rssi1,$rssi2,$rssi3,$rssi4];
+    //     debug_log("InAndOUT","结算后的结果:".json_encode($res));
+    //     return $res;
+    // }
 /**
  * 获取远程推送的数据
  *
@@ -349,6 +352,7 @@ class dispose
  */
 
 public function getRemoteData($data){
+
     $key="push_data";
     $redis=$this->redis;
     $redis->Rpush($key,json_encode($data));
@@ -364,13 +368,31 @@ public function set_label_history($data){
     $key=$MAC."_label_history";
     $head=$data['label']."_head";
     $tail=$data["label"]."_tail";
+    $frontMaxKey=$data['label'].'_frontMax';
+    $backMaxKey=$data['label'].'_backMax';
+    $frontMax=null;
+    $backMax=null;
+   
     //保存头部
     $head_data= $this->selectHash($key,$head);
     $tail_data=$this->selectHash($key,$tail);
-    debug_log("InAndOUT","最旧信号间隔-".($data["report_time"]-$tail_data[count($tail_data)-1]["report_time"]));
-    debug_log("InAndOUT","最旧信号间隔--".json_encode($tail_data));
-    debug_log("InAndOUT","最旧信号间隔---".json_encode($tail_data[count($tail_data)-1]));
-    debug_log("InAndOUT","最旧信号间隔----".$data["report_time"]);
+    //保存朝外的信号最大值
+    $frontMax=$this->selectHash($key,$frontMaxKey);
+    //保存朝内的信号最大值
+    $backMax=$this->selectHash($key,$backMaxKey);
+    if(empty($frontMax)){
+        $frontMax=$data["rssi1"];
+    }else{
+        $frontMax=$data["rssi1"]<$frontMax?$data["rssi1"]:$frontMax;    
+    }
+    if(empty($backMax)){
+        $backMax=$data["rssi2"];
+    }else{
+        $backMax=$data["rssi2"]<$backMax?$data["rssi2"]:$backMax;    
+    }
+    $this->setHash($key,$frontMaxKey,$frontMax);
+    $this->setHash($key,$backMaxKey,$backMax);
+
     if(!empty($tail_data)){
         if(($data["report_time"]-$tail_data[count($tail_data)-1]["report_time"])>6){
           $head_data=[];
@@ -387,7 +409,6 @@ public function set_label_history($data){
     }
     //保存尾部标签数据
     
-   
     $tail_data=empty($tail_data)?[]: $tail_data;
     array_push($tail_data,$data);
     while(count($tail_data)>3){
@@ -408,6 +429,8 @@ public function get_label_history($type,$mac,$label){
     $key=$mac."_label_history";
     $head=$label."_head";
     $tail=$label."_tail";
+    $frontMaxKey=$label.'_frontMax';
+    $backMaxKey=$label.'_backMax';
     if($type==1){
         $head_data= $this->selectHash($key,$head);
         return $head_data;
@@ -418,9 +441,23 @@ public function get_label_history($type,$mac,$label){
     }
     if($type==3){
         debug_log("InAndOUT","清空头部和尾部历史:".$mac." ".$label);
-        $tail_data= $this->delHash($key,$head);
-        $tail_data= $this->delHash($key,$tail);
+        $this->delHash($key,$head);
+        $this->delHash($key,$tail);
+        $this->delHash($key,$frontMaxKey);
+        $this->delHash($key, $backMaxKey);
+    }
+    if($type==4){
+        $frontMax= $this->selectHash($key,$frontMaxKey);
+        return $frontMax;
     }
+    if($type==5){
+        $backMax= $this->selectHash($key,$backMaxKey);
+        return $backMax;
+    }
+
+
+
+
 }
 /**
  * 二次生成进出
@@ -471,8 +508,7 @@ public function second_create_direction($mac,$label){
             $front+=1;
         }else{
             $behind+=1;
-        }
-       
+        } 
         $time=$item['report_time'];
    }
    $tail_dir=$front>$behind?1:2;
@@ -484,11 +520,77 @@ public function second_create_direction($mac,$label){
    }else{
         $res=2;
    }
+
+   $frontMax= $this->get_label_history(4,$mac,$label);
+   debug_log("second_dirt","标签.$label.的最朝外的最大值".$frontMax);
+   if($frontMax>65){
+       debug_log("second_dirt","标签.$label.的最朝外的最小值大于65不参与计算");
+       return false;
+   }
+
+   //早上对"进寝室不做二次计算
+
+   $start_time=strtotime(date("Y-m-d",time()));
+   $end_time=$start_time+60*60*8;
+   debug_log("second_dirt","判断的时间".date("Y-m-d H:i:s",$end_time));
+   debug_log("second_dirt","判断的时间".date("Y-m-d H:i:s",$time));
+   //当天结束之间 当天早上7点之前进寝室,不做二次计算
+    if($time<$end_time&&$res==1){
+        debug_log("second_dirt","当天早上7点之前进寝室,不做二次计算");
+        return false;
+    }
+    $befor_res= $this->get_time_results($mac,$label,2);
+  
+    if(!empty($befor_res)){
+        debug_log("second_dirt","间隔时间".json_encode($befor_res));
+        if($befor_res['dir']==$res){
+            $interval_time=$time-$befor_res["time"];
+            debug_log("second_dirt","间隔时间".$interval_time);
+            debug_log("second_dirt","间隔时间第一个小时,且方向相同 不做二次计算".$interval_time);
+            if($interval_time<1*60*60){
+                return false;
+            }
+        }
+
+    }else{
+        debug_log("second_dirt","数据为空,进行生成");
+    }
    
+    
+
+
    debug_log("second_dirt","最终结果".json_encode(["dirt"=>$res,"time"=>$time]));
     return ["dirt"=>$res,"time"=>$time];
 
 }
+/**
+ * Undocumented function 基站配置
+ *
+ * @param [type] $mac
+ * @return void
+ */
+
+public function get_station_config($mac){
+     
+     $Key="station_config";
+     $mackey=$mac;
+     $data=[
+        "filter_signal"=>72,//过滤信号强度
+        "history_filter_signal"=>82,//历史记录过滤的信号强度
+        "init_data"=>3, //需要三条数据确定初始方向
+        "change_data"=>5, //需要5条数据确定变换的方向
+        "timeout"=>6,
+        "second_create_res"=>[]//二次根据历史记录生成配置 数组中的参数配置  start_time,end_time,dir 1 前 2后;
+     ];
+     $config_data=$this->selectHash($Key,$mackey);
+     if(!empty($config_data)){
+        foreach($data as $key=>$value){
+            $data[$key]=empty($config_data[$key])?$data[$key]:$config_data[$key];
+        }
+     }
+    return $data;
+
+}
 
 
 
@@ -496,6 +598,45 @@ public function second_create_direction($mac,$label){
 
 
 
+
+
+    /**
+     * function 设置存储结果的时间
+     *
+     * @param [type] $mac
+     * @param [type] $label
+     * @param [type] $time
+     * @param [type] $dir 进出结果
+     * @return void
+     */
+    public function set_time_results($mac,$label,$time,$dir){
+        $mac_key=$mac."res_time";
+        $array=["time"=>$time,"dir"=>$dir];
+        $this->setHash($mac_key,$label,$array);
+    }
+    /**
+     * function 获取存储结果的时间
+     *
+     * @param [type] $mac
+     * @param [type] $label 
+     * @param [type] $type 类型
+     * @return void
+     */
+    public function get_time_results($mac,$label,$type=1){
+        $mac_key=$mac."res_time";
+        $data=$this->selectHash($mac_key,$label);
+        if(empty($data)){
+            return false;
+        }
+        if($type==1){
+            return $data["time"];
+        }else{
+            return $data;
+        }
+    }
+
+
+
     //查询key hash
     public function selectHash($hashKey,$key){
         $res= $this->redis->hGet($hashKey,$key);
@@ -511,11 +652,13 @@ public function second_create_direction($mac,$label){
       
     } 
     public function delHash($hashKey,$key){
-        $this->redis->Hdel($hashKey,$key);
+        $this->redis->hDel($hashKey,$key);
     }
 
+
+
     public function temporary_label($redis,$label,$type){
-        
+
         return;
         $key="temporary_label";
         if($type==1){
@@ -532,7 +675,7 @@ public function second_create_direction($mac,$label){
      * 解析日志
      */
     public function analysisLog(){
-        $path=app()->getRootPath()."runtime/log/2023-05-15/label_log.log";
+        $path=app()->getRootPath()."runtime/log/2023-05-28/label_log.log";
         var_dump($path);
         var_dump(is_file($path));
         $file = fopen($path, "rb");
@@ -563,8 +706,6 @@ public function second_create_direction($mac,$label){
 
     }
 
-
-
         foreach($array as $item){
             debug_log("in_label",$item);
         }
@@ -574,7 +715,7 @@ public function second_create_direction($mac,$label){
 
     }
 
-
+    
 
 
 

+ 196 - 0
catch/api/service/mysqlToKafuka.php

@@ -0,0 +1,196 @@
+<?php
+namespace catchAdmin\api\service;
+
+use think\facade\Db;
+
+//脚本衢州 从mysql中存到kafka/redis中
+ //mysql 监听器
+class mysqlToKafuka
+{
+   //1 kafka,2  redis
+    private $type;
+    
+    public function __construct($type)
+    {
+       $this->type=$type;
+
+        
+    }
+
+
+
+    
+    public function sql_monitor(){
+        $table="test";
+        $list=Db::name($table)->where("status",1)->select();
+        foreach($list as $key=>$item){
+            $table_name=$item["table_name"];
+            $type=$item["type"];
+            $table_id=$item["table_id"];
+            $this->Operation($table_name,$table_id,$type);
+        }
+
+    }
+    /**
+     * 业务处理
+     *
+     * @param [type] $table_name
+     * @param [type] $table_id
+     * @param [type] $type
+     * @return void
+     */
+    public function Operation($table_name,$table_id,$type){
+        switch($table_name)
+        {
+           case "fjd_zcdj":
+                return $this->fjd_zcdj($table_name,$table_id,$type);
+
+        }
+        
+
+
+    }
+
+    public function fjd_zcdj($table_name,$table_id,$type){
+        $data=Db::name($table_name)->where("id",$table_id)->find();
+        $json_data=false;
+
+        if(empty($data)){
+            return false;
+        }
+        if($type=="add"){
+            $json_data=[
+                "PLATE_NO"=>"测试AAAAAA",
+                "RFID_SN"=>"12345678",
+                "CAR_TYPE"=>"1",
+                "CAR_BRAND"=>"1",
+                "NAME"=>"测试A",
+                "ID_CARD_NUMBER"=>"330127199301171835",
+                "MOBILE_NUMBER"=>"15706857065",
+                "INSTA_DATE"=>"2023-05-24 09:13:16",
+                "INSTALLER"=>"超级管理员",
+                "DATA_TYPE"=>"vehicle_save"
+            ];
+                
+
+        }
+        if($type=="update"){
+            $json_data=[
+                "PLATE_NO"=>"测试AAAAAA",
+                "OLD_NO"=>"测试AAAAAA",
+                "RFID_SN"=>"77778888",
+                "CAR_TYPE"=>"1",
+                "CAR_BRAND"=>"1",
+                "NAME"=>"测试A",
+                "ID_CARD_NUMBER"=>"330127199301171835",
+                "MOBILE_NUMBER"=>"15706857065",
+                "DATA_TYPE"=>"vehicle_update"
+            ];
+            
+
+        }
+        if($type=="delete"){
+
+            $json_data=[
+                "PLATE_NO"=>"测试AAAAAA",
+                "DATA_TYPE"=>"vehicle_delete"
+            ];
+
+        }
+        return $json_data;
+
+    }
+    /**
+     * 存入kafuka
+     *
+     * @param [type] $data
+     * @return void
+     */
+    public function toKafuka($data){
+
+<<<<<<< HEAD
+        // $conf = new RdKafka\Conf();
+        // $conf->setDrMsgCb(function ($kafka, $message) {
+        //     file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
+        // });
+        // $conf->setErrorCb(function ($kafka, $err, $reason) {
+        //         file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
+        // });
+
+        // $rk = new RdKafka\Producer($conf);
+        // $rk->setLogLevel(LOG_DEBUG);
+        // $rk->addBrokers("127.0.0.1");
+        // $cf = new RdKafka\TopicConf();
+        // $cf->set('request.required.acks', 0);
+        // $topic = $rk->newTopic("test", $cf);
+        // $option = 'qkl';
+        // for ($i = 0; $i < 20; $i++) {
+        //      $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
+        // }
+        // $len = $rk->getOutQLen();
+        //     while ($len > 0) {
+        //     $len = $rk->getOutQLen();
+            
+        //     $rk->poll(50);
+        // }
+=======
+        $conf = new RdKafka\Conf();
+        $conf->setDrMsgCb(function ($kafka, $message) {
+            file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
+        });
+        $conf->setErrorCb(function ($kafka, $err, $reason) {
+                file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
+        });
+
+        $rk = new RdKafka\Producer($conf);
+        $rk->setLogLevel(LOG_DEBUG);
+        $rk->addBrokers("127.0.0.1");
+        $cf = new RdKafka\TopicConf();
+        $cf->set('request.required.acks', 0);
+        $topic = $rk->newTopic("test", $cf);
+        $option = 'qkl';
+        for ($i = 0; $i < 20; $i++) {
+             $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
+        }
+        $len = $rk->getOutQLen();
+            while ($len > 0) {
+            $len = $rk->getOutQLen();
+            
+            $rk->poll(50);
+        }
+>>>>>>> d836e7393f81fe9e3002d4b1e17a4169fd485999
+
+        
+
+
+
+
+    }
+      /**
+     * 存入redis
+     *
+     * @param [type] $data
+     * @return void
+     */
+    public function toRedis($data){
+
+    }
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+}

+ 2 - 1
composer.json

@@ -36,7 +36,8 @@
         "aliyun/aliyun-tablestore-sdk-php": "~5.0",
         "alibabacloud/sts-20150401": "1.0.1",
         "phpmailer/phpmailer": "^6.6",
-        "php-mqtt/client": "^1.8"
+        "php-mqtt/client": "^1.8",
+        "nmred/kafka-php": "0.2.*"
        
     },
     "require-dev": {

+ 384 - 0
task_script/MN_RL4RSSI_MQTT_CLIENT.php

@@ -0,0 +1,384 @@
+<?php
+require('../vendor/autoload.php');
+use \PhpMqtt\Client\MqttClient;
+use \PhpMqtt\Client\ConnectionSettings;
+use think\facade\Cache;
+date_default_timezone_set("PRC");
+define('HOST', '127.0.0.1');
+define('PORT', '6379');
+define('PASSWORD', '123456');
+define('DATABASE', 3);
+
+
+function app_redis()
+{
+    static $redis = null;
+    static $conn = false;
+    if (!$conn) {
+        connect: //定义标签
+        $redis = new Redis();
+        try {
+            //建立的Redis短连接,在请求结束后不会自动关闭,相当于持久连接.
+            $conn = $redis->connect(HOST, PORT);
+            $conn = $redis->auth(PASSWORD);
+            $conn = $redis->select(DATABASE);
+            // 连接成功,返回$redis对象,连接失败,返回false.
+            return ($conn === true) ? $redis : false;
+        } catch (Exception $e) {
+            return false;
+        }
+    } else {
+        // 这里假设PHP-FPM在处理一个请求的时间内,Redis连接都是可用的.
+        // 所以只在PHP-CLI下检查Redis连接的状态,进行断线重连.
+        if (php_sapi_name() === 'cli') {
+            try {
+                // ping用于检查当前连接的状态,成功时返回+PONG,失败时抛出一个RedisException对象.
+                // ping失败时警告:
+                // Warning: Redis::ping(): connect() failed: Connection refused
+                // var_dump('AAAAAAAAA', $redis);
+                echo 'Redis 连接状态' . $redis->ping() . PHP_EOL;
+                @$redis->ping();
+                if (!$redis->ping()) {
+                    goto connect; //跳转到标签出继续执行连接操作
+                }
+            } catch (Exception  $e) {
+                // 信息如 Connection lost 或 Redis server went away
+                echo $e->getMessage();
+                echo 'Redis 连接失败 重新连接:' . PHP_EOL;
+                // 断线重连
+                goto connect;
+            }
+        }
+        return $redis;
+    }
+}
+
+function rlog(...$args)
+{
+    if (empty($args[0])) {
+        return;
+    }
+    static $LOG_CONSOLE = false; //是否输出到控制台
+    static $LOG_NAME = "school_mqtt.log"; //值为空时 不写入文件
+    static $LOG_SIZE = 64 * 1024 * 1024; //文件最大尺寸
+
+    static $LOG_CACHE = false; //是否缓存日志内容 用于批量写入文件
+    static $CACHE_DURATION = 10; //缓存最大时间 秒
+    static $CACHE_SIZE = 1024; //缓存大小
+    static $cacheStartTime = 0;
+    static $cacheBuf = '';
+
+    static $LOG_TIMES = 10; //调用这个函数最大次数 超过次数后判断下文件大小
+    static $logCount = 0;
+
+
+    $buf = '';
+    if (count($args) == 1 && $args[0] == "\n") { //只有换行时 不写入时间戳了
+        $buf = "\n";
+    } else {
+        $pid = ''; //进程id
+        if (function_exists('posix_getpid')) {
+            $pid = ' ' . posix_getpid() . ' ';
+        }
+        $fileLine = ''; //文件名:行号
+        {
+            $debug = debug_backtrace();
+            $fileLine = ($pid == '' ? ' ' : '') . basename($debug[0]['file']) . ':' . $debug[0]['line'] . ' ';
+        }
+        $buf = date("y-m-d H:i:s") . "{$pid}{$fileLine}" . implode(' ', $args) . "\n";
+    }
+
+    $logCount++;
+    if (!empty($LOG_NAME)) {
+        if ($LOG_CACHE) {
+            $cacheBuf .= $buf;
+            //超过缓存尺寸 或者 超过缓存时长 写缓存到文件
+            if (strlen($cacheBuf) > $CACHE_SIZE || time() - $cacheStartTime > $CACHE_DURATION) {
+                $cacheStartTime = time();
+                goto write;
+            } else {
+                goto skipWrite;
+            }
+        } else {
+            $cacheBuf = $buf;
+        }
+        write: {
+            //超过尺寸后 删除旧文件 把新文件重命名为旧文件  多进程同时操作 不加锁问题不大
+            if ($logCount > $LOG_TIMES && filesize($LOG_NAME) > $LOG_SIZE) {
+                $oldLogName = $LOG_NAME . '.old';
+                if (file_exists($oldLogName)) {
+                    if (!unlink($oldLogName)) {
+                        echo "unlink err\n";
+                    }
+                }
+                if (!rename($LOG_NAME, $oldLogName)) {
+                    echo "rename err\n";
+                }
+                $logCount = 0;
+            }
+            if (!file_put_contents($LOG_NAME, $cacheBuf, FILE_APPEND)) {
+                echo "file_put_contents err\n";
+            }
+            $cacheBuf = '';
+        }
+        skipWrite: {
+        }
+    }
+    if ($LOG_CONSOLE) {
+        echo $buf;
+    }
+}
+
+function http($url, $params, $header = [], $method = 'GET', $timeout = 10)
+{
+    // POST $params 字符串形式query=abcd&abc=12345
+    //GET $params 数组['query' => 'abcd', 'abc' => 12345]
+    //  $header[] = "Content-Type: application/x-www-form-urlencoded";
+    //  $header[] = "Content-Type: application/soap+xml; charset=utf-8";
+    //  $header[] = "Content-Type: application/json; charset=utf-8";
+    //  $header[] = "Expect: ";
+    rlog("[HTTP] url:$url,method:$method" . ",header:" . json_encode($header));
+    if (strtoupper($method) == 'POST') {
+        rlog("[POST] send params " . (!is_array($params) ? $params : json_encode($params, JSON_UNESCAPED_UNICODE)));
+    } else {
+        rlog("[GET] send " . json_encode($params));
+    }
+    $header[] = "Expect: ";
+    $opts = array(
+        CURLOPT_TIMEOUT => $timeout,
+        CURLOPT_RETURNTRANSFER => 1,
+        CURLOPT_SSL_VERIFYPEER => false,
+        CURLOPT_SSL_VERIFYHOST => false,
+        CURLOPT_HTTPHEADER => $header
+    );
+
+    /* 根据请求类型设置特定参数 */
+    switch (strtoupper($method)) {
+        case 'GET':
+            $opts[CURLOPT_URL] = $url . (empty($params) ? '' : ('?' . http_build_query($params)));
+            break;
+        case 'POST':
+            //$params = http_build_query($params);
+            $opts[CURLOPT_URL] = $url;
+            $opts[CURLOPT_POST] = 1;
+            $opts[CURLOPT_POSTFIELDS] = json_encode($params);
+            break;
+        default:
+            rlog("[ERR] method " . $method);
+            return false;
+    }
+
+    global $ch; //curl长连接
+    if (empty($ch)) {
+        $ch = curl_init();
+    }
+    if (empty($ch)) {
+        rlog("[ERR] curl_init");
+        return false;
+    }
+
+    $csa = curl_setopt_array($ch, $opts);
+    if (empty($csa)) {
+        rlog("[ERR] curl_setopt_array");
+        return false;
+    }
+
+    $data = curl_exec($ch);
+    if ($data === false) {
+        rlog("[ERR] curl_exec errno:" . curl_errno($ch) . " " . curl_error($ch));
+        return false;
+    }
+    //unicode转中文
+    $data = decodeUnicode($data);
+    rlog("[HTTP] recv " . $data);
+    //curl_close($ch);
+    return $data;
+}
+function decodeUnicode($str)
+{
+    return preg_replace_callback('/\\\\u([0-9a-f]{4})/i', function ($matches) {
+        return iconv("UCS-2BE", "UTF-8", pack("H*", $matches[1]));
+    }, $str);
+}
+function devRegularInfo($topic, $msg)
+{
+    // {"CSQ":26,"REP_INT":60,"GPS":"112.14060,32.06532","IMEI":"867160049332715","IMSI":"460041872816952","CCID":"898607B8101980060659","SYS_VER":"RLSC_V0.1","Time":1676257143,"Status":0,"StatusMsg":"设备正常","Initiative":1}
+    $data = json_decode($msg, true);
+    if (empty($data)) {
+        rlog("ERR", "json_decode");
+        return;
+    }
+    rlog("[I]", $msg);
+    $url = 'http://47.114.185.186:8115/api/trackReport';
+    // $url .= http_build_query($column);
+    http($url, $data, [], 'POST');
+    $loc = explode(',', $data['GPS']);
+    $column = [
+        'csq' => $data['CSQ'] ?: '',
+        'rep_int' => $data['REP_INT'] ?: '',
+        'latitude' => isset($loc[0]) ? $loc[0] : '',
+        'longitude' => isset($loc[1]) ? $loc[1] : '',
+        'imei' => $data['IMEI'] ?: '',
+        'imsi' => $data['IMSI'] ?: '',
+        'iccid' => $data['ICCID'] ?: '',
+        'version' => $data['SYS_VER'] ?: '',
+        'time' => $data['Time'] ?: '',
+        'status' => $data['Status'] ?: '',
+        'status_msg' => $data['StatusMsg'] ?: '',
+        'initiative' => $data['Initiative'] ?: ''
+    ];
+
+    $url = 'http://47.114.185.186:8115/rlapi/busHeartbeatData';
+    // $url .= http_build_query($column);
+    http($url, $column, [], 'POST');
+}
+
+function getUpDevSysMsg($topic, $msg)
+{
+    // {
+    //     "IMEI": "867160049332715",
+    //     "Mqtt_Host": "develop.rltest.cn",
+    //     "Mqtt_Port": 1883,
+    //     "Mqtt_User": " rl517",
+    //     "Mqtt_Password": "rlian2022",
+    //     "TTS_TEXT": "某某科技欢迎您",
+    //     "GPS_EN": "0",
+    //     "Time": 1676257143
+    // }
+    $arr = json_decode($msg, true);
+    if (empty($arr)) {
+        rlog("ERR", "json_decode");
+        return;
+    }
+    $column = [
+        'imei' => $arr['IMEI'],
+        'mqtt_host' => $arr['Mqtt_Host'],
+        'mqtt_port' => $arr['Mqtt_Port'],
+        'mqtt_user' => $arr['Mqtt_User'],
+        'mqtt_password' => $arr['Mqtt_Password'],
+        'tts_text' => $arr['TTS_TEXT'],
+        'gps_en' => $arr['GPS_EN'],
+        'time' => $arr['Time']
+    ];
+    $url = 'http://47.114.185.186:8115/rlapi/busSysMsgData';
+    // $url .= http_build_query($data);
+    http($url, $column, [], 'POST');
+}
+function rcInfoMsg($topic, $msg)
+{
+
+    // {
+    //     "IMEI": "867160049332715",
+    //     "RC_Number": "867160049332715",
+    //     "RC_Type": 1,
+    //     "RC_Pres": 1,
+    //     "RC_Total": 25,
+    //     "GPS_X": "116.25"
+    //     "GPS_Y": "29.36",
+    //     "Msgid": "dkx12-15dss-ad567-1a2ss",
+    //     "Time": 1676257143
+    // }
+    $arr = json_decode($msg, true);
+    if (empty($arr)) {
+        rlog("ERR", "json_decode");
+        return;
+    }
+    $column = [
+        'imei' => $arr['IMEI'],
+        'rc_number' => $arr['RC_Number'],
+        'rc_type' => $arr['RC_Type'],
+        'rc_pres' => $arr['RC_Pres'],
+        'rc_total' => $arr['RC_Total'],
+        'gps_x' => $arr['GPS_X'],
+        'gps_y' => $arr['GPS_Y'],
+        'msgid' => $arr['Msgid'],
+        'time' => $arr['Time']
+    ];
+    $url = 'http://47.114.185.186:8115/rlapi/busRcInfoData';
+    // $url .= http_build_query($data);
+    http($url, $column, [], 'POST');
+}
+function loop()
+{
+    $server   = 'develop.rltest.cn';
+    $port     = 1883;
+    $clientId = 'mqttx_test1312412412';
+    $username = 'rl517';
+    $password = "rlian2022";
+    $clean_session = true;
+
+    $connectionSettings  = new ConnectionSettings();
+    $connectionSettings = $connectionSettings
+        ->setUsername($username)
+        ->setPassword($password)
+        ->setKeepAliveInterval(60)
+        // Last Will 设置
+        //        ->setLastWillTopic('emqx/test/last-will')
+        //        ->setLastWillMessage('client disconnect')
+        //        ->setLastWillQualityOfService(1)
+    ;
+
+    //include "RLog.php";
+    //    $mqtt = new MqttClient($server, $port, $clientId, MqttClient::MQTT_3_1, null, new RLog());
+
+    $mqtt = new MqttClient($server, $port, $clientId);
+
+    $mqtt->connect($connectionSettings, $clean_session);
+    rlog('INFO', "connect OK");
+
+    /*
+消息方向	设备->服务器 
+设备主动上报当前设备公共信息参数:ScBusTem/DevRegularInfo
+服务器获取设备系统信息后设备上传信息,即GetDevSysMsg的回应 ScBusTem/GetUpDevSysMsg 
+服务器设置设备重量信息信息 ScBusTem/RCInfoMsg
+*/
+    //订阅心跳数据
+    $mqtt->subscribe('RL4RSSI/devOntime', function ($topic, $message) {
+        rlog("INFO", 'recv', $topic, $message);
+        var_dump($message);
+    }, 0);
+
+
+    // $mqtt->subscribe('ScBusTem/GetDevSysMsg/*', function ($topic, $message) {
+    //     rlog("INFO", 'recv', $topic, $message);
+    //     getDevSysMsg($topic, $message);
+    // }, 0);
+    //终端上报系统信息数据
+    $mqtt->subscribe('RL4RSSI/rfidinfos4444', function ($topic, $message) {
+        rlog("INFO", 'recv', $topic, $message);
+        $data=json_decode($message,true);
+
+        if((!empty($message))&&(!empty($data['cnt']))){
+
+            mqttToRedis($message);
+        }
+      
+        //var_dump($message);
+    }, 0);
+    $mqtt->loop(true);
+}
+
+function mqttToRedis($text){
+    
+    try{
+        
+        
+         app_redis()->lpush("mqtt_data",$text);
+
+    }catch(Exception $e){
+        rlog("INFO", 'recv',"redis 异常".$e->getMessage());
+    }
+    
+} 
+
+
+while (1) {
+    try {
+        rlog('INFO', 'connect start');
+        loop();
+    } catch (\Exception $ex) {
+        rlog("ERR", $ex->getTraceAsString());
+        rlog("ERR", $ex->getMessage());
+    }
+    sleep(3);
+}