setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。) // different partitions. $conf->set('group.id', $group); // Initial list of Kafka brokers(添加 kafka集群服务器地址) $conf->set('metadata.broker.list', $broker_list); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'earliest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // 订阅轨迹数据topic $consumer->subscribe($topics); while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $data = json_decode($message->payload,true); if( $data ){ //var_dump($data); $result = $this->handleAlarmData($data); if (APP_DEBUG && isset($result['message'])) { echo $result['message'] . PHP_EOL; } } break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } } private function handleAlarmData( $data ){ if(!is_array($data)){ return array('success'=>false,'message'=>'failed,data must be array!'); } $cond=array('imei'=>$data['tid']); $device_info=M('devices')->where($cond)->find(); if(!$device_info){ return array('success'=>false,'message'=>$data['tid'].'devices not existed'); } $save_data_log=array( 'imei'=>$data['tid'], 'is_low' => $data['is_low'], 'is_charge' => $data['is_charge'], 'is_charge_full' => $data['is_charge_full'], 'battery_level' => $data['battery_level'], 'created_at'=>time(), ); if($data['battery_level']==0){ return array('success'=>false,'message'=>$data['tid'].' battery_level is 0'); } M('battery_log')->createAdd($save_data_log); if(!$device_info['bind_id']){ $res=M('devices')->createSave(array('imei'=>$data['tid']),array('battery_level'=> $data['battery_level'])); return array('success'=>false,'message'=>$data['tid'].' bind_id not existed'); } $this->wxMsg = new \Jiaruan\WxTmp(); $user_id=M('vehicles')->where(array('id'=>$device_info['bind_id']))->getField('user_id'); if(!$user_id){ return array('success'=>false,'message'=>$data['tid'].' user_id not existed'); } $send_user=M('users')->where(array('id'=>$user_id))->find(); $alarm_state=0; $save_data=array('battery_level'=> $data['battery_level']); $alarm_time=$data['timestamp']; if($alarm_time<(time()-3600)){ $alarm_time=time(); } $alarm_data=array( 'device_number'=>$device_info['imei'], //'alarm_type'=>'low_recover', 'created_at'=>time(), 'creator_id'=>$device_info['creator_id'], 'alarm_time'=>date('Y-m-d H:i:s',$alarm_time), ); $send_interval = C('WX_BAT_ALARM_SEND_INTERVAL'); if (!$send_interval) { $send_interval=600; } $check_cond=array( 'device_number'=>$device_info['imei'], 'created_at'=>array('gt',time()-$send_interval), ); if($data['is_low']){ //低电量告警 $save_data['alarm_state']=1; //低电量告警 $alarm_data['alarm_type']='low_bat'; $insertId=M('alarm_report')->createAdd($alarm_data); // 低电量模板ID $bat_template_id = C('WX_LOW_BAT_ALARM_TEMPLATE_ID'); if (!$bat_template_id) { create_log('WX_LOW_BAT_ALARM_TEMPLATE_ID required', 'add_alarm'); return array('success'=>false,'message'=>'WX_CHARGE_FULL_ALARM_TEMPLATE_ID required'); } $sendmsg = array( 'touser'=>$send_user['wx_open_id'], 'template_id' => $bat_template_id, //"NeSrB4ViIkRMOuCtoA6BIPPi_N5L-B5zaWcIXJZwkD0", 'data' => [ 'first' => [ 'value' =>$send_user['realname'].'的设备电量过低,请及时充电', 'color' => '#173177' ], 'keyword1' => [ 'value' => $data['tid'], 'color' => '#173177' ], 'keyword2' => [ 'value' => date('Y-m-d H:i:s'), 'color' => '#173177' ], 'remark' => [ 'value' => '', 'color' => '#173177' ], ], ); $res=$this->wxMsg->sendMessage($sendmsg); create_log($device_info['imei'].'is_low user:'.$send_user['id'].' openid:'.$send_user['wx_open_id'].' res:'.json_encode($res), 'wx_push_result_log'); //推送记录 $push_res_data = array( 'username'=>$send_user['realname'], 'device_number'=>$device_info['imei'], 'result'=> json_encode($res), 'created_at'=>time(), 'alarm_id'=>$insertId, 'alarm_type'=>$alarm_data['alarm_type'], 'creator_id'=>$device_info['creator_id'] ); M('wx_push_result_log')->createAdd($push_res_data); } $cond=array('imei'=>$data['tid']); //M('users')->createSave($cond,array('battery_level'=> $data['battery_level'])); $res=M('devices')->createSave($cond,$save_data); //var_dump(M('devices')->getLastSql()); if($res){ return array('success'=>true,'message'=>$data['tid'].' update success,battery: '.$data['battery_level']); }else{ return array('success'=>false,'message'=>$data['tid'].' update failed'); } } }