123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- <?php
- class BatteryAlarmKafkaAction extends Action {
-
-
- public function kafka_index( ){
- $broker_list = C('KAFKA_BROKER_LIST');
- if (empty($broker_list)) {
- exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
- }
- $group = C('WXT_DATA_KAFKA_GROUP');
- if (empty($group)) {
- exit("WXT_DATA_KAFKA_GROUP must be config!".PHP_EOL);
- }
- $topics = C('C61_BATTERY_ALARM_KAFKA_TOPIC');
- if (empty($topics)) {
- exit("C61_BATTERY_ALARM_KAFKA_TOPIC must be config!".PHP_EOL);
- }
- $topics = explode(',',$topics);
- $conf = new RdKafka\Conf();
- // Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发)
- $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
- switch ($err) {
- case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- echo "Assign: ";
- var_dump($partitions);
- $kafka->assign($partitions);
- break;
-
- case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
- echo "Revoke: ";
- var_dump($partitions);
- $kafka->assign(NULL);
- break;
-
- default:
- throw new \Exception($err);
- }
- });
- // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。)
- // different partitions.
- $conf->set('group.id', $group);
- // Initial list of Kafka brokers(添加 kafka集群服务器地址)
-
- $conf->set('metadata.broker.list', $broker_list);
- $topicConf = new RdKafka\TopicConf();
- // Set where to start consuming messages when there is no initial offset in
- // offset store or the desired offset is out of range.
- // 'smallest': start from the beginning
- $topicConf->set('auto.offset.reset', 'earliest');
- // Set the configuration to use for subscribed/assigned topics
- $conf->setDefaultTopicConf($topicConf);
- $consumer = new RdKafka\KafkaConsumer($conf);
- // 订阅轨迹数据topic
- $consumer->subscribe($topics);
- while (true) {
- $message = $consumer->consume(120*1000);
- switch ($message->err) {
- case RD_KAFKA_RESP_ERR_NO_ERROR:
- $data = json_decode($message->payload,true);
- if( $data ){
- //var_dump($data);
- $result = $this->handleAlarmData($data);
- if (APP_DEBUG && isset($result['message'])) {
- echo $result['message'] . PHP_EOL;
- }
- }
- break;
- case RD_KAFKA_RESP_ERR__PARTITION_EOF:
- echo "No more messages; will wait for more\n";
- break;
- case RD_KAFKA_RESP_ERR__TIMED_OUT:
- echo "Timed out\n";
- break;
- default:
- throw new \Exception($message->errstr(), $message->err);
- break;
- }
- }
- }
-
-
- private function handleAlarmData( $data ){
-
- if(!is_array($data)){
- return array('success'=>false,'message'=>'failed,data must be array!');
- }
-
-
-
- $cond=array('imei'=>$data['tid']);
- $device_info=M('devices')->where($cond)->find();
- if(!$device_info){
- return array('success'=>false,'message'=>$data['tid'].'devices not existed');
- }
- $save_data_log=array(
- 'imei'=>$data['tid'],
- 'is_low' => $data['is_low'],
- 'is_charge' => $data['is_charge'],
- 'is_charge_full' => $data['is_charge_full'],
- 'battery_level' => $data['battery_level'],
- 'created_at'=>time(),
- );
- if($data['battery_level']==0){
- return array('success'=>false,'message'=>$data['tid'].' battery_level is 0');
- }
-
- M('battery_log')->createAdd($save_data_log);
-
- if(!$device_info['bind_id']){
- $res=M('devices')->createSave(array('imei'=>$data['tid']),array('battery_level'=> $data['battery_level']));
- return array('success'=>false,'message'=>$data['tid'].' bind_id not existed');
- }
- $this->wxMsg = new \Jiaruan\WxTmp();
- $user_id=M('vehicles')->where(array('id'=>$device_info['bind_id']))->getField('user_id');
- if(!$user_id){
- return array('success'=>false,'message'=>$data['tid'].' user_id not existed');
- }
- $send_user=M('users')->where(array('id'=>$user_id))->find();
- $alarm_state=0;
- $save_data=array('battery_level'=> $data['battery_level']);
- $alarm_time=$data['timestamp'];
- if($alarm_time<(time()-3600)){
- $alarm_time=time();
- }
-
- $alarm_data=array(
- 'device_number'=>$device_info['imei'],
- //'alarm_type'=>'low_recover',
- 'created_at'=>time(),
- 'creator_id'=>$device_info['creator_id'],
- 'alarm_time'=>date('Y-m-d H:i:s',$alarm_time),
- );
- $send_interval = C('WX_BAT_ALARM_SEND_INTERVAL');
- if (!$send_interval) {
- $send_interval=600;
- }
- $check_cond=array(
- 'device_number'=>$device_info['imei'],
- 'created_at'=>array('gt',time()-$send_interval),
- );
-
- if($data['is_low']){
- //低电量告警
- $save_data['alarm_state']=1;
- //低电量告警
- $alarm_data['alarm_type']='low_bat';
- $insertId=M('alarm_report')->createAdd($alarm_data);
-
-
- // 低电量模板ID
- $bat_template_id = C('WX_LOW_BAT_ALARM_TEMPLATE_ID');
- if (!$bat_template_id) {
- create_log('WX_LOW_BAT_ALARM_TEMPLATE_ID required', 'add_alarm');
- return array('success'=>false,'message'=>'WX_CHARGE_FULL_ALARM_TEMPLATE_ID required');
- }
-
- $sendmsg = array(
- 'touser'=>$send_user['wx_open_id'],
- 'template_id' => $bat_template_id, //"NeSrB4ViIkRMOuCtoA6BIPPi_N5L-B5zaWcIXJZwkD0",
- 'data' => [
- 'first' => [ 'value' =>$send_user['realname'].'的设备电量过低,请及时充电', 'color' => '#173177' ],
- 'keyword1' => [ 'value' => $data['tid'], 'color' => '#173177' ],
- 'keyword2' => [ 'value' => date('Y-m-d H:i:s'), 'color' => '#173177' ],
- 'remark' => [ 'value' => '', 'color' => '#173177' ],
- ],
- );
- $res=$this->wxMsg->sendMessage($sendmsg);
- create_log($device_info['imei'].'is_low user:'.$send_user['id'].' openid:'.$send_user['wx_open_id'].' res:'.json_encode($res), 'wx_push_result_log');
- //推送记录
- $push_res_data = array(
- 'username'=>$send_user['realname'],
- 'device_number'=>$device_info['imei'],
- 'result'=> json_encode($res),
- 'created_at'=>time(),
- 'alarm_id'=>$insertId,
- 'alarm_type'=>$alarm_data['alarm_type'],
- 'creator_id'=>$device_info['creator_id']
- );
- M('wx_push_result_log')->createAdd($push_res_data);
-
-
- }
- $cond=array('imei'=>$data['tid']);
- //M('users')->createSave($cond,array('battery_level'=> $data['battery_level']));
- $res=M('devices')->createSave($cond,$save_data);
- //var_dump(M('devices')->getLastSql());
- if($res){
- return array('success'=>true,'message'=>$data['tid'].' update success,battery: '.$data['battery_level']);
- }else{
- return array('success'=>false,'message'=>$data['tid'].' update failed');
- }
-
-
-
- }
-
- }
|