|
@@ -16,142 +16,143 @@ class ListeningFileCreateAction extends Action {
|
|
|
|
|
|
|
|
|
public function redis_to_kafka( ){
|
|
|
- include('DahuaUtil.php');
|
|
|
- ini_set('memory_limit', '1024M');
|
|
|
- DahuaUtil::rlog('从redis中获取被扫描的文件');
|
|
|
- try{
|
|
|
-
|
|
|
- $redis = new redis();
|
|
|
- DahuaUtil::rlog('redis连接中....');
|
|
|
- $redis->connect('192.168.1.105',6379);
|
|
|
- $redis->select(1);
|
|
|
- $result = $redis->ping();
|
|
|
- if($result=='+PONG')
|
|
|
- {
|
|
|
- DahuaUtil::rlog('redis连接成功');
|
|
|
- }else
|
|
|
- {
|
|
|
- DahuaUtil::rlog('redis连接失败');
|
|
|
- }
|
|
|
-
|
|
|
- }catch(Exception $e)
|
|
|
- {
|
|
|
- DahuaUtil::rlog('redis连接异常',$e->getMessage());
|
|
|
- }
|
|
|
-
|
|
|
- echo "redis连接结果=>".$result.PHP_EOL;
|
|
|
- $i=0;
|
|
|
- DahuaUtil::rlog('开始扫描文件');
|
|
|
- while(true)
|
|
|
- {
|
|
|
- $data=null;
|
|
|
- $res = $redis->rPop('redis_to_kafka');
|
|
|
- if(!$res)
|
|
|
- {
|
|
|
- sleep(1);
|
|
|
- continue;
|
|
|
- }
|
|
|
- if(!file_exists($res))
|
|
|
- {
|
|
|
- continue;
|
|
|
- }
|
|
|
-
|
|
|
- DahuaUtil::rlog('扫描文件路径为:'.$res);
|
|
|
- $start_time = microtime(true);
|
|
|
- $fuc=function($res)
|
|
|
- {
|
|
|
- $handle = fopen($res, 'rb');
|
|
|
- while (feof($handle)===false) {
|
|
|
- # code...
|
|
|
-
|
|
|
- yield fgets($handle);
|
|
|
-
|
|
|
- }
|
|
|
- fclose($handle);
|
|
|
- };
|
|
|
- $t=0;
|
|
|
- foreach($fuc($res) as $value)
|
|
|
- {
|
|
|
- if(!empty($value))
|
|
|
+ include('DahuaUtil.php');
|
|
|
+ ini_set('memory_limit', '1024M');
|
|
|
+ DahuaUtil::rlog('从redis中获取被扫描的文件');
|
|
|
+ try{
|
|
|
+
|
|
|
+ $redis = new redis();
|
|
|
+ DahuaUtil::rlog('redis连接中....');
|
|
|
+ $redis->connect('192.168.1.105',6379);
|
|
|
+ $redis->select(1);
|
|
|
+ $result = $redis->ping();
|
|
|
+ if($result=='+PONG')
|
|
|
+ {
|
|
|
+ DahuaUtil::rlog('redis连接成功');
|
|
|
+ }else
|
|
|
+ {
|
|
|
+ DahuaUtil::rlog('redis连接失败');
|
|
|
+ }
|
|
|
+
|
|
|
+ }catch(Exception $e)
|
|
|
{
|
|
|
- $t++;
|
|
|
- $i++;
|
|
|
+ DahuaUtil::rlog('redis连接异常',$e->getMessage());
|
|
|
}
|
|
|
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- $end_time = microtime(true);
|
|
|
- DahuaUtil::rlog('完成扫描文件:'.$res);
|
|
|
- DahuaUtil::rlog('一个文件处理完成时间:'.($end_time-$start_time).'s');
|
|
|
- DahuaUtil::rlog('上传的文件有'.$t.'条');
|
|
|
- DahuaUtil::rlog('总共有了'.$i.'条');
|
|
|
- $res1=str_replace(".dat.redis",".md5",$res);
|
|
|
- unlink($res);
|
|
|
- unlink($res1);
|
|
|
- DahuaUtil::rlog('删除文件:'.$res);
|
|
|
- DahuaUtil::rlog('删除md5文件:'.$res1);
|
|
|
- }
|
|
|
+ echo "redis连接结果=>".$result.PHP_EOL;
|
|
|
+ $i=0;
|
|
|
+ DahuaUtil::rlog('开始扫描文件');
|
|
|
+ while(true)
|
|
|
+ {
|
|
|
+ $data=null;
|
|
|
+ $res = $redis->rPop('redis_to_kafka');
|
|
|
+ if(!$res)
|
|
|
+ {
|
|
|
+ sleep(1);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if(!file_exists($res))
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ DahuaUtil::rlog('扫描文件路径为:'.$res);
|
|
|
+ $start_time = microtime(true);
|
|
|
+ $fuc=function($res)
|
|
|
+ {
|
|
|
+ $handle = fopen($res, 'rb');
|
|
|
+ while (feof($handle)===false) {
|
|
|
+ # code...
|
|
|
+
|
|
|
+ yield fgets($handle);
|
|
|
|
|
|
-
|
|
|
+ }
|
|
|
+ fclose($handle);
|
|
|
+ };
|
|
|
+ $t=0;
|
|
|
+ foreach($fuc($res) as $value)
|
|
|
+ {
|
|
|
+ if(!empty($value))
|
|
|
+ {
|
|
|
+ $t++;
|
|
|
+ $i++;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ $end_time = microtime(true);
|
|
|
+ DahuaUtil::rlog('完成扫描文件:'.$res);
|
|
|
+ DahuaUtil::rlog('一个文件处理完成时间:'.($end_time-$start_time).'s');
|
|
|
+ DahuaUtil::rlog('上传的文件有'.$t.'条');
|
|
|
+ DahuaUtil::rlog('总共有了'.$i.'条');
|
|
|
+ $res1=str_replace(".dat.redis",".md5",$res);
|
|
|
+ unlink($res);
|
|
|
+ unlink($res1);
|
|
|
+ DahuaUtil::rlog('删除文件:'.$res);
|
|
|
+ DahuaUtil::rlog('删除md5文件:'.$res1);
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
public function scan_no_listening( ){
|
|
|
include('DahuaUtil.php');
|
|
|
$dir = '/home/renlian';
|
|
|
$ip='192.168.1.105';
|
|
|
$post=6379;
|
|
|
- if(is_file($dir))
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
- $redis = new Redis();
|
|
|
- DahuaUtil::rlog('连接redis'. $data);
|
|
|
-
|
|
|
- try{
|
|
|
- $redis->connect($ip,$post,2.5);
|
|
|
- $redis->auth($password); //设置密码
|
|
|
- $redis->select(1);
|
|
|
- $result = $redis->ping();
|
|
|
- if($result=='+PONG')
|
|
|
+ if(is_file($dir))
|
|
|
{
|
|
|
-
|
|
|
- DahuaUtil::rlog('redis连接成功');
|
|
|
+ return;
|
|
|
}
|
|
|
- else
|
|
|
- {
|
|
|
- DahuaUtil::rlog("redis连接失败=>".$result);
|
|
|
+ $redis = new Redis();
|
|
|
+ DahuaUtil::rlog('连接redis'. $data);
|
|
|
+
|
|
|
+ try{
|
|
|
+ $redis->connect($ip,$post,2.5);
|
|
|
+ $redis->auth($password); //设置密码
|
|
|
+ $redis->select(1);
|
|
|
+ $result = $redis->ping();
|
|
|
+ if($result=='+PONG')
|
|
|
+ {
|
|
|
+
|
|
|
+ DahuaUtil::rlog('redis连接成功');
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ DahuaUtil::rlog("redis连接失败=>".$result);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ }catch (Exception $e){
|
|
|
+
|
|
|
+ DahuaUtil::rlog("redis连接异常".$e->getMessage());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ DahuaUtil::rlog('开始扫描文件夹 '. $dir);
|
|
|
+ $files = scandir($dir);
|
|
|
+ foreach($files as $k=>$filename) {//务必使用!==,防止目录下出现类似文件名“0”等情况
|
|
|
|
|
|
- }
|
|
|
-
|
|
|
- }catch (Exception $e){
|
|
|
-
|
|
|
- DahuaUtil::rlog("redis连接异常".$e->getMessage());
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- DahuaUtil::rlog('开始扫描文件夹 '. $dir);
|
|
|
- $files = scandir($dir);
|
|
|
- foreach($files as $k=>$filename) {//务必使用!==,防止目录下出现类似文件名“0”等情况
|
|
|
+ if ($filename != "." && $filename != ".." &&!strstr($filename,'.redis')&&!strstr($filename,'.md5')&&is_file($dir.'/'.$filename)) {
|
|
|
+ $data = $dir.'/'.$filename;
|
|
|
+
|
|
|
+ $file_md5 = str_replace('.dat','.md5',$data);
|
|
|
+ if(!is_file($file_md5))
|
|
|
+ {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ DahuaUtil::rlog('将文件添加到redis中'. $data);
|
|
|
+ if(rename($data,$data.'.redis'))
|
|
|
+ {
|
|
|
+ $redis->lpush('redis_to_kafka',$data.'.redis');
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ closedir($dir);
|
|
|
+ $redis->close();
|
|
|
|
|
|
- if ($filename != "." && $filename != ".." &&!strstr($filename,'.redis')&&!strstr($filename,'.md5')&&is_file($dir.'/'.$filename)) {
|
|
|
- $data = $dir.'/'.$filename;
|
|
|
-
|
|
|
- $file_md5 = str_replace('.dat','.md5',$data);
|
|
|
- if(!is_file($file_md5))
|
|
|
- {
|
|
|
- continue;
|
|
|
- }
|
|
|
- DahuaUtil::rlog('将文件添加到redis中'. $data);
|
|
|
- if(rename($data,$data.'.redis'))
|
|
|
- {
|
|
|
- $redis->lpush('redis_to_kafka',$data.'.redis');
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- closedir($dir);
|
|
|
- $redis->close();
|
|
|
-
|
|
|
- return;
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
|