setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. $conf->set('group.id', $group); $conf->set('metadata.broker.list', $broker_list); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning $topicConf->set('auto.offset.reset', 'smallest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // Subscribe to topic 'test' $consumer->subscribe($topics); $tmp_list = array(); $start = time(); $count = 0; //HC_YYYYMMDD_HHIISS.dat //HC_YYYYMMDD_HHIISS.md5 $localDir = C('FTP_LOCAL_DIR'); $timeFram = time(); $createTime = date('Ymd_His', $timeFram); $fileTimeInterval = C('FTP_FILE_CREATE_INTERVAL'); if(!$fileTimeInterval){ $fileTimeInterval = 30; } // $x = 0; // $sum = 0; while (true) { $message = $consumer->consume(30*1000); // $_st = microtime(TRUE); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $locationPack = ($message->payload).'\n'; $fileName = $localDir.'/HC_'.json_decode($message->payload)->mac.$createTime . '.dat'; $runTime = time() - $timeFram; if($runTime < $fileTimeInterval){ $datRes = $this->writeRouteFile($fileName, $locationPack); }else{ $md5Res = $this->createRouteMD5file($fileName); $timeFram = time(); $createTime = date('Ymd_His', $timeFram); $datRes = $this->writeRouteFile($fileName); } break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more".PHP_EOL; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: $fileName = $localDir.'/HC_'.json_decode($message->payload)->mac.$createTime . '.dat'; $md5Res = $this->createRouteMD5file($fileName); $timeFram = time(); $createTime = date('Ymd_His', $timeFram); echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } } private function writeRouteFile( $fileName, $data ){ //deviceid,positioninfo.longitude,positioninfo.latitude,receivetime $config = C('FTP_CONFIG'); $sRedis = Redis('open_vehicle', 'hash'); $writeData = $data; if(!file_exists($fileName)){ $writeData = 'deviceid,positioninfo.longitude,positioninfo.latitude,receivetime'.PHP_EOL.$writeData; } debug_log('route_info.log',json_encode($writeData,JSON_UNESCAPED_UNICODE)); $writeData = utf8_encode($writeData); $res = Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $writeData.PHP_EOL); return $res; } private function createRouteMD5file( $fileName ){ if(!file_exists($fileName)){ return false; } $contents = strlen( file_get_contents($fileName, null, null, 0, 1) ); if($contents == 0){ unlink($fileName); return false; } $config = C('FTP_CONFIG'); $writeData = md5_file($fileName); $pathInfo = pathinfo($fileName); $md5File = $pathInfo['dirname'].'/'.$pathInfo['filename'].'.md5'; $writeData = $writeData. ' '. $pathInfo['basename']; $res = Zmcoding\FtpFile::getInstance($config)->writeFile($md5File, $writeData); return $res; } private function getT61GpsParseReslut( $data ){ if(!$data){ echo 'data empty!'.PHP_EOL; return false; } $imei = $data['imei']['value']; if(!$imei){ echo 'imei not existed!'.PHP_EOL; return false; } if(!isset($data['lng']['value'])){ echo '['.$imei.']data.lng.value not set!'.PHP_EOL; return false; } //检测是否有打包位置参数 $is_have_pkgfields = $data['extctrl']['extctrl_conf']['is_have_pkgfields']; if(!$is_have_pkgfields){ echo '['.$imei.']is_have_pkgfields empty!'.PHP_EOL; return false; } //检测是否有打包位置参数 $pkt_count = hexdec($data['pkt_count']['hex_value']); if(!$pkt_count){ echo '['.$imei.']pkt_count empty!'.PHP_EOL; return false; } $pkt_position_params = $data['pkt_position_params']; if(!$pkt_position_params){ echo '['.$imei.']pkt_position_params empty!'.PHP_EOL; return false; } //解析基准点定位信息 $base_points_info = array( 'DeviceId' => $imei, 'AlarmStatus' => $data['alarm_status']['hex_value'],//告警状态 'DeviceStatus' => $data['device_status']['hex_value'],//设备状态 'Longitude' => $data['lng']['value'], 'Latitude' => $data['lat']['value'], 'DeviceTime' => $data['timestamp']['value'],//本次打包时,第一个终端位置数据采集时间 'Voltage' => $data['voltage']['value'],//终端外部供电电压 ); //var_dump($base_points_info); //echo 'lat_hex = '.$data['lat']['hex_value'].',lng_hex = '.$data['lng']['hex_value'].PHP_EOL; //解析打包位置参数 $list = array(); foreach($pkt_position_params as $key => $row){ $tmp = array(); $tmp['DeviceId'] = $base_points_info['DeviceId']; $tmp['Altitude'] = $row['altitude']['value']; $tmp['Speed'] = $row['speed']['value']; $tmp['Direction'] = $row['direction']['value']; $tmp['SatelliteCount'] = $row['satellite_count']['value']; if($key < 1){ $tmp['DeviceTime'] = $row['relative_time']['value'] + $base_points_info['DeviceTime']; $tmp['Latitude'] = ($row['lat']['value'] + $base_points_info['Latitude'])/1000000; $tmp['Longitude'] = ($row['lng']['value'] + $base_points_info['Longitude'])/1000000; }else{ $tmp['DeviceTime'] = $row['relative_time']['value'] + $list[$key-1]['DeviceTime']; $tmp['Latitude'] = $row['lat']['value']/1000000 + $list[$key-1]['Latitude']; $tmp['Longitude'] = $row['lng']['value']/1000000 + $list[$key-1]['Longitude']; } $list[$key] = $tmp; } return $list; } }