|
@@ -186,20 +186,30 @@ class ListeningFileCreateAction extends Action {
|
|
|
$start_time = microtime(true);
|
|
|
$fuc=function($res)
|
|
|
{
|
|
|
+ try{
|
|
|
$handle = fopen($res, 'rb');
|
|
|
+
|
|
|
+ }catch(Exception $e)
|
|
|
+ {
|
|
|
+ DahuaUtil::rlog('打开文件失败'.$e->getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
while (feof($handle)===false) {
|
|
|
# code...
|
|
|
-
|
|
|
yield fgets($handle);
|
|
|
|
|
|
}
|
|
|
fclose($handle);
|
|
|
};
|
|
|
+ DahuaUtil::rlog('读取文件成功');
|
|
|
$t=0;
|
|
|
foreach($fuc($res) as $value)
|
|
|
{
|
|
|
if(!empty($value))
|
|
|
{
|
|
|
+ try{
|
|
|
$topic->produce(RD_KAFKA_PARTITION_UA, 0,$value);
|
|
|
$rk->poll(0);
|
|
|
$t++;
|
|
@@ -207,7 +217,13 @@ class ListeningFileCreateAction extends Action {
|
|
|
while ($rk->getOutQLen() >10000) {
|
|
|
$rk->poll(10);
|
|
|
}
|
|
|
- }
|
|
|
+
|
|
|
+ }catch(Exception $e)
|
|
|
+ {
|
|
|
+ DahuaUtil::rlog('kafka存取异常'.$e->getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
|
|
|
}
|
|
|
|