123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- <?php
- namespace catcher\library\crontab;
- use Swoole\Process;
- use catcher\library\crontab\Process as MProcess;
- use Swoole\Timer;
/**
 * Master process of the crontab scheduler.
 *
 * start() registers a 1000 ms timer that asks the configured schedule kernel
 * for its tasks, forks a fixed pool of "static" worker processes that receive
 * serialized tasks through a message queue, and spawns temporary one-shot
 * processes when no static worker is waiting.
 *
 * registerSignal(), createProcessCallback(), hasWaitingProcess(),
 * processInfo(), storeMasterPid(), getSaveProcessStatusFile(), createTable(),
 * addColumn() and $this->table are not defined in this class; they are
 * expected to come from the RegisterSignal, MProcess, Store and Table traits.
 */
class Master
{
    use RegisterSignal, MProcess, Store, Table;

    /** @var int upper bound for (table rows + temporary processes), see isCanCreateTemporaryProcess() */
    protected $maxNum;

    /** @var int number of static worker processes forked by initProcesses() */
    protected $staticNum;

    /**
     * @var int temporary processes started by createProcess()
     *
     * NOTE(review): nothing in this class decrements it; presumably that
     * happens where child exits are handled (outside this file) - confirm.
     */
    protected $temporaryNum = 0;

    /** @var Process[] static worker processes, keyed by pid */
    protected $processes = [];

    /** @var int pid of the master process, set in start() */
    protected $master_pid;

    /** @var string class name of the schedule kernel, read from config in init() */
    protected $kernel;

    /**
     * @var string
     *
     * NOTE(review): the name looks like a typo of "master"; it is kept as-is
     * because code outside this file may reference it.
     */
    protected $mater = 'catch-master';

    /** @var int unix timestamp taken when start() ran */
    protected $master_start_at;

    /** @var bool whether start() should daemonize first, enabled via daemon() */
    protected $daemon = false;

    const VERSION = '1.0.0';

    const WAITING = 'waiting';

    const BUSYING = 'busying';

    /**
     * Boots the master: optional daemonizing, scheduling timer, signal
     * handlers, configuration, pid storage and finally the static workers.
     *
     * The statement order is the original one and is kept deliberately.
     */
    public function start()
    {
        if ($this->daemon) {
            // Arguments are ($nochdir, $noclose) per the Swoole Process API.
            Process::daemon(true, false);
        }

        // The closure reads $this->kernel only when the timer fires, so it is
        // fine that init() assigns the kernel a few lines further down.
        $this->timeTick(1000, $this->schedule());

        $this->registerSignal();

        $this->master_pid = getmypid();
        $this->master_start_at = time();

        $this->init();

        $this->storeMasterPid($this->master_pid);

        $this->initProcesses();
    }

    /**
     * Registers a repeating Swoole timer with coroutines disabled.
     *
     * @param int      $time     interval passed to Timer::tick() (milliseconds)
     * @param callable $callable callback invoked on every tick
     */
    protected function timeTick(int $time, $callable)
    {
        Timer::set([
            'enable_coroutine' => false,
        ]);

        Timer::tick($time, $callable);
    }

    /**
     * Builds the timer callback that dispatches due tasks.
     *
     * On every call the closure instantiates the kernel class, walks its
     * tasks() and, for each task whose can() is truthy, hands the serialized
     * task to a waiting static worker. When no worker is waiting, or the
     * hand-over fails, it tries a temporary process instead.
     *
     * @return \Closure
     */
    protected function schedule()
    {
        return function () {
            $kernel = new $this->kernel;

            foreach ($kernel->tasks() as $cron) {
                if (!$cron->can()) {
                    continue;
                }

                list($waiting, $process) = $this->hasWaitingProcess();

                // The queue is opened with IPC_NOWAIT, so push() can fail
                // instead of blocking. Previously a failed push dropped the
                // task silently; now it falls through to a temporary process.
                if ($waiting && $process->push(serialize($cron)) !== false) {
                    continue;
                }

                $this->createProcess($cron);
            }
        };
    }

    /**
     * Runs one task in a temporary child process that exits afterwards.
     *
     * Does nothing when the process limit is reached; the task is then
     * skipped for this tick.
     *
     * @param Cron $cron task to run
     */
    protected function createProcess(Cron $cron)
    {
        if (!$this->isCanCreateTemporaryProcess()) {
            return;
        }

        $process = new Process(function (Process $process) use ($cron) {
            $cron->run();
            $process->exit();
        });

        // Count the process only if it really started; otherwise the counter
        // would drift upwards and eventually block every temporary process.
        if ($process->start() === false) {
            return;
        }

        $this->temporaryNum += 1;
    }

    /**
     * Tells whether another temporary process fits under $maxNum.
     *
     * @return bool true when table rows plus temporary processes are below the limit
     */
    protected function isCanCreateTemporaryProcess()
    {
        return ($this->table->count() + $this->temporaryNum) < $this->maxNum;
    }

    /**
     * Creates (but does not start) a static worker process with its queue enabled.
     *
     * @return Process
     */
    protected function createStaticProcess()
    {
        $process = new Process($this->createProcessCallback());

        // Every static worker uses queue key 1; mode 2 is combined with
        // IPC_NOWAIT so queue operations do not block.
        $process->useQueue(1, 2|Process::IPC_NOWAIT);

        return $process;
    }

    /**
     * Forks the static workers and records each one in $processes and in the table.
     */
    protected function initProcesses()
    {
        for ($i = 0; $i < $this->staticNum; $i++) {
            $process = $this->createStaticProcess();

            // A worker that failed to start has no usable pid; do not register it.
            if ($process->start() === false) {
                continue;
            }

            $pid = $process->pid;
            $this->processes[$pid] = $process;
            $this->addColumn($this->getColumnKey($pid), $this->processInfo($process));
        }
    }

    /**
     * Builds the table key used for a process.
     *
     * @param int $pid
     *
     * @return string "process_" followed by the pid
     */
    protected function getColumnKey($pid)
    {
        return 'process_'. $pid;
    }

    /**
     * Loads the scheduler configuration and prepares log, status file and table.
     */
    protected function init()
    {
        // Normalised to int because both values are only used in numeric comparisons.
        $this->staticNum = (int) config('catch.schedule.static_worker_number');
        $this->maxNum = (int) config('catch.schedule.max_worker_number');

        $this->initLog();

        // Start every run with an empty status file.
        file_put_contents($this->getSaveProcessStatusFile(), '');

        $this->createTable();

        $this->kernel = config('catch.schedule.schedule_kernel');
    }

    /**
     * Adds a "schedule" log channel taken from catch.schedule.log and makes it
     * the default channel of the "log" configuration.
     */
    protected function initLog()
    {
        $channels = config('log.channels');
        $channels['schedule'] = config('catch.schedule.log');

        config([
            'channels' => $channels,
            'default' => 'schedule',
        ], 'log');
    }

    /**
     * Requests that start() daemonizes the master.
     *
     * @return $this
     */
    public function daemon()
    {
        $this->daemon = true;

        return $this;
    }
}
|