initialize();

/**
 * Return a shared Redis client, (re)connecting when necessary.
 *
 * The client is kept in a function-level static and is only cached once the
 * whole connect -> auth -> select sequence has succeeded, so a half-initialised
 * client is never handed out on a later call.
 *
 * Under the CLI SAPI the cached connection is checked with ping() on every call
 * and replaced if the check fails. Under any other SAPI the cached client is
 * returned unchecked (the original author assumed the connection stays usable
 * for the lifetime of a single PHP-FPM request).
 *
 * Relies on the HOST, PORT, PASSWORD and DATABASE constants defined elsewhere.
 *
 * @return Redis|false The connected client, or false when connecting failed.
 */
function app_redis()
{
    static $redis = null;

    if ($redis !== null) {
        if (php_sapi_name() !== 'cli') {
            return $redis;
        }

        try {
            // A single ping is enough to validate the connection. "@" keeps the
            // "Redis::ping(): connect() failed" warning mentioned by the original
            // author out of the output; the failure itself is handled below.
            if (@$redis->ping()) {
                return $redis;
            }
        } catch (Exception $e) {
            // Messages look like "Connection lost" or "Redis server went away".
            echo $e->getMessage();
            echo 'Redis 连接失败 重新连接:' . PHP_EOL;
        }

        // The cached connection is unusable: drop it and reconnect below.
        $redis = null;
    }

    try {
        $client = new Redis();
        // NOTE(original author): this non-persistent connection is said not to be
        // closed automatically when the request ends — TODO confirm.
        if ($client->connect(HOST, PORT) !== true) {
            return false;
        }
        // As before, the result of the final select() decides success or failure;
        // the auth() result alone is not treated as fatal.
        $client->auth(PASSWORD);
        if ($client->select(DATABASE) !== true) {
            return false;
        }
    } catch (Exception $e) {
        return false;
    }

    $redis = $client;

    return $redis;
}

/**
 * Publish one message to the MQTT broker with QoS 1.
 *
 * Opens a new broker connection for every call, publishes, runs the client loop
 * until its queues are empty, then disconnects. If publishing or the loop throws,
 * the connection is closed on a best-effort basis and the original exception is
 * rethrown to the caller.
 *
 * NOTE(review): broker address, client id and credentials are hard-coded here;
 * consider moving them to configuration.
 *
 * @param string $topic Topic to publish to.
 * @param string $data  Message payload (the caller passes a JSON string).
 *
 * @return mixed Whatever MqttClient::publish() returns. NOTE(review): with
 *               php-mqtt/client this appears to be void, i.e. null — confirm.
 */
function publishMqtt($topic, $data)
{
    //$server = '47.114.189.154';
    $server = '172.16.222.99';
    $port = 1883;
    $clientId = 'mqtt_dji_drone_client_0815';
    $username = 'rl241107';
    $password = "rlian2024";
    $clean_session = false;

    $connectionSettings = new ConnectionSettings();
    $connectionSettings = $connectionSettings
        ->setUsername($username)
        ->setPassword($password)
        ->setKeepAliveInterval(60)
        // Last Will settings (disabled)
        // ->setLastWillTopic('emqx/test/last-will')
        // ->setLastWillMessage('client disconnect')
        // ->setLastWillQualityOfService(1)
    ;

    $mqtt = new MqttClient($server, $port, $clientId);
    $mqtt->connect($connectionSettings, $clean_session);

    try {
        echo 'connect OK' . PHP_EOL;
        echo 'topic:' . $topic . PHP_EOL;
        echo 'data:' . $data . PHP_EOL;
        $res = $mqtt->publish($topic, $data, 1);
        echo 'publish end' . PHP_EOL;
        $mqtt->loop(true, true);
    } catch (\Exception $e) {
        // Do not leak the connection when publishing fails. A failure of the
        // cleanup itself must not hide the original error.
        try {
            $mqtt->disconnect();
        } catch (\Exception $ignored) {
            // Intentionally ignored: the original exception is rethrown below.
        }
        throw $e;
    }

    $mqtt->disconnect();

    return $res;
}

// Worker loop: pop drone OSD messages from the Redis list "OSD_DRONE_LIST",
// store each position in the "uav_route" table and forward it over MQTT.
while (1) {
    $redis = app_redis();
    if ($redis === false) {
        // Redis is unreachable: wait and try again instead of calling a method on false.
        echo 'Redis 连接失败 重新连接:' . PHP_EOL;
        sleep(3);
        continue;
    }

    try {
        $jsonData = $redis->rpop("OSD_DRONE_LIST");
    } catch (\Exception $e) {
        // The connection can still drop between the health check and the command.
        echo "错误: " . $e->getMessage() . PHP_EOL;
        sleep(3);
        continue;
    }

    if (!$jsonData) {
        // Queue is empty: back off before polling again.
        sleep(3);
        continue;
    }

    $redis_data = json_decode($jsonData, true);
    if (!is_array($redis_data) || !isset($redis_data['val']) || !is_array($redis_data['val'])) {
        // Malformed message: skip it (the original skipped these too, with warnings).
        continue;
    }
    $data = $redis_data['val'];

    if (!isset($data['longitude']) || !isset($data['latitude']) || !isset($data['height'])) {
        continue;
    }
    if (!isset($redis_data['ts'])) {
        echo 'ts不存在';
        continue;
    }

    // "ts" is divided by 1000 before storing (presumably milliseconds -> seconds; confirm).
    $time = floor($redis_data['ts'] / 1000);
    // Missing keys resolve to null, the value the original produced with a warning.
    $gateway = $redis_data['gateway'] ?? null;
    $deviceSn = $redis_data['from'] ?? null;

    $save_data = [
        'longitude' => $data['longitude'],
        'latitude'  => $data['latitude'],
        'height'    => $data['height'],
        'time'      => $time,
        'gateway'   => $gateway,
        'device_sn' => $deviceSn,
    ];
    var_dump($save_data);

    // Step 1: store the row. Only a database failure triggers the reconnect-and-retry
    // path; a failure of the retry itself still propagates, as it did before.
    try {
        $res = Db::connect('uav')->name('uav_route')->insertGetId($save_data);
    } catch (\Exception $e) {
        echo "错误: " . $e->getMessage() . PHP_EOL;
        Db::connect('uav')->close();   // close the existing connection
        Db::connect('uav')->connect(); // reconnect
        $res = Db::connect('uav')->name('uav_route')->insertGetId($save_data);
    }
    var_dump($res);

    if (!$res) {
        continue;
    }

    // Step 2: forward the stored position over MQTT. This has its own handler so that
    // an MQTT error can no longer reach the database retry above and insert the same
    // row a second time.
    $mqtt_data = [
        'devId'           => $gateway,
        'osid'            => $deviceSn,
        'Longitude'       => $data['longitude'],
        'Latitude'        => $data['latitude'],
        'Height'          => $data['height'],
        'AltitudeGeo'     => $data['height'],
        'time'            => $time,
        'SpeedHorizontal' => $data['horizontal_speed'] ?? null,
        'distance'        => $data['home_distance'] ?? null,
        'SpeedVertical'   => $data['vertical_speed'] ?? null,
        'Direction'       => $data['attitude_head'] ?? null,
    ];
    $topic = 'DJI/DATA/' . $deviceSn;

    try {
        publishMqtt($topic, json_encode($mqtt_data));
    } catch (\Exception $e) {
        echo "错误: " . $e->getMessage() . PHP_EOL;
    }
}