git пре 2 година
родитељ
комит
a529de8600

+ 22 - 9
Home/Lib/Action/InotifyMonitor.php

@@ -42,9 +42,11 @@ class InotifyMonitor {
              public $timeout  = 3;
              
              public $redis=null;
-          
+             
+             public $db=1;
+             public $key='redis_to_kafka';
            
-             public  function __construct( $paths,$ip='127.0.0.1',$password='',$post=6379){
+             public  function __construct( $paths,$ip='127.0.0.1',$password='',$post=6379,$db='1',$key='redis_to_kafka'){
                  if (!empty($paths)) {
                      foreach ($paths as $path) {
                          if (file_exists($path)) {
@@ -58,10 +60,13 @@ class InotifyMonitor {
                  }
                 
                 $this->redis = new Redis();
+                $this->db=$db;
+                $this->key=$key;
+
                 try{
                     $this->redis->pconnect($ip,$post,2.5);
                     $this->redis->auth($password); //设置密码
-                    $this->redis->select(1);
+                    $this->redis->select($this->db);
                     $result = $this->redis->ping();
                     if($result=='+PONG')
                     {
@@ -171,7 +176,14 @@ class InotifyMonitor {
                                          echo 'create file->'.$file.PHP_EOL;
                                 
                                          //写入redis
-                                         $this->set_redis($file);
+                                         try
+                                         {
+                                            $this->set_redis($file);
+                                         }catch(Exception $e)
+                                         {
+                                            DahuaUtil::rlog("redis数据库连接".$e->getMessage());
+                                         }
+                                       
                                          
                                      }
                                       
@@ -200,13 +212,14 @@ class InotifyMonitor {
               */
              public function set_redis($data)
              {
-                if(!strstr($data,'md5'))
+                if(strstr($data,'.zip'))
                 {   
                     if(rename($data,$data.'.redis'))
                     {
                         $data=$data.'.redis';
                     }
-                    $this->redis->lpush('redis_to_kafka',$data);
+                    
+                    $this->redis->lpush($this->key,$data);
                 }
                
 
@@ -214,10 +227,10 @@ class InotifyMonitor {
              //扫描目录下所有没有上传到redis文件
              public function scan_file()
              {  
-                
+                 echo '调用脚本'.PHP_EOL;
                  
-                   shell_exec("/home/wwwroot/nbfd_tp3/clear_file_redis.sh > /dev/null 2>&1 &");
-                  //echo shell_exec("/home/wwwroot/nbfd_tp3/clear_file_redis.sh");
+                   //shell_exec("/home/wwwroot/nbfd_tp3/clear_file_redis.sh > /dev/null 2>&1 &");
+                  echo shell_exec("/home/wwwroot/nbfd_tp3/clear_file_redis.sh");
                    return;
             }
             

+ 270 - 102
Home/Lib/Action/ListeningFileCreateAction.class.php

@@ -9,58 +9,184 @@ class ListeningFileCreateAction extends Action {
  
 	public  function start_listening(  ){
 		include('InotifyMonitor.php');
-				//文件夹,ip,密码,端口
-		$test = new InotifyMonitor(['/home/renlian'],'192.168.1.105','',6379);
+					//文件夹,ip,密码,端口
+		if(!C('WATCH_REDIS'))
+		{
+			DahuaUtil::rlog('WATCH_REDIS 该常量不存在');
+			$str= "'WATCH_REDIS'=>array(
+						'ip'=>'192.168.1.105',
+						'password'=>'',
+						'port'=>'6379',
+						'db'=>1,
+						'key'=>'redis_to_kafka',
+			 );";
+			DahuaUtil::rlog('WATCH_REDIS 具体格式:'.$str);
+			return;
+		
+		}
+		if(!C('WATCH_FTP_ADDRESS'))
+		{
+			DahuaUtil::rlog('WATCH_FTP_ADDRESS 该常量不存在');
+			$str= "'WATCH_FTP_ADDRESS'=>'/home/renlian'";
+			DahuaUtil::rlog('WATCH_FTP_ADDRESS 具体格式:'.$str);
+			return;
+		}
+		$path = C('WATCH_FTP_ADDRESS');
+		$redis = C('WATCH_REDIS');
+		$test = new InotifyMonitor([$path],$redis['ip'],$redis['password'],$redis['6379'],$redis['db'],$redis['key']);
 		$test->run();
+		
+			
+		 
+		
 	}
 	
  
 	public  function redis_to_kafka(  ){
-		            include('DahuaUtil.php');
-						ini_set('memory_limit', '1024M');
-						DahuaUtil::rlog('从redis中获取被扫描的文件');
-						try{
+		include('DahuaUtil.php');
+		ini_set('memory_limit', '1024M');
+		if (!extension_loaded('rdkafka')){
+			DahuaUtil::rlog('pushToKafka fail,extension of rdkafka has not installed!!'.PHP_EOL);
+			DahuaUtil::rlog('请重新安装kafka扩展');
+		   return false;
+		}
+		if (!extension_loaded('redis')){
+						
+			DahuaUtil::rlog('redis fail,extension of rdkafka has not installed!!'.PHP_EOL);
+			DahuaUtil::rlog('请重新安装redis扩展');
+			return false;
+		}
 		
-							$redis = new redis();
-							DahuaUtil::rlog('redis连接中....');
-							$redis->connect('192.168.1.105',6379);
-							$redis->select(1);
-							$result = $redis->ping();
-							if($result=='+PONG')
-							{
-								DahuaUtil::rlog('redis连接成功');
-							}else
-							{
-								DahuaUtil::rlog('redis连接失败');
-							}
+		if(!C('WATCH_REDIS'))
+		{
+			DahuaUtil::rlog('WATCH_REDIS 该常量不存在');
+			$str= "'WATCH_REDIS'=>array(
+				'ip'=>'192.168.1.105',
+				'password'=>'',
+				'port'=>'6379',
+				'db'=>1,
+				'key'=>'redis_to_kafka',
+			);";
+			DahuaUtil::rlog('WATCH_REDIS 具体格式:'.$str);
+			return;
+		}
+		if(!C('WATCH_FTP_ADDRESS'))
+		{
+			DahuaUtil::rlog('WATCH_FTP_ADDRESS 该常量不存在');
+			$str= "'WATCH_FTP_ADDRESS'=>'/home/renlian'";
+			DahuaUtil::rlog('WATCH_FTP_ADDRESS 具体格式:'.$str);
+			return;
+		
+		}
+		if(!C('WATCH_KAFKA'))
+		{
+			DahuaUtil::rlog('WATCH_KAFKA 该常量不存在');
+			$str="'WATCH_KAFKA'=>array(
+			'address'=>'192.168.1.105:9092',
+			'topic'=>'ningbo_rfid_kafka_topic',
+			);";
+		   DahuaUtil::rlog('WATCH_KAFKA 具体格式:'.$str);
+			return;
+		}
 						
-						}catch(Exception $e)
-						{
-							DahuaUtil::rlog('redis连接异常',$e->getMessage());
-						}
+		$path=C('WATCH_FTP_ADDRESS');
+						//kafka配置
+		$topic =C('WATCH_KAFKA')['topic'];
+		$kafka_address=C('WATCH_KAFKA')['address'];
+						//redis配置
+		$redis_ip=C('WATCH_REDIS')['ip'];
+		$redis_password=C('WATCH_REDIS')['password'];
+		$redis_port=C('WATCH_REDIS')['port'];
+		$redis_key=C('WATCH_REDIS')['key'];
+		$redis_db=C('WATCH_REDIS')['db'];
+		DahuaUtil::rlog('从redis中获取被扫描的文件存入kafka');
+		DahuaUtil::rlog('开始连接kafka');
+		$conf = new Rdkafka\Conf();
+						//$conf->set('batch.num.messages', 2);
+		$conf->set('metadata.broker.list',$kafka_address);
+		$conf->setErrorCb(function($producer, $msg) {
+			DahuaUtil::rlog(rd_kafka_err2str($err), $errstr);
+		});
 						
-						echo "redis连接结果=>".$result.PHP_EOL;
-						$i=0;
-						DahuaUtil::rlog('开始扫描文件');
-						while(true)
-						{
-							$data=null;
-						    $res =  $redis->rPop('redis_to_kafka');
-						    if(!$res)
-						    {
-						        sleep(1);
-						        continue;
-						    }
-						    if(!file_exists($res))
-						    {
-						        continue;
-						    }
+		$conf->setDrMsgCb(function($producer, $msg) {
+			if($msg->err) {
+				DahuaUtil::rlog('Message delivery failed:' . $msg->errstr());
+			}
+		    });
+					
+		$rk = new RdKafka\Producer($conf);
+		$topic = $rk->newTopic($topic);
+		DahuaUtil::rlog("kafka连接成功");
+		
+		try{
+		
+				$redis = new redis();
+				DahuaUtil::rlog('redis连接中....');
+				$redis->connect($redis_ip,$redis_port);
+				$redis->auth($redis_password);
+				$redis->select($redis_db);
+				$result = $redis->ping();
+				if($result=='+PONG')
+				{
+					DahuaUtil::rlog('redis连接成功');
+				}else
+				{
+					DahuaUtil::rlog('redis连接失败');
+				}
+						
+		}catch(Exception $e)
+		{
+			    DahuaUtil::rlog('redis连接异常',$e->getMessage());
+		}
+						
+						
+				$i=0;
+			   $f=0;
+				DahuaUtil::rlog('开始扫描文件');
+		while(true)
+		{
+			$data=null;
+		
+			$res =  $redis->rPop($redis_key);
+		    if(!$res)
+			{
+				sleep(1);
+				continue;
+			}
+			if(!file_exists($res))
+			{
+			    continue;
+			}
 							
-							DahuaUtil::rlog('扫描文件路径为:'.$res);
-							$start_time = microtime(true);
-							$fuc=function($res)
-							{	
-								$handle = fopen($res, 'rb');
+			DahuaUtil::rlog('扫描文件路径为:'.$res);
+			$zip = new ZipArchive();
+			DahuaUtil::rlog('开始解压文件:'.$res);
+			$new_res=str_replace(".zip.redis","redis.zip",$res);
+			//将名字中的.redis取消掉
+			rename($res,$new_res);
+			$res=$new_res;
+			if ($zip->open($res)===true){
+				$zip->extractTo($path);
+				$zip->close();
+				//删除压缩包
+				DahuaUtil::rlog('解压文件成功');
+				unlink($res);
+				DahuaUtil::rlog('删除压缩包');
+				$res=str_replace("redis.zip",".dat",$res);
+				DahuaUtil::rlog('压缩包中的文件为'.$res);
+								
+			}else
+			{
+				DahuaUtil::rlog('解压文件失败,进行跳过');
+				continue;
+			}
+		
+		
+					$f++;
+				   $start_time = microtime(true);
+					$fuc=function($res)
+					{	
+							$handle = fopen($res, 'rb');
 								while (feof($handle)===false) {
 									# code...
 									
@@ -68,91 +194,133 @@ class ListeningFileCreateAction extends Action {
 					
 								}
 								fclose($handle);
-							};
-							$t=0;
-							foreach($fuc($res) as $value)
-							{
-								if(!empty($value))
-								{
-									$t++;
-									$i++;
-								}
+					};
+					$t=0;
+					foreach($fuc($res) as $value)
+				   {
+						if(!empty($value))
+						{
+							$topic->produce(RD_KAFKA_PARTITION_UA, 0,$value);
+							$rk->poll(0);
+							$t++;
+						   $i++;
+							while ($rk->getOutQLen() >10000) {
+									$rk->poll(10);
+						    }	
+					    }
 								
-				
-							}
+				   }
 							
-							$end_time = microtime(true);
-							DahuaUtil::rlog('完成扫描文件:'.$res);
-							DahuaUtil::rlog('一个文件处理完成时间:'.($end_time-$start_time).'s');
-							DahuaUtil::rlog('上传的文件有'.$t.'条');
-							DahuaUtil::rlog('总共有了'.$i.'条');
-							$res1=str_replace(".dat.redis",".md5",$res);
-							unlink($res);
-							unlink($res1);
-							DahuaUtil::rlog('删除文件:'.$res);
-							DahuaUtil::rlog('删除md5文件:'.$res1);
-						}
 							
+					DahuaUtil::rlog('传送kafaka文件成功:'.$res);
+					$end_time = microtime(true);
+					$res1=str_replace(".dat",".md5",$res);
+					unlink($res);
+					unlink($res1);
+					DahuaUtil::rlog('完成扫描文件:'.$res);
+					DahuaUtil::rlog('删除文件:'.$res);
+				   DahuaUtil::rlog('删除md5文件:'.$res1);
+					DahuaUtil::rlog('一个文件处理完成时间:'.($end_time-$start_time).'s');
+					DahuaUtil::rlog('上传的文件有'.$t.'条');
+					DahuaUtil::rlog('总共有了'.$i.'条');
+					DahuaUtil::rlog('总共有的文件数据'.$f.'条');
+					echo '______________________________'.PHP_EOL;
+					echo '______________________________'.PHP_EOL;
+					echo '______________________________'.PHP_EOL;
+		  }				
 	}
 	
  
 	public  function scan_no_listening(  ){
 		include('DahuaUtil.php');
-		$dir = '/home/renlian';
-		$ip='192.168.1.105';
-		$post=6379;
-					if(is_file($dir))
-					{
-					   return;
-					}
-				    $redis = new Redis();
-					DahuaUtil::rlog('连接redis'. $data);
-				
-					try{
-							 $redis->connect($ip,$post,2.5);
-							 $redis->auth($password); //设置密码
-							$redis->select(1);
-							$result = $redis->ping();
-							if($result=='+PONG')
-							{
+		include('InotifyMonitor.php');
+		if(!C('WATCH_REDIS'))
+		{
+					DahuaUtil::rlog('WATCH_REDIS 该常量不存在');
+					$str= "'WATCH_REDIS'=>array(
+						'ip'=>'192.168.1.105',
+						'password'=>'',
+						'port'=>'6379',
+						'db'=>1,
+						'key'=>'redis_to_kafka',
+					);";
+					DahuaUtil::rlog('WATCH_REDIS 具体格式:'.$str);
+					return;
+		
+		}
+		if(!C('WATCH_FTP_ADDRESS'))
+		{
+					DahuaUtil::rlog('WATCH_FTP_ADDRESS 该常量不存在');
+					$str= "'WATCH_FTP_ADDRESS'=>'/home/renlian'";
+					DahuaUtil::rlog('WATCH_FTP_ADDRESS 具体格式:'.$str);
+					return;
+		}
+			
+		$dir = C('WATCH_FTP_ADDRESS');
+		$redis=C('WATCH_REDIS');
+		$ip=$redis['ip'];
+		$post=$redis['port'];
+		$password=$redis['password'];
+		$key=$redis['key'];
+		$db=$redis['db'];
+		
+		if(is_file($dir))
+		{
+			return;
+		}
+		$redis = new Redis();
+		DahuaUtil::rlog('连接redis'. $data);
+		try{
+				$redis->connect($ip,$post,2.5);
+				$redis->auth($password); //设置密码
+				$redis->select($db);
+				$result = $redis->ping();
+				if($result=='+PONG')
+				{
 				             
-				             DahuaUtil::rlog('redis连接成功');
-							}
-							else
-							{
-								DahuaUtil::rlog("redis连接失败=>".$result);
+				      DahuaUtil::rlog('redis连接成功');
+				}
+				else
+				{
+					DahuaUtil::rlog("redis连接失败=>".$result);
 								
-							}
+				}
 				                  
-				       }catch (Exception $e){
+			}catch (Exception $e){
 				
-				           DahuaUtil::rlog("redis连接异常".$e->getMessage());
+				DahuaUtil::rlog("redis连接异常".$e->getMessage());
 				
-				    }
+			 }
 					
-					DahuaUtil::rlog('开始扫描文件夹 '. $dir);
-				    $files = scandir($dir);
-				    foreach($files as $k=>$filename) {//务必使用!==,防止目录下出现类似文件名“0”等情况
-						
-				    if ($filename != "." && $filename != ".." &&!strstr($filename,'.redis')&&!strstr($filename,'.md5')&&is_file($dir.'/'.$filename))    {
+		DahuaUtil::rlog('开始扫描文件夹 '. $dir);
+		$files = scandir($dir);
+		foreach($files as $k=>$filename) {//务必使用!==,防止目录下出现类似文件名“0”等情况
+		     if ($filename != "." && $filename != ".."   &&!strstr($filename,'.zip.redis')&&!strstr($filename,'redis.zip')&&strstr($filename,'.zip') &&is_file($dir.'/'.$filename)) {
 				          $data = $dir.'/'.$filename;
 						  
-							  $file_md5 = str_replace('.dat','.md5',$data);
+							  $file_md5 = str_replace('.zip','.md5',$data);
 							  if(!is_file($file_md5))
 							  {
 								  continue;
 							  }
-						  DahuaUtil::rlog('将文件添加到redis中'. $data);
+						     DahuaUtil::rlog('将文件添加到redis中'. $data);
 				          if(rename($data,$data.'.redis'))
-				          {
-				              $redis->lpush('redis_to_kafka',$data.'.redis');
+				          {		
+							      try{
+								     $redis->lpush($key,$data.'.redis');
+							      }catch(Exception $e)
+							     {	
+								     DahuaUtil::rlog("redis连接异常".$e->getMessage());
+								//重新连接redis
+								   }
+				              
 				          }
 				    }
 				                   }
 				   closedir($dir);
-					 $redis->close();
+					$redis->close();
 				
-				   return;
+		return;
 	}
 	
 

+ 1 - 0
Home/Lib/Action/RouteRfidKafkaAction.class.php

@@ -76,6 +76,7 @@ class RouteRfidKafkaAction extends Action {
 			
 		   $r = oci_execute($stid);
 			if(!$r){
+				$val['mac']=$RF_ID;
 				$this->debug_log( 'insert_oracle_error', $val );
 				return array('success'=>false,'message'=>'addRfidDataToNingbo failed,insert_oracle_error!');
 			}