Master.php 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CatchAdmin [Just Like ~ ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2017~2020 http://catchadmin.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed ( https://github.com/yanwenwu/catch-admin/blob/master/LICENSE.txt )
  8. // +----------------------------------------------------------------------
  9. // | Author: JaguarJack [ njphper@gmail.com ]
  10. // +----------------------------------------------------------------------
  11. namespace catcher\library\crontab;
  12. use Swoole\Process;
  13. use catcher\library\crontab\Process as MProcess;
  14. use Swoole\Timer;
  15. class Master
  16. {
  17. use RegisterSignal, MProcess, Store, Table;
  18. /**
  19. * 动态扩展的最大 process 数量
  20. *
  21. * @var int
  22. */
  23. protected $maxNum;
  24. /**
  25. * 常驻 process
  26. *
  27. * @var int
  28. */
  29. protected $staticNum;
  30. /**
  31. * 临时进程数量
  32. *
  33. * @var int
  34. */
  35. protected $temporaryNum = 0;
  36. /**
  37. * 存储 process 信息
  38. *
  39. * @var array
  40. */
  41. protected $processes = [];
  42. /**
  43. * 主进程ID
  44. *
  45. * @var
  46. */
  47. protected $master_pid;
  48. /**
  49. * @var string
  50. */
  51. protected $kernel;
  52. /**
  53. * pid 文件名称
  54. *
  55. * @var string
  56. */
  57. protected $mater = 'catch-master';
  58. /**
  59. * @var int
  60. */
  61. protected $master_start_at;
  62. /**
  63. * @var bool
  64. */
  65. protected $daemon = false;
  66. // 版本
  67. const VERSION = '1.0.0';
  68. // process 等待状态
  69. const WAITING = 'waiting';
  70. // process 繁忙状态
  71. const BUSYING = 'busying';
  72. /**
  73. * 启动进程
  74. *
  75. * @time 2020年07月07日
  76. * @return void
  77. */
  78. public function start()
  79. {
  80. // 守护进程
  81. if ($this->daemon) {
  82. Process::daemon(true, false);
  83. }
  84. // alarm 信号
  85. // Process::alarm(1000 * 1000);
  86. // 1s 调度一次
  87. $this->timeTick(1000, $this->schedule());
  88. // 注册信号
  89. $this->registerSignal();
  90. // pid
  91. $this->master_pid = getmypid();
  92. $this->master_start_at = time();
  93. // 初始化
  94. $this->init();
  95. // 存储 pid
  96. $this->storeMasterPid($this->master_pid);
  97. // 初始化进程
  98. $this->initProcesses();
  99. }
  100. /**
  101. * 自定义 tick 关闭协程
  102. *
  103. * @time 2020年07月08日
  104. * @param int $time
  105. * @param $callable
  106. * @return void
  107. */
  108. protected function timeTick(int $time, $callable)
  109. {
  110. // 关闭协程
  111. Timer::set([
  112. 'enable_coroutine' => false,
  113. ]);
  114. Timer::tick($time, $callable);
  115. }
  116. /**
  117. * 调度
  118. *
  119. * @time 2020年07月07日
  120. * @return \Closure
  121. */
  122. protected function schedule()
  123. {
  124. return function () {
  125. $kernel = new $this->kernel;
  126. foreach ($kernel->tasks() as $cron) {
  127. if ($cron->can()) {
  128. list($waiting, $process) = $this->hasWaitingProcess();
  129. if ($waiting) {
  130. // 向 process 投递 cron
  131. $process->push(serialize($cron));
  132. } else {
  133. // 创建临时 process 处理,处理完自动销毁
  134. $this->createProcess($cron);
  135. }
  136. }
  137. }
  138. };
  139. }
  140. /**
  141. * Create Task
  142. *
  143. * @time 2019年08月06日
  144. * @param Cron $cron
  145. * @return void
  146. */
  147. protected function createProcess(Cron $cron)
  148. {
  149. if ($this->isCanCreateTemporaryProcess()) {
  150. $process = new Process(function (Process $process) use ($cron) {
  151. $cron->run();
  152. $process->exit();
  153. });
  154. // $process->name(sprintf('worker: '));
  155. $process->start();
  156. $this->temporaryNum += 1;
  157. }
  158. }
  159. /**
  160. * 是否可以创建临时进程
  161. *
  162. * @time 2020年07月09日
  163. * @return bool
  164. */
  165. protected function isCanCreateTemporaryProcess()
  166. {
  167. return ($this->table->count() + $this->temporaryNum) < $this->maxNum;
  168. }
  169. /**
  170. * 创建静态 worker 进程
  171. *
  172. * @time 2020年07月05日
  173. * @return Process
  174. */
  175. protected function createStaticProcess()
  176. {
  177. $process = new Process($this->createProcessCallback());
  178. // 使用非阻塞队列
  179. $process->useQueue(1, 2|Process::IPC_NOWAIT);
  180. return $process;
  181. }
  182. /**
  183. * 初始化 workers
  184. *
  185. * @time 2020年07月03日
  186. * @return void
  187. */
  188. protected function initProcesses()
  189. {
  190. for ($i = 0; $i < $this->staticNum; $i++) {
  191. $process = $this->createStaticProcess();
  192. // $worker->name("[$i+1]catch-worker");
  193. $process->start();
  194. $this->processes[$process->pid] = $process;
  195. $this->addColumn($this->getColumnKey($process->pid), $this->processInfo($process));
  196. }
  197. }
  198. /**
  199. * 栏目 KEY
  200. *
  201. * @time 2020年07月09日
  202. * @param $pid
  203. * @return string
  204. */
  205. protected function getColumnKey($pid)
  206. {
  207. return 'process_'. $pid;
  208. }
  209. /**
  210. * 初始化文件
  211. *
  212. * @time 2020年07月09日
  213. * @return void
  214. */
  215. protected function init()
  216. {
  217. $this->staticNum = config('catch.schedule.static_worker_number');
  218. $this->maxNum = config('catch.schedule.max_worker_number');
  219. $this->initLog();
  220. file_put_contents($this->getSaveProcessStatusFile(), '');
  221. $this->createTable();
  222. $this->kernel = config('catch.schedule.schedule_kernel');
  223. }
  224. /**
  225. * 日志初始化
  226. *
  227. * @time 2020年07月09日
  228. * @return void
  229. */
  230. protected function initLog()
  231. {
  232. $channels = config('log.channels');
  233. $channels['schedule'] = config('catch.schedule.log');;
  234. config([
  235. 'channels' => $channels,
  236. 'default' => 'schedule',
  237. ], 'log');
  238. }
  239. /**
  240. * 开启 debug
  241. *
  242. * @time 2020年07月09日
  243. * @return $this
  244. */
  245. public function daemon()
  246. {
  247. $this->daemon = true;
  248. return $this;
  249. }
  250. }