push()
            ->setPlatform('all')
            ->addAllAudience()
            ->setNotificationAlert('车辆异动告警');
        try {
            $response = $push_payload->send();
            print_r($response);
        } catch (\JPush\Exceptions\APIConnectionException $e) {
            // try something here
            print $e;
        } catch (\JPush\Exceptions\APIRequestException $e) {
            // try something here
            print $e;
        }
    }

    /**
     * Consume alarm messages from Kafka and forward each one through JPush.
     *
     * Subscribes to the `gps_alarm_msg_queue` topic as a member of the
     * `myConsumerGroup` consumer group and then loops forever, handing every
     * decodable JSON payload to jpushMsg().
     *
     * @return void Returns only when the broker list is not configured.
     * @throws \Exception On an unexpected rebalance error or consume error.
     */
    public function pushFromKafka( ){
        $conf = new RdKafka\Conf();

        // Rebalance callback (optional): logs and applies partition assignments
        // whenever a consumer joins or leaves the group.
        $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(NULL);
                    break;

                default:
                    throw new \Exception($err);
            }
        });

        // Consumers sharing a group.id split the topic's partitions between
        // them, so running more consumers than partitions brings no benefit.
        $conf->set('group.id', 'myConsumerGroup');

        // Kafka broker addresses. Without them the consumer cannot do anything
        // useful, so stop here instead of configuring an empty broker list.
        $broker_list = C('KAFKA_BROKER_LIST');
        if( !$broker_list ){
            echo 'please set broker list !!! ';
            return;
        }
        $conf->set('metadata.broker.list', $broker_list);

        $topicConf = new RdKafka\TopicConf();

        // Where to start when there is no stored offset or the stored offset
        // is out of range. 'smallest': start from the beginning.
        $topicConf->set('auto.offset.reset', 'smallest');

        // Use this topic configuration for subscribed/assigned topics.
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new RdKafka\KafkaConsumer($conf);

        // Subscribe the consumer to the alarm topic.
        $consumer->subscribe(['gps_alarm_msg_queue']);

        // Built on first use and then reused for every following message.
        $jpush_client = null;

        while (true) {
            $message = $consumer->consume(120*1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $msg_data = json_decode($message->payload,true);
                    if( !$msg_data ){
                        // Make dropped payloads visible instead of discarding them silently.
                        echo 'invalid kafka message payload, skipped',PHP_EOL;
                        break;
                    }

                    // Push the message through JPush.
                    if( $jpush_client === null ){
                        if(!C('JPUSH_APP_KEY') || !C('JPUSH_MASTER_SECRET')){
                            echo 'jpush app key or secret not exists',PHP_EOL;
                            break;
                        }
                        $jpush_client = new \JPush\Client( C('JPUSH_APP_KEY'), C('JPUSH_MASTER_SECRET') );
                    }

                    // Geofence, locked-vehicle movement, low battery, stolen
                    // alarms and broadcast messages all go through jpushMsg().
                    $result = $this->jpushMsg( $jpush_client, $msg_data );
                    if($result){
                        echo $result['message'],PHP_EOL;
                        debug_log('jpush',$result['message']);
                    }
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;

                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    /**
     * Push one alarm (to a single device) or one broadcast through JPush and
     * write the outcome to the message log.
     *
     * Expected message shape (decoded JSON):
     *   - single push: type, title, content, device_number
     *   - broadcast:   type (BROADCASTING), title, content
     *
     * @param \JPush\Client|null $client   JPush client; built from config when empty.
     * @param array              $msg_data Decoded alarm message.
     * @return array array('success' => bool, 'message' => string)
     */
    private function jpushMsg( $client, $msg_data ){
        // Alarm types that are delivered to the single device bound to a vehicle.
        // The numeric values in the notes are copied from the original comments.
        $single_push_type = array(
            \Rlfd\Alarm\PushTypeEnum::OUTAGE_ALARM,         // 1 - power outage alarm
            \Rlfd\Alarm\PushTypeEnum::SOS_ALARM,            // 2 - SOS alarm
            \Rlfd\Alarm\PushTypeEnum::LOWWER_BATTERY_ALARM, // 3 - low battery alarm
            \Rlfd\Alarm\PushTypeEnum::SHAKE_ALARM,          // 4 - vibration alarm
            \Rlfd\Alarm\PushTypeEnum::SHIFT_ALARM,          // 5 - displacement alarm
            \Rlfd\Alarm\PushTypeEnum::LOCK_VEHICLE_ALARM,   // 99 - vehicle lock alarm
            \Rlfd\Alarm\PushTypeEnum::STOLEN_ALARM,         // 7 - stolen alarm
            \Rlfd\Alarm\PushTypeEnum::FENCE_ALARM,          // 8 - geofence alarm
            \Rlfd\Alarm\PushTypeEnum::CRASH_ALARM,
            \Rlfd\Alarm\PushTypeEnum::HIGHVOLTAGE_ALARM,
            \Rlfd\Alarm\PushTypeEnum::TAKEAPART_ALARM,      // tamper alarm
            \Rlfd\Alarm\PushTypeEnum::TURN_ALARM,           // rollover alarm
        );

        if(!is_array($msg_data)){
            // Only scalars can be appended to the message safely.
            $shown = ( is_scalar($msg_data) || $msg_data === null ) ? $msg_data : gettype($msg_data);
            return array( 'success' => false , 'message' => 'invalid message data format!'.$shown);
        }

        if( empty($client) ){
            $client = new \JPush\Client( C('JPUSH_APP_KEY'), C('JPUSH_MASTER_SECRET') );
        }

        $type = isset($msg_data['type']) ? $msg_data['type'] : null;
        $vehicle_info = null;

        if( in_array($type,$single_push_type) && !empty($msg_data['device_number']) ){
            // Drop the alarm if it falls inside the minimum push interval.
            $ctlRes = $this->alarmIntervalControl($msg_data);
            if(!$ctlRes['success']){
                return $ctlRes;
            }

            // Look up the vehicle (and its JPush registration id) by device number.
            $where  = array('DeviceNumber|GpsDeviceNumber' => $msg_data['device_number']);
            $fields = 'CityId,LicensePlate,FullName,JgClientRegistrationId';
            $cache_key = 'jpush_vinfo_'.$msg_data['device_number'];

            // Read the cache once; on a miss query the table and cache the row
            // for one minute. The queried row is used directly, so a failed
            // cache write no longer loses it.
            $vehicle_info = S( $cache_key );
            if( !$vehicle_info ){
                $vehicle_info = M('jms_vehicle')->field($fields)->where($where)->find();
                S( $cache_key, $vehicle_info, 60 );
            }

            if( empty($vehicle_info['JgClientRegistrationId']) ){
                return array('success'=> false,'message'=>'jpush registration id not exists!');
            }
        }elseif( $type == \Rlfd\Alarm\PushTypeEnum::BROADCASTING ){
            // Broadcast: no vehicle lookup needed.
        }else{
            return array( 'success' => false , 'message' => '未知的告警类型:'.$type );
        }

        // Target platforms.
        $platform = array('ios', 'android');

        // Text shown in the notification bar.
        $alert = $msg_data['content'];

        // Android specific notification fields.
        $android_notification = array(
            'title' => $msg_data['title'],
            'builder_id' => 2, // notification bar style
        );

        // iOS specific notification fields.
        $ios_notification = array(
            'sound' => 'default',
            'badge' => '+1',
            'content-available' => true
        );

        // Optional push options.
        $options = array();

        // Decide the audience before talking to JPush so the failure log
        // written by the catch blocks already carries the vehicle data.
        $is_broadcast = ( $type == \Rlfd\Alarm\PushTypeEnum::BROADCASTING );
        if( !$is_broadcast ){
            if( !$vehicle_info ){
                // Defensive: the checks above should make this unreachable.
                return array( 'success' => false , 'message' => '未知的告警类型:'.$type );
            }
            $msg_data['vehicle_info'] = $vehicle_info;
        }

        try {
            // getCid() is a remote call as well, so it runs inside the try:
            // a failure here is logged like a failed send instead of escaping
            // into the consumer loop.
            $cid = $client->push()->getCid();
            $push_payload = $client->push()
                ->setCid($cid['body']['cidlist'][0])
                ->setPlatform( $platform )
                ->iosNotification($alert, $ios_notification)
                ->androidNotification($alert, $android_notification)
                ->options($options);

            if( $is_broadcast ){
                $push_payload->addAllAudience();
            }else{
                $push_payload->addRegistrationId($vehicle_info['JgClientRegistrationId']);
            }

            $response = $push_payload->send();

            // Persist the push result.
            $msg_data['response'] = array(
                'code' => $response['http_code'],
                'resp_msg' => 'ok',
            );
            if(!$this->saveLog($msg_data)){
                return array( 'success' => false , 'message' => 'add log failed 1' );
            }

            if( $response['http_code'] == 200 ){
                if( C('FENCE_ALARM_INTERVAL') && $type == \Rlfd\Alarm\PushTypeEnum::FENCE_ALARM ){
                    // Remember when the last geofence alarm went out
                    // (FENCE_ALARM_INTERVAL is multiplied by 60 for the cache lifetime).
                    S('last_fence_alarm_'.$msg_data['device_number'], time(), C('FENCE_ALARM_INTERVAL')*60);
                }
                return array( 'success' => true , 'message' => 'ok' );
            }

            // Always hand the caller a result array, also for non-200 answers.
            return array( 'success' => false , 'message' => 'jpush responded with http code '.$response['http_code'] );
        } catch (\JPush\Exceptions\APIConnectionException $e) {
            return $this->recordJpushException($msg_data, $e, 'add log failed 2');
        } catch (\JPush\Exceptions\APIRequestException $e) {
            return $this->recordJpushException($msg_data, $e, 'add log failed 3');
        }
    }

    /**
     * Log a failed push and build the failure result for the caller.
     *
     * NOTE(review): the original code called getHttpCode() on both exception
     * types; the JPush SDK's APIConnectionException does not appear to define
     * that method - confirm. It is therefore only called when it exists, with
     * the standard exception code as fallback.
     *
     * @param array      $msg_data             Alarm message being pushed.
     * @param \Exception $e                    Exception raised by the JPush client.
     * @param string     $log_failed_message   Message returned when the log row cannot be saved.
     * @return array array('success' => false, 'message' => string)
     */
    private function recordJpushException( $msg_data, $e, $log_failed_message ){
        $text = $e->__toString();
        $bracket_pos = strpos($text, '[');

        $msg_data['response'] = array(
            'code' => method_exists($e, 'getHttpCode') ? $e->getHttpCode() : $e->getCode(),
            // Keep the "[code]: message" part when present, else the whole text.
            'resp_msg' => $bracket_pos === false ? $text : substr($text, $bracket_pos),
        );

        if(!$this->saveLog($msg_data)){
            return array( 'success' => false , 'message' => $log_failed_message );
        }

        // The caller prints and logs the returned message.
        return array( 'success' => false , 'message' => trim($text) );
    }

    /**
     * Insert one row describing a push attempt into `jms_baojing_message`.
     *
     * @param array $msg_data Alarm message including the 'response' entry and,
     *                        for non-broadcast pushes, 'vehicle_info'.
     * @return mixed Whatever the model's createAdd() returns; callers treat a
     *               falsy value as failure.
     */
    private function saveLog( $msg_data ){
        $fdls_app_message = M('jms_baojing_message');

        $log_data = array(
            'ID' => create_guid(),
            'Type' => $msg_data['type'],
            'Title' => $msg_data['title'],
            'Comment' => $msg_data['content'],
            'SendStatus' => $msg_data['response']['code'],
            'AddTime' => date('Y-m-d H:i:s'),
            'RespMsg' => $msg_data['response']['resp_msg'],
        );

        // Vehicle columns only exist for single-device pushes. The broadcast
        // type is taken from the enum, matching the checks in jpushMsg().
        if( $msg_data['type'] != \Rlfd\Alarm\PushTypeEnum::BROADCASTING ){
            $log_data['CityId'] = $msg_data['vehicle_info']['CityId'];
            $log_data['DeviceNumber'] = $msg_data['device_number'];
            $log_data['FullName'] = $msg_data['vehicle_info']['FullName'];
            $log_data['LicensePlate'] = $msg_data['vehicle_info']['LicensePlate'];
        }

        return $fdls_app_message->createAdd($log_data);
    }

    /**
     * Decide whether an alarm may be pushed now, based on a per-type minimum
     * interval between pushes for the same device.
     *
     * Alarm types without a rule below are not throttled.
     *
     * @param array $msg_data Alarm message (needs 'type' and 'device_number').
     * @return array array('success' => bool, 'message' => string)
     */
    private function alarmIntervalControl( $msg_data ){
        if(empty($msg_data)){
            return array( 'success' => false , 'message' => '告警数据为空' );
        }

        if( $msg_data['type'] == \Rlfd\Alarm\PushTypeEnum::SHIFT_ALARM ){
            // A device whose czapp lock status is 1 gets no GPS displacement push.
            $redis = Redis('czapp_lock_status', 'hash');
            $lockStatus = $redis->get($msg_data['device_number']);
            if($lockStatus == 1){
                return array('success' => false, 'message' => $msg_data['device_number'].'已启用czapp锁车,无需推送gps位移告警');
            }
        }

        // One row per throttled type:
        // alarm type, config key (seconds), default seconds, label, cache key prefix.
        $rules = array(
            array(\Rlfd\Alarm\PushTypeEnum::FENCE_ALARM,        'FENCE_ALARM_INTERVAL_SECOND',       120, '围栏告警',    'last_fence_alarm_'),
            array(\Rlfd\Alarm\PushTypeEnum::OUTAGE_ALARM,       'OUTAGE_ALARM_INTERVAL_SECOND',      300, '断电告警',    'last_outage_alarm_'),
            array(\Rlfd\Alarm\PushTypeEnum::HIGHVOLTAGE_ALARM,  'HIGHVOLTAGE_ALARM_INTERVAL_SECOND', 300, '高电压报警',  'last_highvoltage_alarm_'),
            array(\Rlfd\Alarm\PushTypeEnum::SHIFT_ALARM,        'SHIFT_ALARM_INTERVAL_SECOND',       120, 'gps位移报警', 'last_shift_alarm_'),
            array(\Rlfd\Alarm\PushTypeEnum::LOCK_VEHICLE_ALARM, 'LOCK_ALARM_INTERVAL_SECOND',        120, '锁车报警',    'last_lock_alarm_'),
            array(\Rlfd\Alarm\PushTypeEnum::TAKEAPART_ALARM,    'TAKEAPART_ALARM_INTERVAL_SECOND',   120, '拆动报警',    'last_takeapart_alarm_'),
            array(\Rlfd\Alarm\PushTypeEnum::CRASH_ALARM,        'CRASH_ALARM_INTERVAL_SECOND',       120, '碰撞报警',    'last_crash_alarm_'),
            array(\Rlfd\Alarm\PushTypeEnum::TURN_ALARM,         'TURN_ALARM_INTERVAL_SECOND',        120, '翻转报警',    'last_turn_alarm_'),
        );

        foreach( $rules as $rule ){
            list($rule_type, $config_key, $default_interval, $type, $cache) = $rule;
            // Loose comparison on purpose: the type comes from decoded JSON.
            if( $msg_data['type'] != $rule_type ){
                continue;
            }
            $interval = C($config_key) ? C($config_key) : $default_interval;
            return $this->getResultInterval($interval, $msg_data, $type, $cache);
        }

        // No rule for this type: allow the push instead of throttling on
        // undefined interval / cache key values.
        return array(
            'success' => true ,
            'message' => $msg_data['device_number'].' alarm type '.$msg_data['type'].' has no push interval limit, push allowed'
        );
    }

    /**
     * Check and arm the per-device throttle for one alarm type.
     *
     * If a cache entry exists under $cache . device_number the push is
     * rejected; otherwise the current time is stored under that key with
     * $interval as lifetime and the push is allowed.
     *
     * @param int    $interval Minimum number of seconds between pushes.
     * @param array  $msg_data Alarm message (needs 'device_number').
     * @param string $type     Label of the alarm type used in the message.
     * @param string $cache    Cache key prefix.
     * @return array array('success' => bool, 'message' => string)
     */
    public function getResultInterval( $interval, $msg_data, $type, $cache ){
        $device_number = $msg_data['device_number'];
        $throttle_key  = $cache.$device_number;
        $label         = '【'. $type .'】';

        $last_alarm_time = S( $throttle_key );
        if( !$last_alarm_time ){
            // Not throttled: remember this alarm and let it through.
            S( $throttle_key, time(), $interval );
            return array(
                'success' => true ,
                'message' => $device_number . $label . ' 不在限制间隔内,可以推送'
            );
        }

        return array(
            'success' => false ,
            'message' => $device_number . $label . ' 不可推送,推送间隔最小为:'. $interval. '秒,最后一次告警时间:'.$last_alarm_time
        );
    }
}