setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            // Rebalance handler: dump the partition list and apply the new
            // assignment; on revoke the assignment is cleared with assign(NULL).
            // Any other code is raised as an exception.
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(NULL);
                    break;

                default:
                    throw new \Exception($err);
            }
        });

        // Configure the group.id. All consumers with the same group.id will
        // consume different partitions.
        $conf->set('group.id', $group);
        $conf->set('metadata.broker.list', $broker_list);

        $topicConf = new RdKafka\TopicConf();

        // Set where to start consuming messages when there is no initial offset
        // in the offset store or the desired offset is out of range.
        // 'smallest': start from the beginning
        $topicConf->set('auto.offset.reset', 'smallest');

        // Set the configuration to use for subscribed/assigned topics
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new RdKafka\KafkaConsumer($conf);

        // Subscribe to the requested topics
        $consumer->subscribe($topics);

        $tmp_list = array();
        $start = time();
        $count = 0;

        // Output file naming scheme:
        //   HC_YYYYMMDD_HHIISS.dat
        //   HC_YYYYMMDD_HHIISS.md5
        $localDir = C('FTP_LOCAL_DIR');
        $timeFram = time();
        $createTime = date('Ymd_His', $timeFram);
        $fileTimeInterval = C('FTP_FILE_CREATE_INTERVAL');
        if(!$fileTimeInterval){
            // Fall back to 30 when the interval is not configured.
            $fileTimeInterval = 30;
        }
        // $x = 0;
        $sum = 0;            // messages buffered since the last successful flush
        $locationPack = '';  // buffered payloads, one per line
        $total = 0;          // number of flushed 50000-message batches; the total printed below is total * 50000 + endsum
        $endsum = 0;         // size of the last batch flushed on a timeout

        while (true) {
            // 30*1000 is the consume timeout (milliseconds in php-rdkafka's API).
            $message = $consumer->consume(30*1000);
            // $_st = microtime(TRUE);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    $locationPack .= ($message->payload) . PHP_EOL;
                    $fileName = $localDir . '/HC_' . $createTime . '.dat';
                    $runTime = time() - $timeFram;
                    $sum++;
                    if($sum % 10000 ==0){
                        // Progress output every 10000 buffered messages.
                        echo $sum . PHP_EOL;
                    }
                    if ($runTime < $fileTimeInterval) {
                        // Still inside the current file's time window: flush
                        // the buffer on every 50000th message.
                        if ($sum % 50000 == 0) {
                            //echo 'start write routefile...' . PHP_EOL;
                            echo 'locationPack length ' .strlen($locationPack).PHP_EOL;
                            $datRes = $this->writeRouteFile($fileName, $locationPack);
                            if ($datRes) {
                                $locationPack = '';
                                $sum = 0;
                                $total++;
                            }
                        }
                    } else {
                        // Time window elapsed: finish the current file with its
                        // .md5 companion and switch to a new timestamped name.
                        // NOTE(review): the buffer is not flushed in this branch,
                        // so pending payloads end up in the next file.
                        $md5Res = $this->createRouteMD5file($fileName);
                        $timeFram = time();
                        $createTime = date('Ymd_His', $timeFram);
                    }
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    //echo "No more messages; will wait for more".PHP_EOL;
                    break;

                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // No message within the timeout: flush whatever is buffered,
                    // finish the current file and start a new file name.
                    $fileName = $localDir . '/HC_' . $createTime . '.dat';
                    if ($locationPack != '') {
                        $datRes = $this->writeRouteFile($fileName, $locationPack);
                        if ($datRes) {
                            $endsum = $sum;
                            $locationPack = '';
                            $sum = 0;
                        }
                    }
                    $md5Res = $this->createRouteMD5file($fileName);
                    $timeFram = time();
                    $createTime = date('Ymd_His', $timeFram);
                    echo "Timed out\n";
                    echo "消息总数total:" . ($total * 50000 + $endsum) . PHP_EOL;
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }

    /**
     * Logs a batch of route records and hands it to the FTP file writer.
     *
     * @param string $fileName Target path of the .dat file.
     * @param string $data     Newline-separated records to write.
     *
     * @return mixed Whatever Zmcoding\FtpFile::writeFile() returns; callers
     *               treat a truthy value as success.
     */
    private function writeRouteFile( $fileName, $data ){
        // Presumably the column layout of each record (verify against producer):
        // deviceid,positioninfo.longitude,positioninfo.latitude,receivetime
        $config = C('FTP_CONFIG');
        debug_log('route_info', $data);

        return Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $data);
    }

    /**
     * Creates the ".md5" companion file for a finished route data file.
     *
     * A missing data file yields false. A zero-length data file is deleted
     * and false is returned. Otherwise "<dirname>/<filename>.md5" is written
     * through the FTP file writer.
     *
     * @param string $fileName Path of the .dat file.
     *
     * @return mixed false when nothing was created, otherwise the result of
     *               Zmcoding\FtpFile::writeFile().
     */
    private function createRouteMD5file( $fileName ){
        if (!file_exists($fileName)) {
            return false;
        }

        // Read a single byte to find out whether the file has any content.
        // The second argument is a bool ($use_include_path); passing null there
        // is deprecated since PHP 8.1, so pass false explicitly.
        $firstByte = file_get_contents($fileName, false, null, 0, 1);
        if ($firstByte === false) {
            // The read failed, so emptiness is unknown: do not delete the file.
            return false;
        }
        if ($firstByte === '') {
            // Empty data file: remove it instead of publishing it.
            unlink($fileName);
            return false;
        }

        $config = C('FTP_CONFIG');
        $pathInfo = pathinfo($fileName);
        $md5File = $pathInfo['dirname'].'/'.$pathInfo['filename'].'.md5';

        // NOTE(review): the checksum content ("md5_file(...) <basename>") was
        // commented out in the original, so the .md5 file is written empty.
        // Confirm that the receiving side only needs it as a marker.
        return Zmcoding\FtpFile::getInstance($config)->writeFile($md5File, '');
    }

    /**
     * Expands a parsed T61 GPS packet into a list of absolute position points.
     *
     * The packet carries a base point (lng/lat/timestamp) plus a list of
     * packaged position entries. The first entry's relative_time/lat/lng are
     * added to the base point; every later entry is added to the point
     * computed just before it. Coordinates are divided by 1000000
     * (presumably converting micro-degrees to degrees; confirm with the
     * protocol specification).
     *
     * @param array $data Parsed packet; the fields read here are
     *                    imei.value, lng.value, lat.value, timestamp.value,
     *                    extctrl.extctrl_conf.is_have_pkgfields,
     *                    pkt_count.hex_value and pkt_position_params.
     *
     * @return array|false Points keyed like pkt_position_params, each with
     *                     DeviceId, Altitude, Speed, Direction, SatelliteCount,
     *                     DeviceTime, Latitude and Longitude; false (after
     *                     echoing the reason) when the packet is unusable.
     */
    private function getT61GpsParseReslut( $data ){
        if (!$data) {
            echo 'data empty!'.PHP_EOL;
            return false;
        }

        // empty() guards the nested lookups so that a missing key takes the
        // intended error path instead of raising an undefined-index notice.
        if (empty($data['imei']['value'])) {
            echo 'imei not existed!'.PHP_EOL;
            return false;
        }
        $imei = $data['imei']['value'];

        if (!isset($data['lng']['value'])) {
            echo '['.$imei.']data.lng.value not set!'.PHP_EOL;
            return false;
        }
        // The base latitude and timestamp are needed just like the longitude;
        // without them every computed point would be wrong.
        if (!isset($data['lat']['value'])) {
            echo '['.$imei.']data.lat.value not set!'.PHP_EOL;
            return false;
        }
        if (!isset($data['timestamp']['value'])) {
            echo '['.$imei.']data.timestamp.value not set!'.PHP_EOL;
            return false;
        }

        // Check that the packet declares packaged position fields.
        if (empty($data['extctrl']['extctrl_conf']['is_have_pkgfields'])) {
            echo '['.$imei.']is_have_pkgfields empty!'.PHP_EOL;
            return false;
        }

        // Check that the declared packet count is non-zero.
        $pkt_count = isset($data['pkt_count']['hex_value'])
            ? hexdec($data['pkt_count']['hex_value'])
            : 0;
        if (!$pkt_count) {
            echo '['.$imei.']pkt_count empty!'.PHP_EOL;
            return false;
        }

        // Check that the packaged position entries are present.
        if (empty($data['pkt_position_params'])) {
            echo '['.$imei.']pkt_position_params empty!'.PHP_EOL;
            return false;
        }
        $pkt_position_params = $data['pkt_position_params'];

        // Base point: raw coordinates and the timestamp the first entry is
        // relative to.
        $baseLongitude = $data['lng']['value'];
        $baseLatitude = $data['lat']['value'];
        $baseTime = $data['timestamp']['value'];

        $list = array();
        // Track the previous point explicitly rather than via $list[$key - 1],
        // so the accumulation does not depend on the input being keyed 0..n-1.
        $previous = null;
        foreach ($pkt_position_params as $key => $row) {
            $point = array(
                'DeviceId'       => $imei,
                'Altitude'       => $row['altitude']['value'],
                'Speed'          => $row['speed']['value'],
                'Direction'      => $row['direction']['value'],
                'SatelliteCount' => $row['satellite_count']['value'],
            );

            if ($previous === null) {
                // First entry: offsets are relative to the raw base point, so
                // add first and scale the sum.
                $point['DeviceTime'] = $row['relative_time']['value'] + $baseTime;
                $point['Latitude'] = ($row['lat']['value'] + $baseLatitude)/1000000;
                $point['Longitude'] = ($row['lng']['value'] + $baseLongitude)/1000000;
            } else {
                // Later entries: the previous point is already scaled, so scale
                // the offset before adding it.
                $point['DeviceTime'] = $row['relative_time']['value'] + $previous['DeviceTime'];
                $point['Latitude'] = $row['lat']['value']/1000000 + $previous['Latitude'];
                $point['Longitude'] = $row['lng']['value']/1000000 + $previous['Longitude'];
            }

            $list[$key] = $point;
            $previous = $point;
        }

        return $list;
    }
}