BatteryAlarmKafkaAction.class.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. <?php
  /**
   * Long-running Kafka consumer for device battery reports.
   *
   * kafka_index() subscribes to the configured topic(s) and feeds every decoded
   * message to handleAlarmData(), which writes a battery log row, maintains the
   * alarm_report / alarm_records tables, pushes WeChat template messages and
   * finally updates the matching row in `devices`.
   */
  class BatteryAlarmKafkaAction extends Action
  {
      /** Blocking timeout, in milliseconds, of a single consume() call. */
      const CONSUME_TIMEOUT_MS = 120000;

      /** Charge-full throttle window (seconds) used when WX_BAT_ALARM_SEND_INTERVAL is not configured. */
      const DEFAULT_SEND_INTERVAL = 600;

      /** Reported timestamps older than this many seconds are replaced by the current time. */
      const MAX_REPORT_AGE = 3600;

      /** Colour applied to every field of a WeChat template message. */
      const WX_FIELD_COLOR = '#173177';

      /**
       * Consume battery alarm messages forever.
       *
       * Reads KAFKA_BROKER_LIST, WXT_DATA_KAFKA_GROUP and
       * C61_BATTERY_ALARM_KAFKA_TOPIC (comma separated) from the configuration
       * and exits the process with a message when one of them is empty.
       *
       * @throws \Exception on a rebalance or consume error that is not handled below
       */
      public function kafka_index()
      {
          $broker_list = $this->requireConfig('KAFKA_BROKER_LIST');
          $group       = $this->requireConfig('WXT_DATA_KAFKA_GROUP');
          $topic_list  = $this->requireConfig('C61_BATTERY_ALARM_KAFKA_TOPIC');

          // Tolerate "a, b" and trailing commas: surrounding whitespace or an
          // empty entry would otherwise be passed on as a topic name.
          $topics = array_values(array_filter(array_map('trim', explode(',', $topic_list)), 'strlen'));
          if (empty($topics)) {
              exit("C61_BATTERY_ALARM_KAFKA_TOPIC must be config!".PHP_EOL);
          }

          $conf = new RdKafka\Conf();

          // Rebalance callback: invoked when partitions are assigned to or
          // revoked from this consumer (e.g. a group member joins or leaves).
          $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
              switch ($err) {
                  case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                      echo "Assign: ";
                      var_dump($partitions);
                      $kafka->assign($partitions);
                      break;
                  case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                      echo "Revoke: ";
                      var_dump($partitions);
                      $kafka->assign(NULL);
                      break;
                  default:
                      // Use the readable error text as the message and keep the
                      // numeric code as the exception code.
                      throw new \Exception(rd_kafka_err2str($err), $err);
              }
          });

          // Consumers sharing a group.id split the topic's partitions between
          // them, so running more consumers than partitions gains nothing.
          $conf->set('group.id', $group);
          // Initial list of Kafka brokers.
          $conf->set('metadata.broker.list', $broker_list);

          // Where to start when there is no stored offset or it is out of range.
          $topicConf = new RdKafka\TopicConf();
          $topicConf->set('auto.offset.reset', 'earliest');
          $conf->setDefaultTopicConf($topicConf);

          $consumer = new RdKafka\KafkaConsumer($conf);
          $consumer->subscribe($topics);

          while (true) {
              $message = $consumer->consume(self::CONSUME_TIMEOUT_MS);
              switch ($message->err) {
                  case RD_KAFKA_RESP_ERR_NO_ERROR:
                      $data = json_decode($message->payload, true);
                      if ($data) {
                          $result = $this->handleAlarmData($data);
                          if (APP_DEBUG && isset($result['message'])) {
                              echo $result['message'] . PHP_EOL;
                          }
                      } elseif (APP_DEBUG) {
                          // Previously dropped without a trace.
                          echo 'skip message: payload is empty or not valid JSON' . PHP_EOL;
                      }
                      break;
                  case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                      echo "No more messages; will wait for more\n";
                      break;
                  case RD_KAFKA_RESP_ERR__TIMED_OUT:
                      echo "Timed out\n";
                      break;
                  default:
                      throw new \Exception($message->errstr(), $message->err);
              }
          }
      }

      /**
       * Process one decoded battery report.
       *
       * Keys read from $data: tid (looked up as devices.imei), is_low,
       * is_charge, is_charge_full, battery_level, timestamp.
       *
       * Note: when a required WeChat template ID is not configured the method
       * returns early, i.e. before the final update of `devices`.
       *
       * @param mixed $data decoded message payload
       * @return array array('success' => bool, 'message' => string)
       */
      private function handleAlarmData($data)
      {
          if (!is_array($data)) {
              return array('success' => false, 'message' => 'failed,data must be array!');
          }
          // Without a tid there is no device to look up; reject the report
          // instead of querying with an empty IMEI.
          if (empty($data['tid'])) {
              return array('success' => false, 'message' => 'failed,tid required!');
          }

          $tid            = $data['tid'];
          $is_low         = $this->field($data, 'is_low');
          $is_charge      = $this->field($data, 'is_charge');
          $is_charge_full = $this->field($data, 'is_charge_full');
          $battery_level  = $this->field($data, 'battery_level');
          $reported_at    = $this->field($data, 'timestamp');

          $device_cond = array('imei' => $tid);
          $device_info = M('devices')->where($device_cond)->find();
          if (!$device_info) {
              return array('success' => false, 'message' => $tid . 'devices not existed');
          }

          // Loose comparison on purpose: a missing level is treated like 0.
          if ($battery_level == 0) {
              return array('success' => false, 'message' => $tid . ' battery_level is 0');
          }

          // Every accepted report is appended to the battery log table.
          M(getBatteryTableName())->createAdd(array(
              'imei'           => $tid,
              'is_low'         => $is_low,
              'is_charge'      => $is_charge,
              'is_charge_full' => $is_charge_full,
              'battery_level'  => $battery_level,
              'created_at'     => time(),
          ));

          // Device has no user: only record the battery level.
          if (!$device_info['user_id']) {
              M('devices')->createSave($device_cond, array('battery_level' => $battery_level));
              return array('success' => false, 'message' => $tid . ' userid not existed');
          }

          $this->wxMsg = new \Jiaruan\WxTmp();
          // NOTE(review): $send_user is not checked for a missing row - confirm
          // a device's user_id always resolves to a user.
          $send_user = M('users')->where(array('id' => $device_info['user_id']))->find();

          $save_data = array('battery_level' => $battery_level);

          // Reports older than MAX_REPORT_AGE are stamped with the current time.
          $alarm_time = $reported_at;
          if ($alarm_time < (time() - self::MAX_REPORT_AGE)) {
              $alarm_time = time();
          }
          $alarm_data = array(
              'device_number' => $device_info['imei'],
              'created_at'    => time(),
              'creator_id'    => $device_info['creator_id'],
              'alarm_time'    => date('Y-m-d H:i:s', $alarm_time),
          );

          if ($is_charge_full) {
              $failure = $this->reportChargeFull($alarm_data, $tid, $device_info, $send_user);
              if ($failure) {
                  return $failure;
              }
          }

          // Charging (or fully charged) closes any open low-battery record.
          if ($is_charge || $is_charge_full) {
              $alarm_state = $this->closeLowBatteryRecords($device_info['imei']);
              if ($alarm_state !== null) {
                  $save_data['alarm_state'] = $alarm_state;
              }
          }

          if ($is_low && !$is_charge) {
              $save_data['alarm_state'] = 1;
              $failure = $this->reportLowBattery($alarm_data, $tid, $device_info, $send_user);
              if ($failure) {
                  return $failure;
              }
          }

          if (M('devices')->createSave($device_cond, $save_data)) {
              return array('success' => true, 'message' => $tid . ' update success,battery: ' . $battery_level);
          }
          return array('success' => false, 'message' => $tid . ' update failed');
      }

      /**
       * Return a non-empty configuration value or terminate the process.
       *
       * @param string $name configuration key passed to C()
       * @return mixed the configured value
       */
      private function requireConfig($name)
      {
          $value = C($name);
          if (empty($value)) {
              exit($name . " must be config!" . PHP_EOL);
          }
          return $value;
      }

      /**
       * Read an optional payload key without raising an undefined-index notice.
       *
       * @param array  $data decoded payload
       * @param string $key  key to read
       * @return mixed the value, or null when the key is absent
       */
      private function field(array $data, $key)
      {
          return isset($data[$key]) ? $data[$key] : null;
      }

      /**
       * Build one WeChat template field.
       *
       * @param mixed $value field text
       * @return array array('value' => ..., 'color' => ...)
       */
      private function wxField($value)
      {
          return array('value' => $value, 'color' => self::WX_FIELD_COLOR);
      }

      /**
       * Record and push a "charge full" alarm unless a charge_full row was
       * already written for the device within the send interval.
       *
       * @param array  $alarm_data  base alarm_report row (alarm_type is set here)
       * @param string $tid         device identifier from the payload
       * @param array  $device_info row from `devices`
       * @param mixed  $send_user   row from `users`
       * @return array|null failure result when the template ID is missing, else null
       */
      private function reportChargeFull(array $alarm_data, $tid, $device_info, $send_user)
      {
          $send_interval = C('WX_BAT_ALARM_SEND_INTERVAL');
          if (!$send_interval) {
              $send_interval = self::DEFAULT_SEND_INTERVAL;
          }

          $alarm_data['alarm_type'] = 'charge_full';
          $recent_cond = array(
              'device_number' => $device_info['imei'],
              'created_at'    => array('gt', time() - $send_interval),
              'alarm_type'    => 'charge_full',
          );
          if (M('alarm_report')->where($recent_cond)->count()) {
              return null;
          }

          $alarm_id = M('alarm_report')->createAdd($alarm_data);

          $template_id = C('WX_CHARGE_FULL_ALARM_TEMPLATE_ID');
          if (!$template_id) {
              create_log('WX_CHARGE_FULL_ALARM_TEMPLATE_ID required', 'add_alarm');
              return array('success' => false, 'message' => 'WX_CHARGE_FULL_ALARM_TEMPLATE_ID required');
          }

          $fields = array(
              'first'    => $this->wxField($send_user['realname'] . '的电子卡已充电完成'),
              'keyword1' => $this->wxField($tid),
              'keyword2' => $this->wxField(date('Y-m-d H:i:s')),
              'remark'   => $this->wxField(''),
          );
          $this->pushWxAlarm($template_id, $fields, ' charge_full', 'charge_full', $alarm_id, $device_info, $send_user);
          return null;
      }

      /**
       * Close the device's open low-battery alarm records, if any.
       *
       * @param string $imei device IMEI
       * @return int|null null when nothing was open; otherwise 1 if the device
       *                  still has any open alarm record afterwards, else 0
       */
      private function closeLowBatteryRecords($imei)
      {
          $open_low_bat = array(
              'device_number' => $imei,
              'alarm_reason'  => 'low_bat',
              'state'         => 'start',
          );
          if (!M('alarm_records')->where($open_low_bat)->count()) {
              return null;
          }

          M('alarm_records')->createSave($open_low_bat, array(
              'end_time' => time(),
              'state'    => 'end',
              'result'   => '1',
          ));

          // Open alarms of any reason keep the device in alarm state.
          $still_open = M('alarm_records')->where(array('device_number' => $imei, 'state' => 'start'))->count();
          return $still_open ? 1 : 0;
      }

      /**
       * Record a low-battery alarm; open an alarm record and push a WeChat
       * message only when no low-battery record is currently open.
       *
       * @param array  $alarm_data  base alarm_report row (alarm_type is set here)
       * @param string $tid         device identifier from the payload
       * @param array  $device_info row from `devices`
       * @param mixed  $send_user   row from `users`
       * @return array|null failure result when the template ID is missing, else null
       */
      private function reportLowBattery(array $alarm_data, $tid, $device_info, $send_user)
      {
          $alarm_data['alarm_type'] = 'low_bat';
          $alarm_id = M('alarm_report')->createAdd($alarm_data);

          $open_low_bat = array(
              'device_number' => $device_info['imei'],
              'alarm_reason'  => 'low_bat',
              'state'         => 'start',
          );
          if (M('alarm_records')->where($open_low_bat)->count()) {
              return null;
          }

          M('alarm_records')->createAdd(array(
              'device_number' => $device_info['imei'],
              'alarm_reason'  => 'low_bat',
              'state'         => 'start',
              'start_time'    => time(),
              'creator_id'    => $device_info['creator_id'],
          ));

          $template_id = C('WX_LOW_BAT_ALARM_TEMPLATE_ID');
          if (!$template_id) {
              create_log('WX_LOW_BAT_ALARM_TEMPLATE_ID required', 'add_alarm');
              // Name the key that is actually missing (this used to report
              // WX_CHARGE_FULL_ALARM_TEMPLATE_ID).
              return array('success' => false, 'message' => 'WX_LOW_BAT_ALARM_TEMPLATE_ID required');
          }

          $fields = array(
              'first'    => $this->wxField($send_user['realname'] . '的电子卡电量过低,请及时充电'),
              'keyword1' => $this->wxField($tid),
              'keyword2' => $this->wxField('低电量告警'),
              'keyword3' => $this->wxField(date('Y-m-d H:i:s')),
              'remark'   => $this->wxField(''),
          );
          $this->pushWxAlarm($template_id, $fields, 'is_low', 'low_bat', $alarm_id, $device_info, $send_user);
          return null;
      }

      /**
       * Send a WeChat template message, log the response and store it in
       * wx_push_result_log.
       *
       * @param string $template_id WeChat template ID
       * @param array  $fields      template "data" section
       * @param string $log_tag     text placed right after the IMEI in the log line
       * @param string $alarm_type  alarm type stored with the push result
       * @param mixed  $alarm_id    value returned when the alarm_report row was added
       * @param array  $device_info row from `devices`
       * @param mixed  $send_user   row from `users`
       */
      private function pushWxAlarm($template_id, array $fields, $log_tag, $alarm_type, $alarm_id, $device_info, $send_user)
      {
          $response = $this->wxMsg->sendMessage(array(
              'touser'      => $send_user['wx_open_id'],
              'template_id' => $template_id,
              'data'        => $fields,
          ));
          $encoded = json_encode($response);

          create_log($device_info['imei'] . $log_tag . ' user:' . $send_user['id'] . ' openid:' . $send_user['wx_open_id'] . ' res:' . $encoded, 'wx_push_result_log');

          M('wx_push_result_log')->createAdd(array(
              'username'      => $send_user['realname'],
              'device_number' => $device_info['imei'],
              'result'        => $encoded,
              'created_at'    => time(),
              'alarm_id'      => $alarm_id,
              'alarm_type'    => $alarm_type,
              'creator_id'    => $device_info['creator_id'],
          ));
      }
  }