|
@@ -83,47 +83,53 @@ class V1Action extends Action {
|
|
|
// $x = 0;
|
|
|
$sum = 0;
|
|
|
$locationPack = '';
|
|
|
+ $total = 0; //总数10000个数 消息总数 = total*10000+endsum
|
|
|
+ $endsum = 0; //最后一次数量
|
|
|
while (true) {
|
|
|
$message = $consumer->consume(30*1000);
|
|
|
// $_st = microtime(TRUE);
|
|
|
switch ($message->err) {
|
|
|
case RD_KAFKA_RESP_ERR_NO_ERROR:
|
|
|
- $locationPack .= ($message->payload).PHP_EOL;
|
|
|
- $fileName = $localDir.'/HC_'.$createTime . '.dat';
|
|
|
- $runTime = time() - $timeFram;
|
|
|
- $sum++;
|
|
|
- echo $sum.PHP_EOL;
|
|
|
- if($runTime < $fileTimeInterval){
|
|
|
- if($sum % 10000 == 0){
|
|
|
- echo 'start write routefile...'.PHP_EOL;
|
|
|
- $datRes = $this->writeRouteFile($fileName,$locationPack);
|
|
|
- if($datRes){
|
|
|
- $locationPack = '';
|
|
|
- $sum = 0;
|
|
|
+ $locationPack .= ($message->payload) . PHP_EOL;
|
|
|
+ $fileName = $localDir . '/HC_' . $createTime . '.dat';
|
|
|
+ $runTime = time() - $timeFram;
|
|
|
+ $sum++;
|
|
|
+ echo $sum . PHP_EOL;
|
|
|
+
|
|
|
+ if ($runTime < $fileTimeInterval) {
|
|
|
+ if ($sum % 10000 == 0) {
|
|
|
+ echo 'start write routefile...' . PHP_EOL;
|
|
|
+ $datRes = $this->writeRouteFile($fileName, $locationPack);
|
|
|
+ if ($datRes) {
|
|
|
+ $locationPack = '';
|
|
|
+ $sum = 0;
|
|
|
+ $total++;
|
|
|
+ }
|
|
|
}
|
|
|
+ } else {
|
|
|
+ $md5Res = $this->createRouteMD5file($fileName);
|
|
|
+ $timeFram = time();
|
|
|
+ $createTime = date('Ymd_His', $timeFram);
|
|
|
}
|
|
|
- }else{
|
|
|
- $md5Res = $this->createRouteMD5file($fileName);
|
|
|
- $timeFram = time();
|
|
|
- $createTime = date('Ymd_His', $timeFram);
|
|
|
- }
|
|
|
break;
|
|
|
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
|
|
|
echo "No more messages; will wait for more".PHP_EOL;
|
|
|
break;
|
|
|
case RD_KAFKA_RESP_ERR__TIMED_OUT:
|
|
|
- $fileName = $localDir.'/HC_'.$createTime . '.dat';
|
|
|
- if($locationPack != ''){
|
|
|
- $datRes = $this->writeRouteFile($fileName,$locationPack);
|
|
|
- if($datRes){
|
|
|
- $locationPack = '';
|
|
|
- $sum = 0;
|
|
|
+ $fileName = $localDir . '/HC_' . $createTime . '.dat';
|
|
|
+ if ($locationPack != '') {
|
|
|
+ $datRes = $this->writeRouteFile($fileName, $locationPack);
|
|
|
+ if ($datRes) {
|
|
|
+ $endsum = $sum;
|
|
|
+ $locationPack = '';
|
|
|
+ $sum = 0;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- $md5Res = $this->createRouteMD5file($fileName);
|
|
|
- $timeFram = time();
|
|
|
- $createTime = date('Ymd_His', $timeFram);
|
|
|
- echo "Timed out\n";
|
|
|
+ $md5Res = $this->createRouteMD5file($fileName);
|
|
|
+ $timeFram = time();
|
|
|
+ $createTime = date('Ymd_His', $timeFram);
|
|
|
+ echo "Timed out\n";
|
|
|
+ echo "消息总数total:" . ($total * 10000 + $endsum) . PHP_EOL;
|
|
|
break;
|
|
|
default:
|
|
|
throw new \Exception($message->errstr(), $message->err);
|
|
@@ -137,18 +143,10 @@ class V1Action extends Action {
|
|
|
//deviceid,positioninfo.longitude,positioninfo.latitude,receivetime
|
|
|
$config = C('FTP_CONFIG');
|
|
|
|
|
|
- $writeData = $data;
|
|
|
+ debug_log('route_info', $data);
|
|
|
|
|
|
- if(!file_exists($fileName)){
|
|
|
-
|
|
|
- }
|
|
|
- debug_log('route_info',$writeData);
|
|
|
+ $writeData = utf8_encode($data);
|
|
|
|
|
|
- $writeData = utf8_encode($writeData);
|
|
|
- $cur_time = time();
|
|
|
- while(time()-$cur_time<10){
|
|
|
- $writeData .= $writeData.PHP_EOL;
|
|
|
- }
|
|
|
$res = Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $writeData);
|
|
|
|
|
|
return $res;
|