diff --git a/config/app.example.php b/config/app.example.php index 9c0cdbf4..ce4a0395 100644 --- a/config/app.example.php +++ b/config/app.example.php @@ -13,6 +13,9 @@ 'workerLifetime' => 60, // 1 minutes // Legacy: 'workermaxruntime' is deprecated but still supported + // optional random offset (0-N seconds) added per worker to stagger shutdowns in a fleet (0 = disabled) + 'workerLifetimeJitter' => 0, + // seconds of running time after which the PHP process will terminate, null uses workerLifetime * 2 'workerPhpTimeout' => null, // Legacy: 'workertimeout' is deprecated but still supported diff --git a/docs/sections/configuration.md b/docs/sections/configuration.md index 7573d5c4..5d6af088 100644 --- a/docs/sections/configuration.md +++ b/docs/sections/configuration.md @@ -49,6 +49,14 @@ You may create a file called `app_queue.php` inside your `config` folder (NOT th bin/cake queue run --max-runtime 300 # Run for 5 minutes ``` +- Optional per-worker jitter (in seconds) added to the worker lifetime: + + ```php + $config['Queue']['workerLifetimeJitter'] = 30; // up to +30s random offset per worker + ``` + + Each worker picks a random offset in `[0, workerLifetimeJitter]` at startup and adds it to its effective lifetime. Useful when many workers are spawned simultaneously (e.g. ECS/Kubernetes) to stagger shutdowns and avoid a thundering herd of concurrent restarts. Defaults to `0` (no jitter). + - Seconds of running time after which the PHP process of the worker will terminate: ```php diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index ea022074..9b14168c 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -197,6 +197,10 @@ public function run(array $args): int { $this->exit = false; $startTime = time(); + $jitterOffset = $this->computeLifetimeJitterOffset(); + if ($jitterOffset > 0) { + $this->io->out('Applying worker lifetime jitter: +' . $jitterOffset . ' seconds'); + } while (!$this->exit) { $this->setPhpTimeout($config['maxruntime']); @@ -237,6 +241,9 @@ public function run(array $args): int { throw new RuntimeException('Queue.workerLifetime (or deprecated workermaxruntime) config is required'); } $maxRuntime = $config['maxruntime'] ?? (int)$workerLifetime; + if ($maxRuntime > 0 && $jitterOffset > 0) { + $maxRuntime += $jitterOffset; + } // check if we are over the maximum runtime and end processing if so. if ($maxRuntime > 0 && (time() - $startTime) >= $maxRuntime) { $this->exit = true; @@ -626,6 +633,24 @@ protected function setPhpTimeout(?int $maxruntime): void { set_time_limit($timeLimit); } + /** + * Compute the per-worker lifetime jitter offset in seconds. + * + * Returns a random integer in [0, Queue.workerLifetimeJitter]. Used to stagger + * worker shutdowns so a fleet spawned at the same moment does not all exit + * on the same tick (thundering herd). + * + * @return int + */ + protected function computeLifetimeJitterOffset(): int { + $jitter = (int)Configure::read('Queue.workerLifetimeJitter', 0); + if ($jitter <= 0) { + return 0; + } + + return mt_rand(0, $jitter); + } + /** * @param array $args * diff --git a/tests/TestCase/Queue/ProcessorTest.php b/tests/TestCase/Queue/ProcessorTest.php index 4e3766d7..bd3a3ecc 100644 --- a/tests/TestCase/Queue/ProcessorTest.php +++ b/tests/TestCase/Queue/ProcessorTest.php @@ -442,4 +442,51 @@ public function testSetPhpTimeoutWithDeprecatedConfig() { Configure::delete('Queue.workertimeout'); } + /** + * @return void + */ + public function testComputeLifetimeJitterOffsetDefaultsToZero() { + $processor = new Processor(new Io(new ConsoleIo()), new NullLogger()); + + Configure::delete('Queue.workerLifetimeJitter'); + $result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset'); + $this->assertSame(0, $result); + + Configure::write('Queue.workerLifetimeJitter', 0); + $result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset'); + $this->assertSame(0, $result); + + Configure::delete('Queue.workerLifetimeJitter'); + } + + /** + * @return void + */ + public function testComputeLifetimeJitterOffsetWithinBounds() { + $processor = new Processor(new Io(new ConsoleIo()), new NullLogger()); + + Configure::write('Queue.workerLifetimeJitter', 15); + for ($i = 0; $i < 50; $i++) { + $result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset'); + $this->assertIsInt($result); + $this->assertGreaterThanOrEqual(0, $result); + $this->assertLessThanOrEqual(15, $result); + } + + Configure::delete('Queue.workerLifetimeJitter'); + } + + /** + * @return void + */ + public function testComputeLifetimeJitterOffsetIgnoresNegative() { + $processor = new Processor(new Io(new ConsoleIo()), new NullLogger()); + + Configure::write('Queue.workerLifetimeJitter', -10); + $result = $this->invokeMethod($processor, 'computeLifetimeJitterOffset'); + $this->assertSame(0, $result); + + Configure::delete('Queue.workerLifetimeJitter'); + } + }