InotifyMonitor.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. <?php
  2. require 'DahuaUtil.php';
  3. class InotifyMonitor {
  4. const MONITOR_EVENT = IN_CLOSE_WRITE;
  5. const EVENT_MASK = [
  6. IN_ACCESS => 'File was accessed (read)',
  7. IN_MODIFY => 'File was modified',
  8. IN_ATTRIB => 'Metadata changed',
  9. IN_CLOSE_WRITE => 'File opened for writing was closed',
  10. IN_CLOSE_NOWRITE => 'File not opened for writing was closed',
  11. IN_OPEN => 'File was opened',
  12. IN_MOVED_TO => 'File moved into watched directory',
  13. IN_MOVED_FROM => 'File moved out of watched directory',
  14. IN_CREATE => 'File or directory created in watched directory',
  15. IN_DELETE => 'File or directory deleted in watched directory',
  16. IN_DELETE_SELF => 'Watched file or directory was deleted',
  17. IN_MOVE_SELF => 'Watch file or directory was moved',
  18. IN_CLOSE => 'Equals to IN_CLOSE_WRITE | IN_CLOSE_NOWRITE',
  19. IN_MOVE => 'Equals to IN_MOVED_FROM | IN_MOVED_TO',
  20. IN_ALL_EVENTS => 'Bitmask of all the above constants',
  21. IN_UNMOUNT => 'File system containing watched object was unmounted',
  22. IN_Q_OVERFLOW => 'Event queue overflowed (wd is -1 for this event)',
  23. IN_IGNORED => 'Watch was removed (explicitly by inotify_rm_watch() or because file was removed or filesystem unmounted',
  24. IN_ISDIR => 'Subject of this event is a directory',
  25. IN_ONLYDIR => 'Only watch pathname if it is a directory',
  26. IN_DONT_FOLLOW => 'Do not dereference pathname if it is a symlink',
  27. IN_MASK_ADD => 'Add events to watch mask for this pathname if it already exists',
  28. IN_ONESHOT => 'Monitor pathname for one event, then remove from watch list.',
  29. 1073741840 => 'High-bit: File not opened for writing was closed',
  30. 1073741856 => 'High-bit: File was opened',
  31. 1073742080 => 'High-bit: File or directory created in watched directory',
  32. 1073742336 => 'High-bit: File or directory deleted in watched directory',
  33. ];
  34. public $fds = [];
  35. public $paths = [];
  36. public $wds = [];
  37. public $timeout = 3;
  38. public $redis=null;
  39. public $db=1;
  40. public $key='redis_to_kafka';
  41. public function __construct( $paths,$ip='127.0.0.1',$password='',$post=6379,$db='1',$key='redis_to_kafka'){
  42. if (!empty($paths)) {
  43. foreach ($paths as $path) {
  44. if (file_exists($path)) {
  45. if (is_dir($path)) {
  46. $this->addDir($path);
  47. } else {
  48. $this->addFile($path);
  49. }
  50. }
  51. }
  52. }
  53. $this->redis = new Redis();
  54. $this->db=$db;
  55. $this->key=$key;
  56. try{
  57. $this->redis->pconnect($ip,$post,2.5);
  58. $this->redis->auth($password); //设置密码
  59. $this->redis->select($this->db);
  60. $result = $this->redis->ping();
  61. if($result=='+PONG')
  62. {
  63. DahuaUtil::rlog('redis连接成功');
  64. }
  65. else
  66. {
  67. DahuaUtil::rlog("redis连接失败=>".$result.PHP_EOL);
  68. }
  69. //扫描未存起来的redis文件
  70. DahuaUtil::rlog("扫描没有监听到的文件");
  71. $this->scan_file();
  72. DahuaUtil::rlog('已经调用了清理脚本');
  73. }catch (Exception $e){
  74. DahuaUtil::rlog("redis连接异常".$e->getMessage());
  75. throw new Exception($e->getMessage());
  76. }
  77. }
  78. public function __destruct( ){
  79. if (!empty($this->fds)) {
  80. foreach ($this->fds as $fd) {
  81. fclose($fd);
  82. }
  83. }
  84. }
  85. public function addFile( $file ){
  86. $file = realpath($file);
  87. $fd = inotify_init();
  88. $fid = (int)$fd;
  89. //保存inotify资源
  90. $this->fds[$fid] = $fd;
  91. //设置为非阻塞模式
  92. stream_set_blocking($this->fds[$fid], 0);
  93. //保存文件路径
  94. $this->paths[$fid] = $file;
  95. //保存监控描述�?
  96. $this->wds[$fid] =inotify_add_watch($this->fds[$fid], $file, self::MONITOR_EVENT);
  97. }
  98. public function addDir( $dir ){
  99. $dir = realpath($dir);
  100. if ($dh = opendir($dir)) {
  101. //将目录加入监控中
  102. $fd = inotify_init();
  103. //一般文件的资源描述符是一个整形,可以用来当索�?
  104. $fid = (int)$fd;
  105. $this->fds[$fid] = $fd;
  106. stream_set_blocking($this->fds[$fid], 0);
  107. $this->paths[$fid] = $dir;
  108. $this->wds[$fid] = inotify_add_watch($this->fds[$fid], $dir, self::MONITOR_EVENT);
  109. //遍历目录下文
  110. while (($file = readdir($dh)) !== false) {
  111. if ($file == '.' || $file == '..') {
  112. continue;
  113. }
  114. $file = $dir . DIRECTORY_SEPARATOR . $file;
  115. if (is_dir($file)) {
  116. $this->addDir($file);
  117. }
  118. }
  119. closedir($dh);
  120. }
  121. }
  122. public function remove( $fid ){
  123. unset($this->paths[$fid]);
  124. fclose($this->fds[$fid]);
  125. unset($this->fds[$fid]);
  126. }
  127. public function run( ){
  128. echo '我开始运行了'.PHP_EOL;
  129. while (true) {
  130. $reads = $this->fds;
  131. $write = [];
  132. $except = [];//异常事件
  133. if (stream_select($reads, $write, $except, $this->timeout) > 0) {
  134. if (!empty($reads)) {
  135. foreach ($reads as $read) {
  136. //从可读流中读取数�?
  137. $events = inotify_read($read);
  138. //资源描述符,整形
  139. $fid = (int)$read;
  140. //获取inotify实例的路�?
  141. $path = $this->paths[$fid];
  142. foreach ($events as $event) {
  143. echo '监听读取'.PHP_EOL;
  144. $file = $path .'/' . $event['name'];
  145. if ($event['mask']==IN_CLOSE_WRITE) {
  146. echo 'create file->'.$file.PHP_EOL;
  147. //写入redis
  148. try
  149. {
  150. $this->set_redis($file);
  151. }catch(Exception $e)
  152. {
  153. DahuaUtil::rlog("redis数据库连接".$e->getMessage());
  154. throw new Exception($e->getMessage());
  155. }
  156. }
  157. //echo $event['name'], ' --- ', self::EVENT_MASK[$event['mask']], PHP_EOL;
  158. }
  159. }
  160. }
  161. if (!empty($except)) {
  162. DahuaUtil::rlog("监听异常".json_encode($except));
  163. }
  164. } else {
  165. //echo '------------------', PHP_EOL;
  166. //echo '当前监控的路径列�?, PHP_EOL;
  167. //print_r($this->paths);
  168. //echo '------------------', PHP_EOL;
  169. }
  170. }
  171. }
  172. /**
  173. * function
  174. * 文件目录
  175. * @param [type] $data
  176. * @return void
  177. */
  178. public function set_redis($data)
  179. {
  180. $this->redis->ping();
  181. if(strstr($data,'.zip'))
  182. {
  183. if(rename($data,$data.'.redis'))
  184. {
  185. $data=$data.'.redis';
  186. $this->redis->lpush($this->key,$data);
  187. }
  188. }
  189. }
  190. //扫描目录下所有没有上传到redis文件
  191. public function scan_file()
  192. {
  193. shell_exec("/home/wwwroot/nbfd_tp3/clear_file_redis.sh > /dev/null 2>&1 &");
  194. // echo shell_exec("/home/wwwroot/nbfd_tp3/clear_file_redis.sh");
  195. return;
  196. }
  197. }