|
@@ -82,24 +82,28 @@ class V1Action extends Action {
|
|
|
}
|
|
|
// $x = 0;
|
|
|
$sum = 0;
|
|
|
+ $locationPack = '';
|
|
|
while (true) {
|
|
|
$message = $consumer->consume(30*1000);
|
|
|
// $_st = microtime(TRUE);
|
|
|
switch ($message->err) {
|
|
|
case RD_KAFKA_RESP_ERR_NO_ERROR:
|
|
|
- $locationPack = ($message->payload);
|
|
|
+ $locationPack .= ($message->payload).PHP_EOL;
|
|
|
$fileName = $localDir.'/HC_'.$createTime . '.dat';
|
|
|
$runTime = time() - $timeFram;
|
|
|
$sum++;
|
|
|
echo $sum.PHP_EOL;
|
|
|
- if($runTime < $fileTimeInterval){
|
|
|
- $datRes = $this->writeRouteFile($fileName, $locationPack);
|
|
|
- }else{
|
|
|
+ if($sum % 100000 == 0){
|
|
|
$datRes = $this->writeRouteFile($fileName,$locationPack);
|
|
|
+ if($datRes){
|
|
|
+ $locationPack = '';
|
|
|
+ $sum = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if($runTime >= $fileTimeInterval){
|
|
|
$md5Res = $this->createRouteMD5file($fileName);
|
|
|
$timeFram = time();
|
|
|
$createTime = date('Ymd_His', $timeFram);
|
|
|
-
|
|
|
}
|
|
|
break;
|
|
|
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
|
|
@@ -107,6 +111,13 @@ class V1Action extends Action {
|
|
|
break;
|
|
|
case RD_KAFKA_RESP_ERR__TIMED_OUT:
|
|
|
$fileName = $localDir.'/HC_'.$createTime . '.dat';
|
|
|
+ if($locationPack != ''){
|
|
|
+ $datRes = $this->writeRouteFile($fileName,$locationPack);
|
|
|
+ if($datRes){
|
|
|
+ $locationPack = '';
|
|
|
+ $sum = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
$md5Res = $this->createRouteMD5file($fileName);
|
|
|
$timeFram = time();
|
|
|
$createTime = date('Ymd_His', $timeFram);
|
|
@@ -132,8 +143,11 @@ class V1Action extends Action {
|
|
|
debug_log('route_info',$writeData);
|
|
|
|
|
|
$writeData = utf8_encode($writeData);
|
|
|
-
|
|
|
- $res = Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $writeData.PHP_EOL);
|
|
|
+ $cur_time = time();
|
|
|
+ while(time()-$cur_time<10){
|
|
|
+ $writeData .= $writeData.PHP_EOL;
|
|
|
+ }
|
|
|
+ $res = Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $writeData);
|
|
|
|
|
|
return $res;
|
|
|
}
|