123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- <?php
- require('../vendor/autoload.php');
- use \PhpMqtt\Client\MqttClient;
- use \PhpMqtt\Client\ConnectionSettings;
- use think\facade\Cache;
- use think\facade\Db;
// Use the canonical identifier for China Standard Time. 'PRC' resolves to the
// same zone but is listed by PHP only as a backward-compatibility alias.
date_default_timezone_set('Asia/Shanghai');

// Redis connection settings, read by app_redis().
// NOTE(review): host and password are hard-coded in source; consider loading
// them from environment or framework config instead.
define('HOST', '172.16.222.97');
define('PORT', '6379');
define('PASSWORD', 'jrtk.net');
define('DATABASE', 4);

// Boot the ThinkPHP application (presumably required before the Db facade
// used by the worker loop can resolve its connections — confirm).
(new think\App())->initialize();
/**
 * Return the shared Redis client for this process, connecting on first use.
 *
 * The client and its "connected" flag live in static variables so repeated
 * calls reuse a single connection. Under the CLI SAPI (a long-running worker)
 * the cached connection is pinged on every call and rebuilt when the ping
 * fails or throws. Under any other SAPI the cached client is returned without
 * a health check, on the assumption that the connection stays usable for the
 * duration of one request.
 *
 * Connection parameters come from the HOST, PORT, PASSWORD and DATABASE
 * constants.
 *
 * @return Redis|false The connected client, or false when connect, auth or
 *                     select fails or throws. Callers must check for false.
 */
function app_redis()
{
    static $redis = null;
    static $conn = false;

    // Health-check the cached connection, but only for CLI workers.
    if ($conn && php_sapi_name() === 'cli') {
        try {
            // ping() either returns a falsy value or throws when the link is
            // down; both cases force a reconnect below.
            if (!$redis->ping()) {
                $conn = false;
            }
        } catch (Exception $e) {
            // Typical messages: "Connection lost", "Redis server went away".
            echo $e->getMessage();
            echo 'Redis 连接失败 重新连接:' . PHP_EOL;
            $conn = false;
        }
    }

    if ($conn) {
        return $redis;
    }

    // (Re)connect. Every step must succeed; a failure short-circuits the
    // remaining steps instead of being masked by a later call's result.
    $redis = new Redis();
    try {
        $conn = $redis->connect(HOST, (int) PORT) === true
            && $redis->auth(PASSWORD) === true
            && $redis->select(DATABASE) === true;
    } catch (Exception $e) {
        $conn = false;
    }

    return $conn ? $redis : false;
}
/**
 * Publish one message to the MQTT broker at QoS 1 over a short-lived connection.
 *
 * Each call creates a new client, connects, publishes, runs the client loop
 * with "exit when queues are empty" enabled, and disconnects. If publishing
 * or looping fails, the client is disconnected on a best-effort basis before
 * the original error is rethrown, so a failed publish does not leave the
 * session open.
 *
 * @param string $topic Topic to publish to.
 * @param string $data  Message payload (the caller passes a JSON string).
 *
 * @return mixed Whatever MqttClient::publish() returns.
 *
 * @throws \Throwable Any error raised while connecting, publishing, looping
 *                    or disconnecting.
 */
function publishMqtt($topic, $data)
{
    // NOTE(review): broker address and credentials are hard-coded; consider
    // moving them to configuration.
    $server = '172.16.222.99';
    $port = 1883;
    $clientId = 'mqtt_dji_drone_client_0815';
    $username = 'rl241107';
    $password = "rlian2024";
    $clean_session = false;

    // No Last Will is configured for this client.
    $connectionSettings = (new ConnectionSettings())
        ->setUsername($username)
        ->setPassword($password)
        ->setKeepAliveInterval(60);

    $mqtt = new MqttClient($server, $port, $clientId);
    $mqtt->connect($connectionSettings, $clean_session);
    echo 'connect OK' . PHP_EOL;
    echo 'topic:' . $topic . PHP_EOL;
    echo 'data:' . $data . PHP_EOL;

    try {
        $res = $mqtt->publish($topic, $data, 1);
        echo 'publish end' . PHP_EOL;
        // loop(allowSleep, exitWhenQueuesEmpty): return once nothing is pending.
        $mqtt->loop(true, true);
    } catch (\Throwable $e) {
        // Best-effort cleanup; the original failure is what the caller needs.
        if ($mqtt->isConnected()) {
            try {
                $mqtt->disconnect();
            } catch (\Throwable $ignored) {
                // Nothing more can be done with a broken connection.
            }
        }
        throw $e;
    }

    $mqtt->disconnect();

    return $res;
}
// Worker loop: pop drone OSD records from the Redis list "OSD_DRONE_LIST",
// store each valid record in the uav_route table, then forward it over MQTT.
// A record that cannot be processed is logged and dropped (it has already
// been popped from the list); the loop itself never exits on such errors.
while (true) {
    $redis = app_redis();
    if ($redis === false) {
        // app_redis() returns false when Redis is unreachable; calling
        // rpop() on false would be a fatal error, so wait and retry.
        echo 'Redis unavailable, retrying in 3s' . PHP_EOL;
        sleep(3);
        continue;
    }

    try {
        $jsonData = $redis->rpop("OSD_DRONE_LIST");
    } catch (\Exception $e) {
        // The connection may drop between the health check and the pop.
        echo "错误: " . $e->getMessage() . PHP_EOL;
        sleep(3);
        continue;
    }

    if (!$jsonData) {
        // Queue is empty: back off before polling again.
        sleep(3);
        continue;
    }

    $redis_data = json_decode($jsonData, true);
    if (!is_array($redis_data)) {
        echo 'invalid JSON record skipped' . PHP_EOL;
        continue;
    }

    // Position fields are mandatory; records without them are skipped silently.
    $data = $redis_data['val'] ?? null;
    if (
        !is_array($data)
        || !isset($data['longitude'])
        || !isset($data['latitude'])
        || !isset($data['height'])
    ) {
        continue;
    }

    if (!isset($redis_data['ts'])) {
        echo 'ts不存在';
        continue;
    }
    if (!is_numeric($redis_data['ts'])) {
        // Dividing a non-numeric value would raise an uncaught TypeError on PHP 8.
        echo 'ts is not numeric' . PHP_EOL;
        continue;
    }

    // presumably ts is in milliseconds and this yields seconds — TODO confirm
    $timestamp = floor($redis_data['ts'] / 1000);
    // Optional envelope fields: default to null instead of emitting warnings.
    $gateway = $redis_data['gateway'] ?? null;
    $deviceSn = $redis_data['from'] ?? null;

    $save_data = [
        'longitude' => $data['longitude'],
        'latitude' => $data['latitude'],
        'height' => $data['height'],
        'time' => $timestamp,
        'gateway' => $gateway,
        'device_sn' => $deviceSn,
    ];
    var_dump($save_data);

    // Step 1: persist. On failure, reconnect once and retry; if the retry
    // also fails, log and drop the record rather than crash the worker.
    try {
        $res = Db::connect('uav')->name('uav_route')->insertGetId($save_data);
    } catch (\Exception $e) {
        echo "错误: " . $e->getMessage() . PHP_EOL;
        try {
            Db::connect('uav')->close();   // drop the existing connection
            Db::connect('uav')->connect(); // open a fresh one
            $res = Db::connect('uav')->name('uav_route')->insertGetId($save_data);
        } catch (\Exception $retryError) {
            echo "错误: " . $retryError->getMessage() . PHP_EOL;
            continue;
        }
    }
    var_dump($res);

    if (!$res) {
        continue;
    }

    // Step 2: forward over MQTT. This is kept outside the database try/catch
    // so an MQTT failure can never trigger a second insert of the same row.
    $mqtt_data = [
        'devId' => $gateway,
        'osid' => $deviceSn,
        'Longitude' => $data['longitude'],
        'Latitude' => $data['latitude'],
        'Height' => $data['height'],
        'AltitudeGeo' => $data['height'],
        'time' => $timestamp,
        'SpeedHorizontal' => $data['horizontal_speed'] ?? null,
        'distance' => $data['home_distance'] ?? null,
        'SpeedVertical' => $data['vertical_speed'] ?? null,
        'Direction' => $data['attitude_head'] ?? null,
    ];
    $topic = 'DJI/DATA/' . $deviceSn;

    $payload = json_encode($mqtt_data);
    if ($payload === false) {
        echo "错误: " . json_last_error_msg() . PHP_EOL;
        continue;
    }

    try {
        publishMqtt($topic, $payload);
    } catch (\Exception $e) {
        echo "错误: " . $e->getMessage() . PHP_EOL;
    }
}
|