setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。) // different partitions. $conf->set('group.id', $group); // Initial list of Kafka brokers(添加 kafka集群服务器地址) $conf->set('metadata.broker.list', $broker_list); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'earliest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // 订阅轨迹数据topic $consumer->subscribe($topics); while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $data = json_decode($message->payload,true); if( $data ){ //var_dump($data); $result = $this->handleAlarmData($data); if (APP_DEBUG && isset($result['message'])) { echo $result['message'] . PHP_EOL; } } break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } } private function handleAlarmData( $data ){ if(!is_array($data)){ return array('success'=>false,'message'=>'failed,data must be array!'); } /* $data = array( 'tid' => $data['tid'], 'alarm' => $data['alarm'], 'is_low' => $data['is_low'], 'is_charge' => $data['is_charge'], 'is_charge_full' => $data['is_charge_full'], 'battery_level' => $data['battery_level'], 'timestamp' => $data['timestamp'] );*/ $cond=array('imei'=>$data['tid']); $device_info=M('devices')->where($cond)->find(); if(!$device_info){ return array('success'=>false,'message'=>$data['tid'].'devices not existed'); } $save_data_log=array( 'imei'=>$data['tid'], 'is_low' => $data['is_low'], 'is_charge' => $data['is_charge'], 'is_charge_full' => $data['is_charge_full'], 'battery_level' => $data['battery_level'], 'created_at'=>time(), ); if($data['battery_level']==0){ return array('success'=>false,'message'=>$data['tid'].' battery_level is 0'); } $table_name =getBatteryTableName(); M($table_name)->createAdd($save_data_log); if(!$device_info['user_id']){ $res=M('devices')->createSave(array('imei'=>$data['tid']),array('battery_level'=> $data['battery_level'])); return array('success'=>false,'message'=>$data['tid'].' userid not existed'); } $this->wxMsg = new \Jiaruan\WxTmp(); $send_user=M('users')->where(array('id'=>$device_info['user_id']))->find(); $alarm_state=0; $save_data=array('battery_level'=> $data['battery_level']); $alarm_time=$data['timestamp']; if($alarm_time<(time()-3600)){ $alarm_time=time(); } $alarm_data=array( 'device_number'=>$device_info['imei'], //'alarm_type'=>'low_recover', 'created_at'=>time(), 'creator_id'=>$device_info['creator_id'], 'alarm_time'=>date('Y-m-d H:i:s',$alarm_time), ); $send_interval = C('WX_BAT_ALARM_SEND_INTERVAL'); if (!$send_interval) { $send_interval=600; } $check_cond=array( 'device_number'=>$device_info['imei'], 'created_at'=>array('gt',time()-$send_interval), ); if($data['is_charge_full']){ //新版本充满只报一次 //充电充满 $alarm_data['alarm_type']='charge_full'; $check_cond['alarm_type']='charge_full'; if(!M('alarm_report')->where($check_cond)->count()){ $insertId=M('alarm_report')->createAdd($alarm_data); // 电量充满模板ID $full_template_id = C('WX_CHARGE_FULL_ALARM_TEMPLATE_ID'); if (!$full_template_id) { create_log('WX_CHARGE_FULL_ALARM_TEMPLATE_ID required', 'add_alarm'); return array('success'=>false,'message'=>'WX_CHARGE_FULL_ALARM_TEMPLATE_ID required'); } $sendmsg = array( 'touser'=>$send_user['wx_open_id'], 'template_id' => $full_template_id, //"NeSrB4ViIkRMOuCtoA6BIPPi_N5L-B5zaWcIXJZwkD0", 'data' => [ 'first' => [ 'value' =>$send_user['realname'].'的电子卡已充电完成', 'color' => '#173177' ], 'keyword1' => [ 'value' => $data['tid'], 'color' => '#173177' ], 'keyword2' => [ 'value' => date('Y-m-d H:i:s'), 'color' => '#173177' ], 'remark' => [ 'value' => '', 'color' => '#173177' ], ], ); $res=$this->wxMsg->sendMessage($sendmsg); create_log($device_info['imei'].' charge_full user:'.$send_user['id'].' openid:'.$send_user['wx_open_id'].' res:'.json_encode($res), 'wx_push_result_log'); //推送记录 $push_res_data = array( 'username'=>$send_user['realname'], 'device_number'=>$device_info['imei'], 'result'=> json_encode($res), 'created_at'=>time(), 'alarm_id'=>$insertId, 'alarm_type'=>$alarm_data['alarm_type'], 'creator_id'=>$device_info['creator_id'] ); M('wx_push_result_log')->createAdd($push_res_data); } } if($data['is_charge'] || $data['is_charge_full']){ $recover_cond=array( 'device_number'=>$device_info['imei'], 'alarm_reason'=>'low_bat', 'state'=>'start', ); if(M("alarm_records")->where($recover_cond)->count()){ $recover_data=array( 'end_time'=>time(), 'state'=>'end', 'result'=>'1', ); M('alarm_records')->createSave($recover_cond,$recover_data); if(M('alarm_records')->where(array('device_number'=>$device_info['imei'],'state'=>'start'))->count()){ $save_data['alarm_state']=1; }else{ $save_data['alarm_state']=0; } } /* //充电中 低电量告警恢复 $alarm_data['alarm_type']='is_charge'; $check_cond['alarm_type']='charge_full'; if(!M('alarm_report')->where($check_cond)->count()){ $insertId=M('alarm_report')->createAdd($alarm_data); $recover_cond=array( 'device_number'=>$device_info['imei'], 'alarm_reason'=>'low_bat', 'state'=>'start', ); $recover_data=array( 'end_time'=>time(), 'state'=>'end', 'result'=>'1', ); M('alarm_records')->createSave($recover_cond,$recover_data); if(M('alarm_records')->where(array('device_number'=>$device_info['imei'],'state'=>'start'))->count()){ $save_data['alarm_state']=1; }else{ $save_data['alarm_state']=0; } //充电中 推送低电量恢复模板ID $recover_template_id = C('WX_LOW_RECOVER_ALARM_TEMPLATE_ID'); if (!$recover_template_id) { create_log('WX_LOW_RECOVER_ALARM_TEMPLATE_ID required', 'add_alarm'); return false; } $sendmsg = array( 'touser'=>$send_user['wx_open_id'], 'template_id' => $recover_template_id, //"NeSrB4ViIkRMOuCtoA6BIPPi_N5L-B5zaWcIXJZwkD0", 'data' => [ 'first' => [ 'value' =>$send_user['realname'].'的电子卡正在充电', 'color' => '#173177' ], 'keyword1' => [ 'value' => $send_user['realname'], 'color' => '#173177' ], 'keyword2' => [ 'value' => $data['tid'], 'color' => '#173177' ], 'keyword3' => [ 'value' => $data['battery_level'], 'color' => '#173177' ], 'remark' => [ 'value' => '', 'color' => '#173177' ], ], ); $res=$this->wxMsg->sendMessage($sendmsg); //推送记录 $push_res_data = array( 'username'=>$send_user['realname'], 'device_number'=>$device_info['imei'], 'result'=> json_encode($res), 'created_at'=>time(), 'alarm_id'=>$insertId, 'alarm_type'=>$alarm_data['alarm_type'], 'creator_id'=>$device_info['creator_id'] ); M('wx_push_result_log')->createAdd($push_res_data); }*/ } if($data['is_low'] && !$data['is_charge']){ //低电量告警 $save_data['alarm_state']=1; //低电量告警 $alarm_data['alarm_type']='low_bat'; $insertId=M('alarm_report')->createAdd($alarm_data); $cond=array( 'device_number'=>$device_info['imei'], 'alarm_reason'=>'low_bat', 'state'=>'start', ); //如果不存在告警 则添加 if(!M("alarm_records")->where($cond)->count()){ $records_save_data=array( 'device_number'=>$device_info['imei'], 'alarm_reason'=>'low_bat', 'state'=>'start', 'start_time'=>time(), 'creator_id'=>$device_info['creator_id'] ); M('alarm_records')->createAdd($records_save_data); // 低电量模板ID $bat_template_id = C('WX_LOW_BAT_ALARM_TEMPLATE_ID'); if (!$bat_template_id) { create_log('WX_LOW_BAT_ALARM_TEMPLATE_ID required', 'add_alarm'); return array('success'=>false,'message'=>'WX_CHARGE_FULL_ALARM_TEMPLATE_ID required'); } $sendmsg = array( 'touser'=>$send_user['wx_open_id'], 'template_id' => $bat_template_id, //"NeSrB4ViIkRMOuCtoA6BIPPi_N5L-B5zaWcIXJZwkD0", 'data' => [ 'first' => [ 'value' =>$send_user['realname'].'的电子卡电量过低,请及时充电', 'color' => '#173177' ], 'keyword1' => [ 'value' => $data['tid'], 'color' => '#173177' ], 'keyword2' => [ 'value' => '低电量告警', 'color' => '#173177' ], 'keyword3' => [ 'value' => date('Y-m-d H:i:s'), 'color' => '#173177' ], 'remark' => [ 'value' => '', 'color' => '#173177' ], ], ); $res=$this->wxMsg->sendMessage($sendmsg); create_log($device_info['imei'].'is_low user:'.$send_user['id'].' openid:'.$send_user['wx_open_id'].' res:'.json_encode($res), 'wx_push_result_log'); //推送记录 $push_res_data = array( 'username'=>$send_user['realname'], 'device_number'=>$device_info['imei'], 'result'=> json_encode($res), 'created_at'=>time(), 'alarm_id'=>$insertId, 'alarm_type'=>$alarm_data['alarm_type'], 'creator_id'=>$device_info['creator_id'] ); M('wx_push_result_log')->createAdd($push_res_data); } } $cond=array('imei'=>$data['tid']); //M('users')->createSave($cond,array('battery_level'=> $data['battery_level'])); $res=M('devices')->createSave($cond,$save_data); //var_dump(M('devices')->getLastSql()); if($res){ return array('success'=>true,'message'=>$data['tid'].' update success,battery: '.$data['battery_level']); }else{ return array('success'=>false,'message'=>$data['tid'].' update failed'); } } }