false,'message'=>'addRfidDataToNingbo failed,methond login!');
        }

        $RF_ID=strtoupper($data['mac']);
        $station_cond=array('mac'=>$RF_ID);
        $device_name=M('stations')->where($station_cond)->getField('name');
        if(!$device_name){
            return array('success'=>false,'message'=>'addRfidDataToNingbo failed,station not existed!');
        }

        // Reports older than three minutes are stamped with the current time instead.
        if($data['time']<(time()-180)){
            $onlinetime=time();
        }else{
            $onlinetime=$data['time'];
        }
        $save_data=array(
            'online_time'=>date('Y-m-d H:i:s',$onlinetime)
        );
        M('stations')->createSave($station_cond,$save_data);

        if($data['methond']!='track'){
            return array('success'=>false,'message'=>'addRfidDataToNingbo failed,methond not track!');
        }
        if(!$data['labels']){
            return array('success'=>false,'message'=>'addRfidDataToNingbo failed,labels not existed!');
        }

        //$station_sql='SELECT DEVICE_NAME FROM DSSC2.ADM_DEV WHERE LOGIN_NAME=\''.$RF_ID.'\'';
        //$stid = oci_parse($conn, $station_sql);
        //oci_define_by_name($stid, 'DEVICE_NAME', $device_name);
        //oci_execute($stid);
        //oci_fetch($stid);
        foreach($data['labels'] as $val){
            $RF_STAT=0;
            $plate_no='';
            if($val['event']['entry']==1){
                $RF_STAT=1;
            }elseif($val['event']['leave']==1){
                $RF_STAT=2;
            }
            $RF_FLAGID=strtoupper($val['id']);
            if($RF_FLAGID=='00000000'){
                continue;
            }
            $RF_DATE=date('Y-m-d H:i:s',$val['time']);
            $sql = 'INSERT INTO "DSSC2"."W_DW_RF_RECORD"("ID", "RF_ID", "RF_FLAGID", "RF_DATE", "RF_STAT") VALUES (DSSC2.SEQ_W_DW_RF_RECORD.nextval, \''.$RF_ID.'\', \''.$RF_FLAGID.'\', TO_DATE(\''.$RF_DATE.'\', \'SYYYY-MM-DD HH24:MI:SS\'), \''.$RF_STAT.'\')';
            //var_dump($sql);
            // Insert the record into the Oracle track table.
            $stid = oci_parse($conn, $sql);
            $r = oci_execute($stid);
            if(!$r){
                $this->debug_log( 'insert_oracle_error', $val );
                throw new \Exception('insert data to oracle false');
            }

            $vehicle_sql='SELECT o.PLATE_NO FROM DSSC3.W_DW_NON_MOTOR o,DSSC3.W_DW_RFID_TAGS s WHERE s.RFID_SN =\''.$RF_FLAGID.'\' AND o.rfid_id = s.id ';
            $stid = oci_parse($conn, $vehicle_sql);
            oci_define_by_name($stid, 'PLATE_NO', $plate_no);
            oci_execute($stid);
            oci_fetch($stid);
            var_dump($plate_no);

            $handle_data=array(
                'RF_STAT'=>$RF_STAT,
                'RF_FLAGID'=>$RF_FLAGID,
                'RF_ID'=>$RF_ID,
                'time'=>$val['time'],
                'address'=>$device_name,
                'plate_no'=>$plate_no
            );
            $this->handleTotalData($handle_data);
            if(!$plate_no){
                continue;
            }
            // Surveillance (control list) check.
            $this->checkControlAlarm($handle_data);
            // Illegal driving check: speeding and wrong-way driving.
            $this->checkIllegalDriving($handle_data);
        }
        oci_free_statement($stid);
        //oci_close($conn);
        return array('success'=>true,'message'=>'add success');
    }

    /**
     * Long-running Kafka consumer: reads RFID track messages from the
     * configured topics and hands every decoded payload to
     * addRfidDataToNingbo() together with one shared Oracle connection.
     *
     * Terminates the script via exit() when a required configuration entry
     * (KAFKA_BROKER_LIST, ROUTE_INDEX_KAFKA_GROUP, ROUTE_INDEX_KAFKA_TOPIC,
     * ORACLE_CONFIG) is empty. Never returns normally once the loop started.
     *
     * @throws \Exception on an unexpected rebalance event or consumer error
     */
    public function pushRfidRouteToNingbo( ){
        $broker_list = C('KAFKA_BROKER_LIST');
        if (empty($broker_list)) {
            exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
        }
        $group = C('ROUTE_INDEX_KAFKA_GROUP');
        if (empty($group)) {
            exit("ROUTE_INDEX_KAFKA_GROUP must be config!".PHP_EOL);
        }
        $topics = C('ROUTE_INDEX_KAFKA_TOPIC');
        if (empty($topics)) {
            exit("ROUTE_INDEX_KAFKA_TOPIC must be config!".PHP_EOL);
        }
        // The topic setting is a comma-separated list (track data comes from
        // topic rlstation_rfid_location).
        $topics = explode(',',$topics);

        $conf = new RdKafka\Conf();

        // Rebalance callback: when a consumer joins or leaves the group Kafka
        // redistributes the partitions; log the event and apply the assignment.
        $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(NULL);
                    break;

                default:
                    // Report a readable message and keep the numeric code.
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });

        // Consumers sharing a group.id split the topic partitions between
        // them, so running more consumers than partitions gains nothing.
        $conf->set('group.id', $group);

        // Initial list of Kafka brokers.
        $conf->set('metadata.broker.list', $broker_list);

        $topicConf = new RdKafka\TopicConf();

        // Where to start when there is no stored offset or it is out of
        // range. 'smallest' would start from the beginning; 'latest' only
        // consumes messages produced from now on.
        $topicConf->set('auto.offset.reset', 'latest');

        // Use this configuration for subscribed/assigned topics.
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new RdKafka\KafkaConsumer($conf);

        // Subscribe to the track data topics.
        $consumer->subscribe($topics);

        $config = C('ORACLE_CONFIG');
        if (empty($config)) {
            exit("ORACLE_CONFIG must be config!".PHP_EOL);
        }
        $host= $config['host'];
        $port= $config['port'];
        $instance_name= $config['instance_name'];
        $username= $config['username'];
        $password= $config['password'];

        $conn = oci_connect($username, $password, $host.':'.$port.'/'. $instance_name,'AL32UTF8');
        if (!$conn) {
            $e = oci_error();
            trigger_error(htmlentities($e['message'], ENT_QUOTES), E_USER_ERROR);
            // Only reached if an error handler swallowed the error above:
            // never enter the consume loop without a usable connection.
            return;
        }

        while (true) {
            // Wait up to two minutes for the next message.
            $message = $consumer->consume(120*1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $data = json_decode($message->payload,true);
                    if( $data ){
                        $this->addRfidDataToNingbo($data,$conn);
                    }
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    echo "default break";
                    $this->debug_log( 'default_Log', $message->errstr() );
                    $this->debug_log( 'default_Log', $message->err );
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }

    /**
     * Debug variant of the track import: connects to a second Oracle instance
     * through PDO and dumps the INSERT statement it would run for each label.
     * The statement itself is not executed.
     *
     * Example payload:
     * {"methond":"track","mac":"ffc10063","gps":{"locationState":"A","lat":0,"latType":"N","lng":0,"lngType":"E"},"labels":[{"id":"01000423","event":{"dec":8,"lowBattery":0,"entry":0,"leave":1,"in":0},"time":"160621112931"}]}
     *
     * @param array $data decoded track message (keys: methond, mac, labels)
     * @return array array('success' => bool, 'message' => string)
     */
    private function addRfidDataToRenlian( $data ){
        if($data['methond']!='track'){
            return array('success'=>false,'message'=>'addRfidDataToNingbo failed,methond not track!');
        }
        if(!$data['labels']){
            return array('success'=>false,'message'=>'addRfidDataToNingbo failed,labels not existed!');
        }
        var_dump($data);

        // NOTE(review): connection settings and credentials are hard-coded
        // here; they should come from configuration like ORACLE_CONFIG does.
        $host= '115.198.203.63';
        $port= '1521';
        $instance_name= 'ORCL';
        $username= 'DSSC3';
        $password= '123456';
        $conn = new PDO("oci:dbname=//".$host.":".$port."/".$instance_name,$username,$password);
        $conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

        $RF_ID=strtoupper($data['mac']);
        // Running number of the label inside this message, used as the
        // prefix of the dumped statement.
        $recvCount=0;
        foreach($data['labels'] as $val){
            $recvCount++;
            $RF_STAT=0;
            if($val['event']['entry']==1){
                $RF_STAT=1;
            }elseif($val['event']['leave']==1){
                $RF_STAT=2;
            }
            $RF_FLAGID=strtoupper($val['id']);
            $RF_DATE=date('Y-m-d H:i:s',$val['time']);
            $sql = 'INSERT INTO "ROOT"."W_DW_RF_RECORD"("RF_ID", "RF_FLAGID", "RF_DATE", "RF_STAT") VALUES (\''.$RF_ID.'\', \''.$RF_FLAGID.'\', TO_DATE(\''.$RF_DATE.'\', \'SYYYY-MM-DD HH24:MI:SS\'), \''.$RF_STAT.'\')';
            var_dump($recvCount . '.' . $sql);
            // $res = $conn -> query($sql);
        }

        // Dropping the last reference closes the PDO connection.
        $conn = null;
        return array('success'=>true,'message'=>'add success');
    }

    /**
     * Stores one alarm row in the control_alarm table.
     *
     * @param array $data keys read: plate_no, RF_FLAGID, address, alarm_type,
     *                    time and (optionally) remark
     * @return mixed result of createAdd()
     */
    private function addControlAlarm( $data ){
        $save_data=array(
            'plate_no'=>$data['plate_no'],
            'rfid_sn'=>$data['RF_FLAGID'],
            'address'=>$data['address'],
            'alarm_type'=>$data['alarm_type'],
            'created_at'=>$data['time'],
            // Plain "control" alarms are raised without a remark.
            'remark'=>isset($data['remark']) ? $data['remark'] : null,
            'state'=>1,
        );
        return M('control_alarm')->createAdd($save_data);
    }

    /**
     * Raises control alarms for one tag reading.
     *
     * Two independent checks are made against control_manage:
     *  1. the tag id or plate number itself is under active control;
     *  2. the reporting station belongs to an active controlled area, in
     *     which case the area type and its black/white list decide.
     *
     * @param array $data keys read: RF_FLAGID, plate_no, RF_ID, time
     *                    (plus the keys addControlAlarm() stores)
     * @return void
     */
    private function checkControlAlarm( $data ){
        // 1. Is the tag or the plate itself under control?
        $cond=array(
            'control_obj'=>array('in',array($data['RF_FLAGID'],$data['plate_no'])),
            'state'=>1
        );
        $ve_con=M('control_manage')->where($cond)->find();
        // A control entry exists and the reading falls inside its time window.
        if($ve_con && ($data['time']>$ve_con['start_time']) && ($data['time']<$ve_con['end_time'])){
            $data['alarm_type']='control';
            $this->addControlAlarm($data);
        }

        // 2. Is the station part of a controlled area?
        $cond2=array(
            'control_obj'=>array("LIKE", '%'.$data['RF_ID'].'%'),
            'state'=>1
        );
        $sta_con=M('control_manage')->where($cond2)->find();
        if(!$sta_con || ($data['time']<=$sta_con['start_time']) || ($data['time']>=$sta_con['end_time'])){
            return;
        }

        if(!$sta_con['bw_ids']){
            // No black/white list configured: every tag is forbidden.
            $data['remark']='驶入禁入区域';
            $data['alarm_type']='forbid_in';
            $this->addControlAlarm($data);
            return;
        }

        // Collect the tags on the configured lists.
        $bwIdArr=explode(',',$sta_con['bw_ids']);
        $rfid_arr=M('rfid_with_bw')->where(array('bw_id'=>array('in',$bwIdArr) ))->getField('rfid',true);
        if(!is_array($rfid_arr)){
            // Nothing found (or query failed): treat the list as empty.
            $rfid_arr=array();
        }
        // Compare as strings: tag ids are hex text and must not be matched
        // numerically ('01000423' is not '1000423').
        $listed=in_array((string)$data['RF_FLAGID'],array_map('strval',$rfid_arr),true);

        if($sta_con['area_type']=='1'){
            // Forbidden area.
            if($sta_con['bw_type']=='0'){
                // Blacklist mode: only listed tags are forbidden.
                if($listed){
                    $data['alarm_type']='forbid_in';
                    $data['remark']='驶入黑名单禁入区域';
                    $this->addControlAlarm($data);
                }
            }elseif(!$listed){
                // Whitelist mode: everything not listed is forbidden.
                $data['alarm_type']='forbid_in';
                $data['remark']='驶入禁入区域';
                $this->addControlAlarm($data);
            }
        }elseif($listed){
            // Activity area: listed tags entering it are reported.
            $data['remark']='驶入活动区域';
            $data['alarm_type']='activity_in';
            $this->addControlAlarm($data);
        }
    }

    /**
     * Detects wrong-way driving and speeding between two checkpoints.
     *
     * A section (stuck_section) has a front checkpoint (pre_spot) and a rear
     * checkpoint (pos_spot). The last checkpoint a tag passed is remembered
     * in Redis per section and tag:
     *  - front checkpoint reached while the stored state is 'pos' => wrong way;
     *  - rear checkpoint reached while the stored state is 'pre' => the
     *    average speed over the section distance is compared with max_speed.
     *
     * @param array $data keys read: RF_ID, RF_FLAGID, time
     *                    (plus the keys addControlAlarm() stores)
     * @return void
     */
    private function checkIllegalDriving( $data ){
        $redis = Redis("nbfd_stuck_section_data","hash");

        // Is this station configured as a checkpoint at all?
        $cond=array('macs'=>array("LIKE", '%'.$data['RF_ID'].'%'));
        $section_id=M('stuck_point')->where($cond)->getField('id');
        if(!$section_id){
            // Not a checkpoint: no section can reference it.
            return;
        }

        // Front checkpoint of a section?
        $pre_section=M('stuck_section')->where(array('pre_spot'=>$section_id))->find();
        if($pre_section){
            $key = "stuck_section_".$pre_section['id']."_".$data['RF_FLAGID'];
            $passInfo=json_decode($redis->get($key),true);
            // The tag was last seen at the rear checkpoint and wrong-way
            // detection is enabled for this section.
            if($passInfo && ($passInfo['section']=='pos') && ($pre_section['retrograde_stat']=='1')){
                $data['alarm_type']='retrograde';
                $data['remark']=$pre_section['name'].'逆行';
                $this->addControlAlarm($data);
            }
            // Remember when the tag entered the section.
            $redis->add(array(
                $key =>json_encode(array(
                    "section" => 'pre',
                    "time" => $data['time'],
                ))
            ));
            return;
        }

        // Rear checkpoint of a section?
        $pos_section=M('stuck_section')->where(array('pos_spot'=>$section_id))->find();
        if($pos_section){
            $key = "stuck_section_".$pos_section['id']."_".$data['RF_FLAGID'];
            $passInfo=json_decode($redis->get($key),true);
            // An entry time exists and speed detection is enabled.
            if($passInfo && ($passInfo['section']=='pre') && ($pos_section['over_speed_stat']=='1')){
                $elapsed=$data['time']-$passInfo['time'];
                // A zero or negative interval cannot yield a speed (and would
                // divide by zero), so only positive intervals are evaluated.
                if($elapsed>0){
                    $hour=$elapsed/3600;
                    // distance is divided by 1000 and by hours, i.e. treated
                    // as metres to obtain km/h.
                    $speed=($pos_section['distance']/1000)/$hour;
                    if($speed>$pos_section['max_speed']){
                        $data['alarm_type']='over_speed';
                        $data['remark']=$pos_section['name'].'超速,速度:'. round($speed,2);
                        $this->addControlAlarm($data);
                    }
                }
            }
            // Remember when the tag left through the rear checkpoint.
            $redis->add(array(
                $key =>json_encode(array(
                    "section" => 'pos',
                    "time" => $data['time'],
                ))
            ));
            return;
        }
    }

    /**
     * Maintains the per-station, per-day passing counter in station_passing:
     * creates the row with num = 1 on the first reading of the day, otherwise
     * increments num.
     *
     * @param array $data keys read: RF_ID, time, address
     * @return mixed result of createAdd() or setInc()
     */
    private function handleTotalData( $data ){
        $day=date('Y-m-d',$data['time']);
        $to_cond=array(
            'mac'=>$data['RF_ID'],
            'date'=>$day
        );

        if(M('station_passing')->where($to_cond)->count()){
            return M('station_passing')->where($to_cond)->setInc('num');
        }

        return M('station_passing')->createAdd(array(
            'address'=>$data['address'],
            'mac'=>$data['RF_ID'],
            'date'=>$day,
            'num'=>1,
        ));
    }

    /**
     * Publishes a sample track message for station FF0435EE to the
     * configured Kafka topic (manual test helper).
     *
     * @return false|null false when the rdkafka extension is missing
     */
    public function test( ){
        $res='{"methond":"track","mac":"FF0435EE","gps":{"locationState":"A","lat":0,"latType":"N","lng":0,"lngType":"E"},"labels":[{"id":"0308EC58","event":{"dec":8,"lowBattery":0,"entry":0,"leave":1,"in":0},"time":"'.time().'"},{"id":"01000424","event":{"dec":8,"lowBattery":0,"entry":0,"leave":1,"in":0},"time":"1667529922"},{"id":"01000422","event":{"dec":20,"lowBattery":0,"entry":1,"leave":0,"in":1},"time":"1667529922"}]}';
        return $this->produceRfidTestPayload($res);
    }

    /**
     * Publishes a sample track message for station FF04C526 to the
     * configured Kafka topic (manual test helper).
     *
     * @return false|null false when the rdkafka extension is missing
     */
    public function test2( ){
        $res='{"methond":"track","mac":"FF04C526","gps":{"locationState":"A","lat":0,"latType":"N","lng":0,"lngType":"E"},"labels":[{"id":"0308EC58","event":{"dec":8,"lowBattery":0,"entry":0,"leave":1,"in":0},"time":"'.time().'"},{"id":"01000424","event":{"dec":8,"lowBattery":0,"entry":0,"leave":1,"in":0},"time":"1667529922"},{"id":"01000422","event":{"dec":20,"lowBattery":0,"entry":1,"leave":0,"in":1},"time":"1667529922"}]}';
        return $this->produceRfidTestPayload($res);
    }

    /**
     * Sends one raw payload to ROUTE_INDEX_KAFKA_TOPIC and blocks until the
     * producer queue is drained. The producer is created once per process.
     *
     * @param string $payload message body to publish
     * @return false|null false when the rdkafka extension is missing
     */
    private function produceRfidTestPayload( $payload ){
        static $rk;

        $topic = C('ROUTE_INDEX_KAFKA_TOPIC');
        if (!extension_loaded('rdkafka')){
            echo 'pushToKafka fail,extension of rdkafka has not installed!!'.PHP_EOL;
            return false;
        }

        if(!$rk){
            $conf = new RdKafka\Conf();
            $conf->set('batch.num.messages', '2');
            // The error callback receives the handle, the error code and a
            // human readable reason.
            $conf->setErrorCb(function($producer, $err, $reason) {
                printf("%s: %s\n", rd_kafka_err2str($err), $reason);
            });
            // Delivery report: one call per produced message.
            $conf->setDrMsgCb(function($producer, $msg) {
                if($msg->err) {
                    echo 'Message delivery failed:' . $msg->errstr();
                } else {
                    echo "sent message sucessfully.";
                }
            });
            $rk = new RdKafka\Producer($conf);
            $rk->addBrokers(C('KAFKA_BROKER_LIST'));
        }

        var_dump($topic);
        $kafkaTopic = $rk->newTopic($topic);
        $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0,$payload);

        // Serve delivery callbacks until everything queued has been sent.
        $rk->poll(0);
        while ($rk->getOutQLen() > 0) {
            $rk->poll(1);
        }
    }

    /**
     * Appends one timestamped line to
     * SOLUTION_LOG_PATH/<Ymd>/<filename>.log, creating the day directory
     * when needed. Arrays are written as JSON.
     *
     * @param string $filename log file name without extension
     * @param mixed  $data     value to log
     * @return void
     */
    public function debug_log( $filename, $data ){
        $file = SOLUTION_LOG_PATH .'/'.date("Ymd", time()) ."/".$filename.".log";
        $folder=dirname($file);
        // The second is_dir() covers another process creating the directory
        // between the check and mkdir().
        if (!is_dir($folder) && !mkdir($folder,0777,true) && !is_dir($folder)){
            return;
        }
        if(is_array($data)){
            $data = json_encode($data);
        }
        file_put_contents($file, '[' . date('Y-m-d H:i:s') . ']' . $data . PHP_EOL,FILE_APPEND);
    }
}