|
@@ -92,6 +92,7 @@ class ListeningFileCreateAction extends Action {
|
|
|
$path=C('WATCH_FTP_ADDRESS');
|
|
|
//kafka配置
|
|
|
$topic =C('WATCH_KAFKA')['topic'];
|
|
|
+ $topic1 =C('WATCH_KAFKA')['topic1'];
|
|
|
$kafka_address=C('WATCH_KAFKA')['address'];
|
|
|
//redis配置
|
|
|
$redis_ip=C('WATCH_REDIS')['ip'];
|
|
@@ -116,6 +117,7 @@ class ListeningFileCreateAction extends Action {
|
|
|
|
|
|
$rk = new RdKafka\Producer($conf);
|
|
|
$topic = $rk->newTopic($topic);
|
|
|
+ $topic1 = $rk->newTopic($topic1);
|
|
|
DahuaUtil::rlog("kafka连接成功");
|
|
|
|
|
|
try{
|
|
@@ -149,7 +151,8 @@ class ListeningFileCreateAction extends Action {
|
|
|
{
|
|
|
$data=null;
|
|
|
try{
|
|
|
- $res = $redis->rPop($redis_key);
|
|
|
+ $res = $redis->rPop($redis_key);
|
|
|
+
|
|
|
}catch(Exception $e)
|
|
|
{
|
|
|
DahuaUtil::rlog('redis取出数据异常'.$e->getMessage());
|
|
@@ -220,6 +223,10 @@ class ListeningFileCreateAction extends Action {
|
|
|
};
|
|
|
DahuaUtil::rlog('读取文件成功');
|
|
|
$t=0;
|
|
|
+ //如果文件名称带有sta 代表是基站 否则是代表是轨迹
|
|
|
+ if(strstr(pathinfo($res)["filename"],"station")){
|
|
|
+ $topic=$topic1;
|
|
|
+ }
|
|
|
foreach($fuc($res) as $value)
|
|
|
{
|
|
|
if(!empty($value))
|
|
@@ -234,8 +241,10 @@ class ListeningFileCreateAction extends Action {
|
|
|
}
|
|
|
|
|
|
}catch(Exception $e)
|
|
|
- {
|
|
|
+ {
|
|
|
+ DahuaUtil::rlog('kafka存取异常'."路径是".$res);
|
|
|
DahuaUtil::rlog('kafka存取异常'.$e->getMessage());
|
|
|
+
|
|
|
throw new Exception($e->getMessage());
|
|
|
}
|
|
|
|