BatteryAlarmKafkaAction.class.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. <?php
  2. class BatteryAlarmKafkaAction extends Action {
  3. public function kafka_index( ){
  4. $broker_list = C('KAFKA_BROKER_LIST');
  5. if (empty($broker_list)) {
  6. exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
  7. }
  8. $group = C('WXT_DATA_KAFKA_GROUP');
  9. if (empty($group)) {
  10. exit("WXT_DATA_KAFKA_GROUP must be config!".PHP_EOL);
  11. }
  12. $topics = C('C61_BATTERY_ALARM_KAFKA_TOPIC');
  13. if (empty($topics)) {
  14. exit("C61_BATTERY_ALARM_KAFKA_TOPIC must be config!".PHP_EOL);
  15. }
  16. $topics = explode(',',$topics);
  17. $conf = new RdKafka\Conf();
  18. // Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发)
  19. $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
  20. switch ($err) {
  21. case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
  22. echo "Assign: ";
  23. var_dump($partitions);
  24. $kafka->assign($partitions);
  25. break;
  26. case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
  27. echo "Revoke: ";
  28. var_dump($partitions);
  29. $kafka->assign(NULL);
  30. break;
  31. default:
  32. throw new \Exception($err);
  33. }
  34. });
  35. // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。)
  36. // different partitions.
  37. $conf->set('group.id', $group);
  38. // Initial list of Kafka brokers(添加 kafka集群服务器地址)
  39. $conf->set('metadata.broker.list', $broker_list);
  40. $topicConf = new RdKafka\TopicConf();
  41. // Set where to start consuming messages when there is no initial offset in
  42. // offset store or the desired offset is out of range.
  43. // 'smallest': start from the beginning
  44. $topicConf->set('auto.offset.reset', 'earliest');
  45. // Set the configuration to use for subscribed/assigned topics
  46. $conf->setDefaultTopicConf($topicConf);
  47. $consumer = new RdKafka\KafkaConsumer($conf);
  48. // 订阅轨迹数据topic
  49. $consumer->subscribe($topics);
  50. while (true) {
  51. $message = $consumer->consume(120*1000);
  52. switch ($message->err) {
  53. case RD_KAFKA_RESP_ERR_NO_ERROR:
  54. $data = json_decode($message->payload,true);
  55. if( $data ){
  56. //var_dump($data);
  57. $result = $this->handleAlarmData($data);
  58. if (APP_DEBUG && isset($result['message'])) {
  59. echo $result['message'] . PHP_EOL;
  60. }
  61. }
  62. break;
  63. case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  64. echo "No more messages; will wait for more\n";
  65. break;
  66. case RD_KAFKA_RESP_ERR__TIMED_OUT:
  67. echo "Timed out\n";
  68. break;
  69. default:
  70. throw new \Exception($message->errstr(), $message->err);
  71. break;
  72. }
  73. }
  74. }
  75. private function handleAlarmData( $data ){
  76. if(!is_array($data)){
  77. return array('success'=>false,'message'=>'failed,data must be array!');
  78. }
  79. $cond=array('imei'=>$data['tid']);
  80. $device_info=M('devices')->where($cond)->find();
  81. if(!$device_info){
  82. return array('success'=>false,'message'=>$data['tid'].'devices not existed');
  83. }
  84. $save_data_log=array(
  85. 'imei'=>$data['tid'],
  86. 'is_low' => $data['is_low'],
  87. 'is_charge' => $data['is_charge'],
  88. 'is_charge_full' => $data['is_charge_full'],
  89. 'battery_level' => $data['battery_level'],
  90. 'created_at'=>time(),
  91. );
  92. if($data['battery_level']==0){
  93. return array('success'=>false,'message'=>$data['tid'].' battery_level is 0');
  94. }
  95. M('battery_log')->createAdd($save_data_log);
  96. if(!$device_info['bind_id']){
  97. $res=M('devices')->createSave(array('imei'=>$data['tid']),array('battery_level'=> $data['battery_level']));
  98. return array('success'=>false,'message'=>$data['tid'].' bind_id not existed');
  99. }
  100. $this->wxMsg = new \Jiaruan\WxTmp();
  101. $user_id=M('vehicles')->where(array('id'=>$device_info['bind_id']))->getField('user_id');
  102. if(!$user_id){
  103. return array('success'=>false,'message'=>$data['tid'].' user_id not existed');
  104. }
  105. $send_user=M('users')->where(array('id'=>$user_id))->find();
  106. $alarm_state=0;
  107. $save_data=array('battery_level'=> $data['battery_level']);
  108. $alarm_time=$data['timestamp'];
  109. if($alarm_time<(time()-3600)){
  110. $alarm_time=time();
  111. }
  112. $alarm_data=array(
  113. 'device_number'=>$device_info['imei'],
  114. //'alarm_type'=>'low_recover',
  115. 'created_at'=>time(),
  116. 'creator_id'=>$device_info['creator_id'],
  117. 'alarm_time'=>date('Y-m-d H:i:s',$alarm_time),
  118. );
  119. $send_interval = C('WX_BAT_ALARM_SEND_INTERVAL');
  120. if (!$send_interval) {
  121. $send_interval=600;
  122. }
  123. $check_cond=array(
  124. 'device_number'=>$device_info['imei'],
  125. 'created_at'=>array('gt',time()-$send_interval),
  126. );
  127. if($data['is_low']){
  128. //低电量告警
  129. $save_data['alarm_state']=1;
  130. //低电量告警
  131. $alarm_data['alarm_type']='low_bat';
  132. $insertId=M('alarm_report')->createAdd($alarm_data);
  133. // 低电量模板ID
  134. $bat_template_id = C('WX_LOW_BAT_ALARM_TEMPLATE_ID');
  135. if (!$bat_template_id) {
  136. create_log('WX_LOW_BAT_ALARM_TEMPLATE_ID required', 'add_alarm');
  137. return array('success'=>false,'message'=>'WX_CHARGE_FULL_ALARM_TEMPLATE_ID required');
  138. }
  139. $sendmsg = array(
  140. 'touser'=>$send_user['wx_open_id'],
  141. 'template_id' => $bat_template_id, //"NeSrB4ViIkRMOuCtoA6BIPPi_N5L-B5zaWcIXJZwkD0",
  142. 'data' => [
  143. 'first' => [ 'value' =>$send_user['realname'].'的设备电量过低,请及时充电', 'color' => '#173177' ],
  144. 'keyword1' => [ 'value' => $data['tid'], 'color' => '#173177' ],
  145. 'keyword2' => [ 'value' => date('Y-m-d H:i:s'), 'color' => '#173177' ],
  146. 'remark' => [ 'value' => '', 'color' => '#173177' ],
  147. ],
  148. );
  149. $res=$this->wxMsg->sendMessage($sendmsg);
  150. create_log($device_info['imei'].'is_low user:'.$send_user['id'].' openid:'.$send_user['wx_open_id'].' res:'.json_encode($res), 'wx_push_result_log');
  151. //推送记录
  152. $push_res_data = array(
  153. 'username'=>$send_user['realname'],
  154. 'device_number'=>$device_info['imei'],
  155. 'result'=> json_encode($res),
  156. 'created_at'=>time(),
  157. 'alarm_id'=>$insertId,
  158. 'alarm_type'=>$alarm_data['alarm_type'],
  159. 'creator_id'=>$device_info['creator_id']
  160. );
  161. M('wx_push_result_log')->createAdd($push_res_data);
  162. }
  163. $cond=array('imei'=>$data['tid']);
  164. //M('users')->createSave($cond,array('battery_level'=> $data['battery_level']));
  165. $res=M('devices')->createSave($cond,$save_data);
  166. //var_dump(M('devices')->getLastSql());
  167. if($res){
  168. return array('success'=>true,'message'=>$data['tid'].' update success,battery: '.$data['battery_level']);
  169. }else{
  170. return array('success'=>false,'message'=>$data['tid'].' update failed');
  171. }
  172. }
  173. }