false,'message'=>'addRfidDataToNingbo failed,methond not track!'); } if(!$data['labels']){ return array('success'=>false,'message'=>'addRfidDataToNingbo failed,labels not existed!'); } $conn = null; $host= '61.175.203.188'; $port= '6521'; $instance_name= 'DSSC'; $username= 'dssc3'; $password= 'dssc3'; $conn = new PDO("oci:dbname=//".$host.":".$port."/".$instance_name,$username,$password);// PDO方式 $conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $station_sql='SELECT DEVICE_NAME FROM DSSC2.ADM_DEV WHERE LOGIN_NAME='.$data['RF_ID']; $res = $conn->query($station_sql); $station_info = $res->fetch(PDO::FETCH_ASSOC); $RF_ID=strtoupper($data['mac']); foreach($data['labels'] as $val){ $RF_STAT=0; if($val['event']['entry']==1){ $RF_STAT=1; }elseif($val['event']['leave']==1){ $RF_STAT=2; } $RF_FLAGID=strtoupper($val['id']); $RF_DATE=date('Y-m-d H:i:s',$val['time']); $handle_data=array( 'RF_STAT'=>$RF_STAT, 'RF_FLAGID'=>$RF_FLAGID, 'RF_ID'=>$RF_ID, 'time'=>$val['time'], 'address'=>$station_info['DEVICE_NAME'] ); $this->handleTotalData($handle_data); //检测布控 $this->checkControlAlarm($handle_data,$conn); //违规行驶检测 超速逆行检测 $this->checkIllegalDriving($handle_data,$conn); $sql = 'INSERT INTO "DSSC2"."W_DW_RF_RECORD"("ID", "RF_ID", "RF_FLAGID", "RF_DATE", "RF_STAT") VALUES (DSSC2.SEQ_W_DW_RF_RECORD.nextval, \''.$RF_ID.'\', \''.$RF_FLAGID.'\', TO_DATE(\''.$RF_DATE.'\', \'SYYYY-MM-DD HH24:MI:SS\'), \''.$RF_STAT.'\')'; var_dump($sql); //插入数据到oracle轨迹表 //$res = $conn -> query($sql); } if ($conn){ $conn = null; } return array('success'=>true,'message'=>'add success'); } public function pushRfidRouteToNingbo( ){ $broker_list = C('KAFKA_BROKER_LIST'); if (empty($broker_list)) { exit("KAFKA_BROKER_LIST must be config!".PHP_EOL); } $group = C('ROUTE_INDEX_KAFKA_GROUP'); if (empty($group)) { exit("ROUTE_INDEX_KAFKA_GROUP must be config!".PHP_EOL); } $topics = 'ningbo_dahua'; if (empty($topics)) { exit("ROUTE_INDEX_KAFKA_TOPIC must be config!".PHP_EOL); } $topics = explode(',',$topics); // 从 topic :rlstation_rfid_location 取轨迹 $conf = new RdKafka\Conf(); // Set a rebalance callback to log partition assignments (optional)(当有新的消费进程加入或者退出消费组时,kafka 会自动重新分配分区给消费者进程,这里注册了一个回调函数,当分区被重新分配时触发) $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume( 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic 分区的数量是没有意义的。) // different partitions. $conf->set('group.id', $group); // Initial list of Kafka brokers(添加 kafka集群服务器地址) $conf->set('metadata.broker.list', $broker_list); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'smallest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // 订阅轨迹数据topic $consumer->subscribe($topics); while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $data = json_decode($message->payload,true); if( $data ){ //$this->addRfidDataToNingbo($data); $this->addRfidDataToRenlian($data); } break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } } private function addRfidDataToRenlian( $data ){ //{"methond":"track","mac":"ffc10063","gps":{"locationState":"A","lat":0,"latType":"N","lng":0,"lngType":"E"},"labels":[{"id":"01000423","event":{"dec":8,"lowBattery":0,"entry":0,"leave":1,"in":0},"time":"160621112931"},{"id":"01000424","event":{"dec":8,"lowBattery":0,"entry":0,"leave":1,"in":0},"time":"160621113202"},{"id":"01000422","event":{"dec":20,"lowBattery":0,"entry":1,"leave":0,"in":1},"time":"160621113303"}]} //var_dump($data); if($data['methond']!='track'){ return array('success'=>false,'message'=>'addRfidDataToNingbo failed,methond not track!'); } if(!$data['labels']){ return array('success'=>false,'message'=>'addRfidDataToNingbo failed,labels not existed!'); } static $recvCount = 0; $recvCount++; var_dump($recvCount . ".".json_encode($data)); return; $conn = null; $host= '115.198.203.63'; $port= '1521'; $instance_name= 'ORCL'; $username= 'DSSC3'; $password= '123456'; $conn = new PDO("oci:dbname=//".$host.":".$port."/".$instance_name,$username,$password);// PDO方式 $conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION); $RF_ID=strtoupper($data['mac']); foreach($data['labels'] as $val){ $RF_STAT=0; if($val['event']['entry']==1){ $RF_STAT=1; }elseif($val['event']['leave']==1){ $RF_STAT=2; } $RF_FLAGID=strtoupper($val['id']); $RF_DATE=date('Y-m-d H:i:s',$val['time']); $sql = 'INSERT INTO "ROOT"."W_DW_RF_RECORD"("RF_ID", "RF_FLAGID", "RF_DATE", "RF_STAT") VALUES (\''.$RF_ID.'\', \''.$RF_FLAGID.'\', TO_DATE(\''.$RF_DATE.'\', \'SYYYY-MM-DD HH24:MI:SS\'), \''.$RF_STAT.'\')'; var_dump($recvCount . '.' . $sql); // $res = $conn -> query($sql); } if ($conn){ $conn = null; } return array('success'=>true,'message'=>'add success'); } private function addControlAlarm( $data, $conn ){ $vehicle_sql='SELECT o.PLATE_NO,s.RFID_SN FROM DSSC3.W_DW_NON_MOTOR o,DSSC3.W_DW_RFID_TAGS s WHERE s.RFID_SN ='.$data['RF_FLAGID'].' AND o.rfid_id = s.id '; $res2 = $conn->query($vehicle_sql); $vehicle_info = $res2->fetch(PDO::FETCH_ASSOC); if(!$data['address']){ $station_sql='SELECT DEVICE_NAME FROM DSSC2.ADM_DEV WHERE LOGIN_NAME='.$data['RF_ID']; $res = $conn->query($station_sql); $station_info = $res->fetch(PDO::FETCH_ASSOC); $data['address']=$station_info['DEVICE_NAME']; } $save_data=array( 'plate_no'=>$vehicle_info['PLATE_NO'], 'rfid_sn'=>$data['RF_FLAGID'], 'address'=>$data['address'], 'alarm_type'=>$data['alarm_typ'], 'created_at'=>$data['time'], 'remark'=>$data['remark'] ); $res=M('control_alarm')->createAdd($save_data); return $res; } private function checkControlAlarm( $data, $conn ){ //先检测标签是否布控 $cond=array('control_obj'=>$data['RF_FLAGID']); $ve_con=M('control_manage')->where($cond)->find(); //存在布控标签 并在时间内 if($ve_con && ($val['time']>$ve_con['start_time']) && ($val['time']<$ve_con['end_time'])){ $this->addControlAlarm($data,$conn); } //检测区域布控 $cond2=array('control_obj'=>array("LIKE", '%'.$data['RF_ID'].'%')); $sta_con=M('control_manage')->where($cond2)->find(); //存在布控基站 if($sta_con && ($val['time']>$sta_con['start_time']) && ($val['time']<$sta_con['end_time'])){ if($sta_con['bw_ids']){ $bwIdArr=explode(',',$sta_con['bw_ids']); $info=M('bw_list')->where( array('id'=>array('in',$bwIdArr) ) )->find(); //判断是黑名单还是白名单 $rfid_arr=M('rfid_with_bw')->where(array('bw_id'=>array('in',$bwIdArr) ))->getField('rfid',true); if($sta_con['area_type']=='1'){ //禁止活动区域 if($info['type']=='0'){ //指定黑名单禁止 if(in_array($data['RF_FLAGID'],$rfid_arr)){ //在黑名单内 告警 $data['alarm_typ']='forbid_b'; $this->addControlAlarm($data,$conn); } }else{ //白名单的不禁止 if(!in_array($data['RF_FLAGID'],$rfid_arr)){ //不在白名单内 告警 $data['alarm_typ']='forbid_w'; $this->addControlAlarm($data,$conn); } } }else{ //活动区域 //指定黑名单 if(in_array($data['RF_FLAGID'],$rfid_arr)){ //在黑名单内 告警 $data['alarm_typ']='activity'; $this->addControlAlarm($data,$conn); } } }else{ //无黑白名单布控 全部禁止 $data['alarm_typ']='forbid_all'; $this->addControlAlarm($data,$conn); } } } private function checkIllegalDriving( $data, $conn ){ $redis = Redis("nbfd_stuck_section_data","hash"); //先查询基站是否设置卡点 $cond=array('macs'=>array("LIKE", '%'.$data['RF_ID'].'%')); $section_id=M('stuck_point')->where($cond)->getField('id'); //检测是否是前置卡点 $pre_section_cond=array('pre_spot'=>$section_id); $pre_section=M('stuck_section')->where($pre_section_cond)->find(); if($pre_section){ //如果是前置卡点 则记录标签进入卡点区间时间 $key = "stuck_section_".$pre_section['id']."_".$data['RF_FLAGID']; $passInfo=json_decode($redis->get($key),true); //存在逆行进入卡点时间 且启用超速检测 if($passInfo && ($passInfo['section']=='pos') && ($pos_section['retrograde_stat']==1)){ $data['alarm_typ']='retrograde'; $this->addControlAlarm($data,$conn); } $redisData = array( $key =>json_encode(array( "section" => 'pre', "time" => $data['time'], ) ) ); $redis->add($redisData); return; } //检查是否是后置卡点 $pos_section_cond=array('pos_spot'=>$section_id); $pos_section=M('stuck_section')->where($pos_section_cond)->find(); if($pos_section){ //后置卡点 取标签进入卡点区间时间 $key = "stuck_section_".$pos_section['id']."_".$data['RF_FLAGID']; $passInfo=json_decode($redis->get($key),true); //存在进入卡点时间 且启用超速检测 if($passInfo && ($passInfo['section']=='pre') && ($pos_section['over_speed_stat']==1)){ $hour= ($data['time']-$passInfo['time'])/3600; $speed=($pos_section['distance']/1000)/$hour; if($speed>$max_speed){ //超速行驶 $data['alarm_typ']='over_speed'; $data['remark']='速度:'. round($speed,2); $this->addControlAlarm($data,$conn); } } //存经过后置卡点时间 $redisData = array( $key =>json_encode(array( "section" => 'pos', "time" => $data['time'], ) ) ); $redis->add($redisData); return; } } private function handleTotalData( $data ){ /* $data=array( 'RF_STAT'=>$RF_STAT, 'RF_FLAGID'=>$RF_FLAGID, 'RF_ID'=>$RF_ID, 'time'=>$val['time'], 'address'=>$station_info['DEVICE_NAME'] ); */ //统计表数据添加 $to_cond=array( 'mac'=>$data['RF_ID'], 'date'=>date('Y-m-d',$data['time']); ); if(!M('station_passing')->where($to_cond)->count()){ $total_data=array( 'address'=>$data['address'], 'mac'=>$data['RF_ID'], 'date'=>date('Y-m-d',$data['time']), 'num'=>1, ); $res=M('station_passing')->createAdd($total_data); }else{ $res=M('station_passing')->where($to_cond)->setInc('num'); } return $res; } }