git 2 年之前
父節點
當前提交
93b1451078
共有 1 個文件被更改,包括 51 次插入48 次删除
  1. 51 48
      Home/Lib/Action/ListeningFileCreateAction.class.php

+ 51 - 48
Home/Lib/Action/ListeningFileCreateAction.class.php

@@ -16,56 +16,59 @@ class ListeningFileCreateAction extends Action {
 	
  
 	public  function redis_to_kafka(  ){
-		ini_set('memory_limit', '256M');
-		$redis = new redis();
-		$redis->connect('192.168.1.105',6379);
-		$redis->select(1);
-		$result = $redis->ping();
-		echo "redis连接结果=>".$result.PHP_EOL;
-		// $conf = new RdKafka\Conf();
-		// $conf->set('metadata.broker.list', 'localhost:9092');
-		// $conf->setDrMsgCb(function ($kafka, $message) {
-		//     file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
-		// });
-		// $conf->setErrorCb(function ($kafka, $err, $reason) {
-		//     file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)",rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
-		// });
-		// $producer = new RdKafka\Producer($conf);
-		// $producer->setLogLevel(LOG_DEBUG);
-		// $producer->addBrokers("127.0.0.1");
-		
-		while(true)
-		{
-		    $res =  $redis->rPop('redis_to_kafka');
-		    if(!$res)
-		    {
-		        sleep(1);
-		        continue;
-		    }
-		    if(!file_exists($res))
-		    {
-		        continue;
-		    }
-		    $data =  file_get_contents($res);
-		    if(empty($data))
-		    {
-		        continue;
-		    }
-		    $data = explode('\n',$data);
-		    foreach($data as $item)
-		    {
-		        echo $item;
-		    }
-			//$res1= pathinfo(str_replace(".redis","",$res), PATHINFO_EXTENSION);
-			$res2=str_replace('.dat.redis',".md5",$res);
-			unlink($res2);
-			unlink($res);
-			
-			 
+				ini_set('memory_limit', '1024M');
+				$redis = new redis();
+				$redis->connect('192.168.1.105',6379);
+				$redis->select(1);
+				$result = $redis->ping();
+				echo "redis连接结果=>".$result.PHP_EOL;
+				
+				$i=0;
+				while(true)
+				{
+					$data=null;
+				    $res =  $redis->rPop('redis_to_kafka');
+					
+				    if(!$res)
+				    {
+				        sleep(1);
+				        continue;
+				    }
+				    if(!file_exists($res))
+				    {
+				        continue;
+				    }
+					echo $res.PHP_EOL;
+					$start_time = time();
+					$fuc=function($res)
+					{	
+						$handle = fopen($res, 'rb');
+						while (feof($handle)===false) {
+							# code...
+							
+							yield fgets($handle);
 			
+						}
+						fclose($handle);
+					};
+					$t=0;
+					foreach($fuc($res) as $value)
+					{
+						$t++;
+						$i++;
 		
-		}
-		
+					}
+					
+					
+				
+					$end_time = time();
+					echo  '一个文件处理完成时间'.($end_time-$start_time).'s';
+					echo '上传的文件有'.$t.'条'.PHP_EOL;
+					echo '总共有了'.$i.'条'.PHP_EOL;
+					$res1=str_replace(".dat.redis",".md5",$res);
+					unlink($res);
+					unlink($res1);
+					
 	}