From 05ba673bc1045abac496ff8bf85cd1a969d82a72 Mon Sep 17 00:00:00 2001 From: charukiewicz Date: Fri, 8 May 2015 09:59:07 -0500 Subject: [PATCH 01/35] added an option in config for workers to print a timestamp each time they begin looking for a job --- Config/queue.php | 5 ++++- Console/Command/QueueShell.php | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Config/queue.php b/Config/queue.php index fc033948..d78b6b93 100644 --- a/Config/queue.php +++ b/Config/queue.php @@ -37,5 +37,8 @@ 'log' => true, // set to false to disable (tmp = file in TMP dir) - 'notify' => 'tmp' + 'notify' => 'tmp', + + // prints a timestamp each time a worker begins looking for a job + 'workertimestamp' => false ]; diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index b77a0996..e126af31 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -200,7 +200,11 @@ public function runworker() { touch($pidFilePath . $pidFileName); } $this->_log('runworker', isset($pid) ? $pid : null); - $this->out('Looking for Job....'); + if (Configure::read('Queue.workertimestamp')) { + $this->out('['.date('Y-m-d H:i:s').'] Looking for Job....'); + } else { + $this->out('Looking for Job....'); + } $data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); if ($this->QueuedTask->exit === true) { $this->_exit = true; From 62858f44d7002aba93e463eee62f328935405038 Mon Sep 17 00:00:00 2001 From: charukiewicz Date: Fri, 8 May 2015 10:04:17 -0500 Subject: [PATCH 02/35] fixed indentation --- Console/Command/QueueShell.php | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index e126af31..bf27b72a 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -200,11 +200,11 @@ public function runworker() { touch($pidFilePath . $pidFileName); } $this->_log('runworker', isset($pid) ? $pid : null); - if (Configure::read('Queue.workertimestamp')) { - $this->out('['.date('Y-m-d H:i:s').'] Looking for Job....'); - } else { - $this->out('Looking for Job....'); - } + if (Configure::read('Queue.workertimestamp')) { + $this->out('['.date('Y-m-d H:i:s').'] Looking for Job....'); + } else { + $this->out('Looking for Job....'); + } $data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); if ($this->QueuedTask->exit === true) { $this->_exit = true; From 7b9e2e2d08e6b2846f68688fd8ba287ec9f9b1a5 Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Fri, 8 May 2015 18:34:54 +0200 Subject: [PATCH 03/35] Fix travis --- README.md | 4 +-- .../Controller/CronTasksControllerTest.php | 31 ------------------- 2 files changed, 2 insertions(+), 33 deletions(-) delete mode 100644 Test/Case/Controller/CronTasksControllerTest.php diff --git a/README.md b/README.md index 3c551b49..345bb84e 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # CakePHP Queue Plugin -[![Build Status](https://api.travis-ci.org/dereuromark/cakephp-queue.png)](https://travis-ci.org/dereuromark/cakephp-queue) +[![Build Status](https://api.travis-ci.org/dereuromark/cakephp-queue.svg)](https://travis-ci.org/dereuromark/cakephp-queue) [![Minimum PHP Version](http://img.shields.io/badge/php-%3E%3D%205.4-8892BF.svg)](https://php.net/) -[![License](https://poser.pugx.org/dereuromark/cakephp-queue/license.png)](https://packagist.org/packages/dereuromark/cakephp-queue) +[![License](https://poser.pugx.org/dereuromark/cakephp-queue/license)](https://packagist.org/packages/dereuromark/cakephp-queue) [![Total Downloads](https://poser.pugx.org/dereuromark/cakephp-queue/d/total.png)](https://packagist.org/packages/dereuromark/cakephp-queue) Modified by Mark Scherer ([dereuromark](https://github.com/dereuromark)) diff --git a/Test/Case/Controller/CronTasksControllerTest.php b/Test/Case/Controller/CronTasksControllerTest.php deleted file mode 100644 index 9eb129e6..00000000 --- a/Test/Case/Controller/CronTasksControllerTest.php +++ /dev/null @@ -1,31 +0,0 @@ -CronTasks = new TestCronTasksController(new CakeRequest, new CakeResponse); - $this->CronTasks->constructClasses(); - } - - public function testObject() { - $this->assertInstanceOf('CronTasksController', $this->CronTasks); - } - -} - -class TestCronTasksController extends CronTasksController { - - public $autoRender = false; - - public function redirect($url, $status = null, $exit = true) { - $this->redirectUrl = $url; - } - -} From 44d66f7c40ab6aab8880ac7e65f6dc3ed71f2500 Mon Sep 17 00:00:00 2001 From: charukiewicz Date: Fri, 8 May 2015 11:35:25 -0500 Subject: [PATCH 04/35] fixed config indentation --- Config/queue.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Config/queue.php b/Config/queue.php index d78b6b93..e588050e 100644 --- a/Config/queue.php +++ b/Config/queue.php @@ -39,6 +39,6 @@ // set to false to disable (tmp = file in TMP dir) 'notify' => 'tmp', - // prints a timestamp each time a worker begins looking for a job - 'workertimestamp' => false + // prints a timestamp each time a worker begins looking for a job + 'workertimestamp' => false ]; From 65f44818788e34e6b8684a5ad9f765375c3e8a3f Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Fri, 8 May 2015 18:46:49 +0200 Subject: [PATCH 05/35] always show timestamp --- Config/queue.php | 3 --- Console/Command/QueueShell.php | 13 +++++-------- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/Config/queue.php b/Config/queue.php index e588050e..f8f6a3ec 100644 --- a/Config/queue.php +++ b/Config/queue.php @@ -38,7 +38,4 @@ // set to false to disable (tmp = file in TMP dir) 'notify' => 'tmp', - - // prints a timestamp each time a worker begins looking for a job - 'workertimestamp' => false ]; diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index bf27b72a..9955fcf1 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -43,7 +43,7 @@ public function initialize() { foreach ($paths as $path) { $Folder = new Folder($path); - $res = array_merge($this->tasks, $Folder->find('Queue.*\.php')); + $res = array_merge($this->tasks, $Folder->find('Queue.+\.php')); foreach ($res as &$r) { $r = basename($r, 'Task.php'); } @@ -54,7 +54,7 @@ public function initialize() { $pluginPaths = App::path('Console/Command/Task', $plugin); foreach ($pluginPaths as $pluginPath) { $Folder = new Folder($pluginPath); - $res = $Folder->find('Queue.*Task\.php'); + $res = $Folder->find('Queue.+Task\.php'); foreach ($res as &$r) { $r = $plugin . '.' . basename($r, 'Task.php'); } @@ -200,11 +200,8 @@ public function runworker() { touch($pidFilePath . $pidFileName); } $this->_log('runworker', isset($pid) ? $pid : null); - if (Configure::read('Queue.workertimestamp')) { - $this->out('['.date('Y-m-d H:i:s').'] Looking for Job....'); - } else { - $this->out('Looking for Job....'); - } + $this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...'); + $data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); if ($this->QueuedTask->exit === true) { $this->_exit = true; @@ -352,7 +349,7 @@ public function getOptionParser() { ]; return parent::getOptionParser() - ->description(__d('cake_console', "...")) + ->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system.")) ->addSubcommand('clean', [ 'help' => __d('cake_console', 'Remove old jobs (cleanup)'), 'parser' => $subcommandParser From 263333e298ca0464dd7c96ecf2c0969cfdbb9541 Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Thu, 14 May 2015 19:38:15 +0200 Subject: [PATCH 06/35] travis --- .travis.yml | 64 ++++++++++++---------------------- View/CronTasks/admin_index.ctp | 2 +- 2 files changed, 24 insertions(+), 42 deletions(-) diff --git a/.travis.yml b/.travis.yml index ec075d36..c0873771 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,56 +6,38 @@ php: - 5.6 env: - - CAKE_VERSION=2.6 - - CAKE_VERSION=2.7 + global: + - REPO_NAME=cakephp-queue + - PLUGIN_NAME=Queue + - REQUIRE="" + + matrix: + - DB=mysql CAKE_VERSION=2.6 + - DB=mysql CAKE_VERSION=2.7 matrix: include: - php: 5.4 env: - - PHPCS=1 - - allow_failures: - - php: 5.4 - env: - - PHPCS=1 + - DB=mysql CAKE_VERSION=2.6 COVERALLS=1 before_script: - - sh -c "if [ '$PHPCS' != '1' ]; then git clone --depth 1 --branch $CAKE_VERSION git://github.com/cakephp/cakephp ../cakephp; else git clone --depth 1 --branch 2.6 git://github.com/cakephp/cakephp ../cakephp; fi" && cd ../cakephp - - rm -R app - - git clone --depth 1 --branch master git://github.com/dereuromark/tools-app app - - git clone --depth 1 --branch master git://github.com/dereuromark/cakephp-tools plugins/Tools - - git clone --depth 1 --branch master git://github.com/dereuromark/cakephp-shim plugins/Shim - - cp -R ../cakephp-queue plugins/Queue - - sh -c "mysql -e 'CREATE DATABASE cakephp_test;'" - - mkdir ./app/tmp - - mkdir ./app/tmp/logs - - mkdir ./app/tmp/queue - - mkdir ./app/tmp/tests - - mkdir ./app/tmp/cache - - mkdir ./app/tmp/cache/persistent - - mkdir ./app/tmp/cache/models - - chmod -R 0777 ./app/tmp - - sh -c "if [ '$PHPCS' = '1' ]; then pear channel-discover pear.cakephp.org; fi" - - sh -c "if [ '$PHPCS' = '1' ]; then pear install --alldeps cakephp/CakePHP_CodeSniffer; fi" - - phpenv rehash - - echo " 'Database/Mysql', - 'database' => 'cakephp_test', - 'host' => '0.0.0.0', - 'login' => 'travis', - 'persistent' => false, - ); - }" > app/Config/database.php - - sh -c "if [ '$TRAVIS_PHP_VERSION' != '5.2' ]; then composer global require 'phpunit/phpunit=3.7.33'; fi" - - sh -c "if [ '$TRAVIS_PHP_VERSION' != '5.2' ]; then ln -s ~/.composer/vendor/phpunit/phpunit/PHPUnit ./vendors/PHPUnit; fi" - - cd app + - git clone -b master https://github.com/FriendsOfCake/travis.git --depth 1 ../travis + - ../travis/before_script.sh + - cd ../cakephp/app + - composer require --dev --no-interaction --prefer-source dereuromark/cakephp-tools:0.* + - echo " Model/AppModel.php + - echo " Controller/AppController.php + - echo " Config/email.php + - echo "Configure::write('Security.salt', 'AycG93b0qyJfIxfs2guVoUubWwvniR2G0FgaC9mi');" >> Config/bootstrap.php + - echo "Configure::write('Security.cipherSeed', '16659201697453542496749683615');" >> Config/bootstrap.php + - cd .. script: - - sh -c "if [ '$PHPCS' != '1' ]; then ../lib/Cake/Console/cake test Queue AllQueue --stderr; fi" - - sh -c "if [ '$PHPCS' = '1' ]; then cd ..; phpcs -p --extensions=php --standard=CakePHP ./plugins/Queue; fi" + - ../travis/script.sh + +after_success: + - ../travis/after_success.sh notifications: email: false diff --git a/View/CronTasks/admin_index.ctp b/View/CronTasks/admin_index.ctp index a8171f76..4acc4eeb 100644 --- a/View/CronTasks/admin_index.ctp +++ b/View/CronTasks/admin_index.ctp @@ -56,7 +56,7 @@ foreach ($cronTasks as $cronTask):
-element('pagination', [], ['plugin'=>'tools']); ?>
+element('Tools.pagination'); ?> From d19a892fd6a84fbbffa9da392192338805c35789 Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Thu, 14 May 2015 19:47:14 +0200 Subject: [PATCH 07/35] badge --- README.md | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 345bb84e..0d6254fb 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,10 @@ # CakePHP Queue Plugin -[![Build Status](https://api.travis-ci.org/dereuromark/cakephp-queue.svg)](https://travis-ci.org/dereuromark/cakephp-queue) +[![Build Status](https://api.travis-ci.org/dereuromark/cakephp-queue.svg?branch=2.x)](https://travis-ci.org/dereuromark/cakephp-queue) [![Minimum PHP Version](http://img.shields.io/badge/php-%3E%3D%205.4-8892BF.svg)](https://php.net/) [![License](https://poser.pugx.org/dereuromark/cakephp-queue/license)](https://packagist.org/packages/dereuromark/cakephp-queue) [![Total Downloads](https://poser.pugx.org/dereuromark/cakephp-queue/d/total.png)](https://packagist.org/packages/dereuromark/cakephp-queue) -Modified by Mark Scherer ([dereuromark](https://github.com/dereuromark)) -- CakePHP2.x support -- Some minor fixes -- Added crontasks (as a different approach on specific problems) -- Possible (optional) Tools Plugin dependencies for frontend access via /admin/queue -- Config key "queue" is now "Queue" ($config['Queue'][...]) - -Added by Christian Charukiewicz ([charukiewicz](https://github.com/charukiewicz)): -- Configuration option 'gcprop' is now 'gcprob' -- Fixed typo in README and variable name (Propability -> Probability) -- Added a few lines about createJob() usage to README -- Added comments to queue.php explaining configuration options - +This branch is for use with **CakePHP 2**. ## Background: @@ -164,3 +152,19 @@ Play around with it, but just don't shoot over the top. * Add priority * Cleanup and better test coverage + +### History + +Modified by Mark Scherer ([dereuromark](https://github.com/dereuromark)) +- CakePHP2.x support +- Some minor fixes +- Added crontasks (as a different approach on specific problems) +- Possible (optional) Tools Plugin dependencies for frontend access via /admin/queue +- Config key "queue" is now "Queue" ($config['Queue'][...]) + +Added by Christian Charukiewicz ([charukiewicz](https://github.com/charukiewicz)): +- Configuration option 'gcprop' is now 'gcprob' +- Fixed typo in README and variable name (Propability -> Probability) +- Added a few lines about createJob() usage to README +- Added comments to queue.php explaining configuration options + From 61ac1db7f3b6560c0f3fd3c1d7aa7b857279fa8d Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Fri, 31 Jul 2015 13:11:30 +0200 Subject: [PATCH 08/35] Use Flash component --- Controller/QueueController.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Controller/QueueController.php b/Controller/QueueController.php index 364e8d60..173127c9 100644 --- a/Controller/QueueController.php +++ b/Controller/QueueController.php @@ -43,7 +43,7 @@ public function admin_index() { public function admin_reset() { $this->request->allowMethod('post'); $res = $this->QueuedTask->truncate(); - + if ($res) { $message = __d('queue', 'OK'); $class = 'success'; @@ -51,9 +51,9 @@ public function admin_reset() { $message = __d('queue', 'Error'); $class = 'error'; } - + if (isset($this->Flash)) { - $this->Flash->message($message, $class); + $this->Flash->$class($message); } else { $this->Session->setFlash($message, 'default', ['class' => $class]); } From f1b9d9c3b0eff8b4862282ac92a2465bd83169f0 Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Fri, 31 Jul 2015 13:22:33 +0200 Subject: [PATCH 09/35] remove cake_console domain --- Console/Command/QueueShell.php | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 9955fcf1..dfda26f0 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -330,12 +330,12 @@ public function getOptionParser() { /* 'dry-run'=> array( 'short' => 'd', - 'help' => __d('cake_console', 'Dry run the update, no jobs will actually be added.'), + 'help' => 'Dry run the update, no jobs will actually be added.', 'boolean' => true ), 'log'=> array( 'short' => 'l', - 'help' => __d('cake_console', 'Log all ouput to file log.txt in TMP dir'), + 'help' => 'Log all ouput to file log.txt in TMP dir', 'boolean' => true ), */ @@ -344,30 +344,30 @@ public function getOptionParser() { $subcommandParserFull = $subcommandParser; $subcommandParserFull['options']['group'] = [ 'short' => 'g', - 'help' => __d('cake_console', 'Group'), + 'help' => 'Group', 'default' => '' ]; return parent::getOptionParser() ->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system.")) ->addSubcommand('clean', [ - 'help' => __d('cake_console', 'Remove old jobs (cleanup)'), + 'help' => 'Remove old jobs (cleanup)', 'parser' => $subcommandParser ]) ->addSubcommand('add', [ - 'help' => __d('cake_console', 'Add Job'), + 'help' => 'Add Job', 'parser' => $subcommandParser ]) ->addSubcommand('install', [ - 'help' => __d('cake_console', 'Install info'), + 'help' => 'Install info', 'parser' => $subcommandParser ]) ->addSubcommand('uninstall', [ - 'help' => __d('cake_console', 'Uninstall info'), + 'help' => 'Uninstall info', 'parser' => $subcommandParser ]) ->addSubcommand('runworker', [ - 'help' => __d('cake_console', 'Run Worker'), + 'help' => 'Run Worker', 'parser' => $subcommandParserFull ]); } From 4fc72a1ba8595e2b28b4121716157f83df384ea4 Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Thu, 27 Aug 2015 20:03:29 +0200 Subject: [PATCH 10/35] travis --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index c0873771..49d117db 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,14 +12,14 @@ env: - REQUIRE="" matrix: - - DB=mysql CAKE_VERSION=2.6 - DB=mysql CAKE_VERSION=2.7 + - DB=mysql CAKE_VERSION=2.8 matrix: include: - php: 5.4 env: - - DB=mysql CAKE_VERSION=2.6 COVERALLS=1 + - DB=mysql CAKE_VERSION=2.7 COVERALLS=1 before_script: - git clone -b master https://github.com/FriendsOfCake/travis.git --depth 1 ../travis From 99b73566b7741ad19878edf369fe02d8af17c372 Mon Sep 17 00:00:00 2001 From: Neil Holcomb Date: Sat, 31 Oct 2015 09:08:21 -0700 Subject: [PATCH 11/35] fixed spelling mistake changed "hte" to "the" --- Console/Command/CronShell.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Console/Command/CronShell.php b/Console/Command/CronShell.php index 4ab8ec1f..cbf3c03a 100644 --- a/Console/Command/CronShell.php +++ b/Console/Command/CronShell.php @@ -110,7 +110,7 @@ public function main() { } /** - * Look for a Queue Task of hte passed name and try to call add() on it. + * Look for a Queue Task of the passed name and try to call add() on it. * A QueueTask may provide an add function to enable the user to create new jobs via commandline. * * @return void From 19b303f3bc8c7e122001f8c7d2eb2ebe735baf4e Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Tue, 15 Dec 2015 00:15:30 +0100 Subject: [PATCH 12/35] Remove recursive property calls. --- Controller/CronTasksController.php | 2 -- 1 file changed, 2 deletions(-) diff --git a/Controller/CronTasksController.php b/Controller/CronTasksController.php index bef03d86..f0471933 100644 --- a/Controller/CronTasksController.php +++ b/Controller/CronTasksController.php @@ -20,7 +20,6 @@ public function beforeFilter() { * @return void */ public function index() { - $this->CronTask->recursive = 0; $cronTasks = $this->paginate(); $this->set(compact('cronTasks')); } @@ -111,7 +110,6 @@ public function delete($id = null) { * @return void */ public function admin_index() { - $this->CronTask->recursive = 0; $cronTasks = $this->paginate(); $this->set(compact('cronTasks')); } From dac8a04794600e5a1a7b5f94fbe71ce4a2651ad1 Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Sat, 26 Dec 2015 04:11:33 +0100 Subject: [PATCH 13/35] Cleanup --- Console/Command/QueueTestShell.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Console/Command/QueueTestShell.php b/Console/Command/QueueTestShell.php index bb386e67..262f3635 100644 --- a/Console/Command/QueueTestShell.php +++ b/Console/Command/QueueTestShell.php @@ -1,5 +1,7 @@ to('markscherer@gmx.de', 'Mark Test'); From 1a3dfafcde5c36d9610df95e5b28502d1da72528 Mon Sep 17 00:00:00 2001 From: Mark Scherer Date: Sun, 10 Jan 2016 03:59:02 +0100 Subject: [PATCH 14/35] Cleanup --- Model/CronTask.php | 12 ++++++------ View/CronTasks/admin_index.ctp | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Model/CronTask.php b/Model/CronTask.php index 5c5b0057..58916505 100644 --- a/Model/CronTask.php +++ b/Model/CronTask.php @@ -21,14 +21,14 @@ class CronTask extends QueueAppModel { public $validate = [ 'jobtype' => [ - 'notempty' => [ - 'rule' => ['notempty'], + 'notEmpty' => [ + 'rule' => ['notEmpty'], 'message' => 'valErrMandatoryField', ], ], 'name' => [ - 'notempty' => [ - 'rule' => ['notempty'], + 'notEmpty' => [ + 'rule' => ['notEmpty'], 'message' => 'valErrMandatoryField', 'last' => true ], @@ -39,8 +39,8 @@ class CronTask extends QueueAppModel { ], ], 'title' => [ - 'notempty' => [ - 'rule' => ['notempty'], + 'notEmpty' => [ + 'rule' => ['notEmpty'], 'message' => 'valErrMandatoryField', 'last' => true ], diff --git a/View/CronTasks/admin_index.ctp b/View/CronTasks/admin_index.ctp index 4acc4eeb..9b9e9aaa 100644 --- a/View/CronTasks/admin_index.ctp +++ b/View/CronTasks/admin_index.ctp @@ -42,7 +42,7 @@ foreach ($cronTasks as $cronTask):
date(FORMAT_DB_DATETIME)) { - echo $this->Format->cIcon(ICON_WARNING, 'Achtung'); + echo $this->Format->cIcon(ICON_WARNING, ['title' => __('Warning')]); } ?> @@ -66,4 +66,4 @@ foreach ($cronTasks as $cronTask):
  • Html->link(__d('queue', 'Add %s', __d('queue', 'Cron Task')), ['action' => 'add']); ?>
- \ No newline at end of file + From b30bb9cfacc6e842c5f9b89b6e26a7ec0eb14474 Mon Sep 17 00:00:00 2001 From: Neil Holcomb Date: Thu, 7 Apr 2016 12:49:07 -0700 Subject: [PATCH 15/35] add priorty to queue tasks --- ...0_adding_priority_field_to_queue_tasks.php | 60 +++++++++++++++++++ Model/CronTask.php | 1 + Model/QueuedTask.php | 15 ++++- 3 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 Config/Migration/1453822680_adding_priority_field_to_queue_tasks.php diff --git a/Config/Migration/1453822680_adding_priority_field_to_queue_tasks.php b/Config/Migration/1453822680_adding_priority_field_to_queue_tasks.php new file mode 100644 index 00000000..0226a984 --- /dev/null +++ b/Config/Migration/1453822680_adding_priority_field_to_queue_tasks.php @@ -0,0 +1,60 @@ + array( + 'create_field' => [ + 'queued_tasks' => [ + 'priority' => [ + 'type' => 'integer', + 'null' => false, + 'default' => 5, + 'length' => 4 + ], + 'indexes' => array( + 'priority' => array('column' => 'priority'), + ), + ], + ] + ), + 'down' => array( + 'drop_field' => [ + 'queued_tasks' => [ + 'priority' + ], + ] + ), + ); + +/** + * Before migration callback + * + * @param string $direction Direction of migration process (up or down) + * @return bool Should process continue + */ + public function before($direction) { + return true; + } + +/** + * After migration callback + * + * @param string $direction Direction of migration process (up or down) + * @return bool Should process continue + */ + public function after($direction) { + return true; + } +} diff --git a/Model/CronTask.php b/Model/CronTask.php index 58916505..552b5ee5 100644 --- a/Model/CronTask.php +++ b/Model/CronTask.php @@ -132,6 +132,7 @@ public function requestJob($capabilities, $group = null) { 'timediff(NOW(),notbefore) AS age' ], 'order' => [ + 'priority ASC', 'age DESC', 'id ASC' ], diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index b1c44daa..c8a3476c 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -11,6 +11,8 @@ */ class QueuedTask extends QueueAppModel { + public $_next_priority = 5; + public $rateHistory = []; public $exit = false; @@ -61,6 +63,12 @@ public function initConfig() { Configure::write('Queue', $conf); } + public function nextPriority($priority) + { + $this->_next_priority = $priority; + return $this; + } + /** * Add a new Job to the Queue. * @@ -76,8 +84,10 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu 'jobtype' => $jobName, 'data' => serialize($data), 'group' => $group, - 'reference' => $reference + 'reference' => $reference, + 'priority' => $this->_next_priority ]; + $this->_next_priority = 5; // reset to default; if ($notBefore !== null) { $data['notbefore'] = date('Y-m-d H:i:s', strtotime($notBefore)); } @@ -119,6 +129,7 @@ public function requestJob($capabilities, $group = null) { 'age', ], 'order' => [ + 'priority ASC', 'age ASC', 'id ASC' ], @@ -175,7 +186,7 @@ public function requestJob($capabilities, $group = null) { //debug($key);ob_flush(); // try to update one of the found tasks with the key of this worker. - $this->query('UPDATE ' . $this->tablePrefix . $this->table . ' SET workerkey = "' . $key . '", fetched = "' . date('Y-m-d H:i:s') . '" WHERE ' . implode(' OR ', $whereClause) . ' ORDER BY ' . $this->virtualFields['age'] . ' ASC, id ASC LIMIT 1'); + $this->query('UPDATE ' . $this->tablePrefix . $this->table . ' SET workerkey = "' . $key . '", fetched = "' . date('Y-m-d H:i:s') . '" WHERE ' . implode(' OR ', $whereClause) . ' ORDER BY priority ASC, ' . $this->virtualFields['age'] . ' ASC, id ASC LIMIT 1'); // Read which one actually got updated, which is the job we are supposed to execute. $data = $this->find('first', [ From eda9127f40b18426b0deba756d840214c03e8581 Mon Sep 17 00:00:00 2001 From: Neil Holcomb Date: Thu, 7 Apr 2016 13:04:12 -0700 Subject: [PATCH 16/35] Added a try catch block to prevent a poorly written task from killing a worker --- Console/Command/QueueShell.php | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index dfda26f0..3a05ec44 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -213,7 +213,18 @@ public function runworker() { if ($this->{$taskname}->autoUnserialize) { $data['data'] = unserialize($data['data']); } - $return = $this->{$taskname}->run($data['data'], $data['id']); + //prevent tasks that don't catch their own errors from killing this worker + try { + $return = $this->{$taskname}->run($data['data'], $data['id']); + } catch ( Exception $e) + { + //assume job failed + $return = false; + + //log the exception + $this->_logError( $taskname ."\n\n". $e->getMessage() ."\n\n". $e->getTraceAsString() ); + } + if ($return) { $this->QueuedTask->markJobDone($data['id']); $this->out('Job Finished.'); @@ -393,6 +404,21 @@ protected function _log($type, $pid = null) { } } + /** + * Timestamped log. + * + * @param string $message + * + * @internal param string $type Log type + * @internal param int $pid PID of the process + */ + protected function _logError($message = '') { + # log? + if (Configure::read('Queue.log')) { + CakeLog::write('queue-error', $message); + } + } + /** * Timestamped notification. * From dae2570611d211f3e9ccd5fec24795558c94598f Mon Sep 17 00:00:00 2001 From: Chris Renfrow Date: Fri, 6 Nov 2020 01:32:29 -0700 Subject: [PATCH 17/35] adding SQS support for queue system --- Console/Command/QueueShell.php | 128 ++++++++++++++++++ Model/QueuedTask.php | 230 ++++++++++++++++++++++++++++++++- 2 files changed, 357 insertions(+), 1 deletion(-) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 3a05ec44..9e812f84 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -261,6 +261,134 @@ public function runworker() { } } + /** + * Run a QueueWorker loop. + * Runs a Queue Worker process which will try to find unassigned jobs in the queue + * which it may run and try to fetch and execute them. + * + * @return void + */ + public function runworkersqs() { + //queue url is passed in thru URL + $queueUrl = $this->args[0]; + + if(!$queueUrl) { + $this->out('sqs queue url as first parameter is required'); + return; + } + + + if ($pidFilePath = Configure::read('Queue.pidfilepath')) { + if (!file_exists($pidFilePath)) { + mkdir($pidFilePath, 0755, true); + } + if (function_exists('posix_getpid')) { + $pid = posix_getpid(); + } else { + $pid = $this->QueuedTask->key(); + } + # global file + $fp = fopen($pidFilePath . 'queue.pid', "w"); + fwrite($fp, $pid); + fclose($fp); + # specific pid file + if (function_exists('posix_getpid')) { + $pid = posix_getpid(); + } else { + $pid = $this->QueuedTask->key(); + } + $pidFileName = 'queue_' . $pid . '.pid'; + $fp = fopen($pidFilePath . $pidFileName, "w"); + fwrite($fp, $pid); + fclose($fp); + } + // Enable Garbage Collector (PHP >= 5.3) + if (function_exists('gc_enable')) { + gc_enable(); + } + if (function_exists('pcntl_signal')) { + pcntl_signal(SIGTERM, [&$this, "_exit"]); + } + $this->_exit = false; + + $starttime = time(); + $group = null; + if (!empty($this->params['group'])) { + $group = $this->params['group']; + } + while (!$this->_exit) { + // make sure accidental overriding isnt possible + set_time_limit(0); + if (!empty($pidFilePath)) { + touch($pidFilePath . 'queue.pid'); + } + if (!empty($pidFileName)) { + touch($pidFilePath . $pidFileName); + } + //$this->_log('runworker', isset($pid) ? $pid : null); + $this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...'); + + $data = $this->QueuedTask->requestSqsJob($queueUrl); + //$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); + if ($this->QueuedTask->exit === true) { + $this->_exit = true; + } else { + if ($data) { + $this->out('Running Job of type "' . $data['jobtype'] . '"'); + $taskname = 'Queue' . $data['jobtype']; + + if ($this->{$taskname}->autoUnserialize) { + $data['data'] = unserialize($data['data']); + } + //prevent tasks that don't catch their own errors from killing this worker + + try { + $return = $this->{$taskname}->run($data['data'], $data['id']); + } catch ( Exception $e) + { + //assume job failed + $return = false; + + //log the exception + $this->_logError( $taskname ."\n\n". $e->getMessage() ."\n\n". $e->getTraceAsString() ); + } + + if ($return) { + $this->QueuedTask->markJobDoneSqs($data, $queueUrl); + $this->out('Job Finished.'); + } else { + $failureMessage = null; + if (isset($this->{$taskname}->failureMessage) && !empty($this->{$taskname}->failureMessage)) { + $failureMessage = $this->{$taskname}->failureMessage; + } + $this->QueuedTask->markJobFailedSqs($data, $queueUrl, $failureMessage); + $this->out('Job did not finish, requeued.'); + } + } elseif (Configure::read('Queue.exitwhennothingtodo')) { + $this->out('nothing to do, exiting.'); + $this->_exit = true; + } else { + $this->out('nothing to do, sleeping.'); + sleep(Configure::read('Queue.sleeptime')); + } + + // check if we are over the maximum runtime and end processing if so. + if (Configure::read('Queue.workermaxruntime') && (time() - $starttime) >= Configure::read('Queue.workermaxruntime')) { + $this->_exit = true; + $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('Queue.workermaxruntime') . '), terminating.'); + } + if ($this->_exit || rand(0, 100) > (100 - Configure::read('Queue.gcprob'))) { + $this->out('Performing Old job cleanup.'); + $this->QueuedTask->cleanOldJobs(); + } + $this->hr(); + } + } + if (!empty($pidFilePath)) { + unlink($pidFilePath . 'queue_' . $pid . '.pid'); + } + } + /** * Manually trigger a Finished job cleanup. * diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index c8a3476c..016b301f 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -1,4 +1,6 @@ virtualFields['status'] = '(CASE WHEN ' . $this->alias . '.notbefore > NOW() THEN \'NOT_READY\' WHEN ' . $this->alias . '.fetched IS null THEN \'NOT_STARTED\' WHEN ' . $this->alias . '.fetched IS NOT null AND ' . $this->alias . '.completed IS null AND ' . $this->alias . '.failed = 0 THEN \'IN_PROGRESS\' WHEN ' . $this->alias . '.fetched IS NOT null AND ' . $this->alias . '.completed IS null AND ' . $this->alias . '.failed > 0 THEN \'FAILED\' WHEN ' . $this->alias . '.fetched IS NOT null AND ' . $this->alias . '.completed IS NOT null THEN \'COMPLETED\' ELSE \'UNKNOWN\' END)'; + + $Settings = ClassRegistry::init('Symphosize.Setting'); + $settings = $Settings->getValues([ + 'aws_key', + 'aws_secret_key' + ]); + + //set up the flySystem + $this->sqsClient = SqsClient::factory([ + 'key' => $settings['aws_key'], + 'secret' => $settings['aws_secret_key'], + 'region' => 'us-east-1' + ]); } /** @@ -80,6 +97,56 @@ public function nextPriority($priority) * @return array Created Job array containing id, data, ... */ public function createJob($jobName, $data = null, $notBefore = null, $group = null, $reference = null) { + //to try and prevent duplicate jobs for already scheduled jobs we will try a "by dupekey" lookup for certain job types + $dupeKey = false; + switch($jobName) { + case 'CrewCalNotifyCalendarUpdate': + $dupeKey = $data['calendar_id']; + break; + case 'GoogleCalendarChannelKeepAlive': + $dupeKey = $data['id']; + break; + case 'ReinitFollowUpInstance': + $dupeKey = $data['instance_id']; + break; + case 'ReleaseModule': + $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; + break; + case 'SaveConnection': + $dupeKey = $data['bidId']; + break; + case 'SaveSingleConnection': + $dupeKey = $data['provider'] . '.' . $data['bidId']; + break; + case 'SyncIntercomCompany': + $dupeKey = $data['company_id']; + break; + case 'UpdateModuleService': + $dupeKey = $data['sub_service_id']; + break; + } + + //check for duplicate + if($dupeKey) { + $dupe = false; + try { + $dupe = $this->find('first', [ + 'conditions' => [ + 'jobtype' => $jobName, + 'dupekey' => $dupeKey, + 'fetched IS NULL' + + ], + 'contain' => false, + ]); + } catch(Exception $e) { + + } + if($dupe) { + return false; + } + } + $data = [ 'jobtype' => $jobName, 'data' => serialize($data), @@ -87,14 +154,44 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu 'reference' => $reference, 'priority' => $this->_next_priority ]; + //save dupeKey if we have one + if($dupeKey) { + $data['dupekey'] = $dupeKey; + } $this->_next_priority = 5; // reset to default; if ($notBefore !== null) { $data['notbefore'] = date('Y-m-d H:i:s', strtotime($notBefore)); } $this->create(); - return $this->save($data); + $result = $this->save($data); + + //send sqs message + $this->triggerSqsMessage($jobName, $this->id); + + return $result; } + public function triggerSqsMessage($jobName, $taskId) { + try { + $queues = Configure::read('Queue.sqs_queues'); + if(array_key_exists($jobName, $queues)) { + $queueUrl = $queues[$jobName]; + } else { + $queueUrl = $queues['DEFAULT']; + } + + $response = $this->sqsClient->sendMessage(array( + 'QueueUrl' => $queueUrl, + 'MessageBody' => json_encode([ + 'id' => $taskId + ]) + )); + } catch(Exception $e) { + + } + + } + /** * Set exit to true on error * @@ -104,6 +201,137 @@ public function onError() { $this->exit = true; } + public function requestSqsJob($queueUrl, $waitTime=20) { + //get record from SQS + $result = $this->sqsClient->receiveMessage(array( + 'QueueUrl' => $queueUrl, + 'WaitTimeSeconds' => $waitTime + )); + + + if(!$result || !$result->get('Messages')) { + return []; + } + $message = $result->get('Messages')[0]; + + + try { + $data = json_decode($message['Body'], true); + } catch(\Exception $e) { + + return []; + } + + //read record from database + $dbRecord = $this->find('first', [ + 'conditions' => [ + 'id' => $data['id'] + ], + 'contain' => false, + ]); + + if(!$dbRecord || $dbRecord['QueuedTask']['completed']) { + //doesn't exist or is completed + $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); + return []; + } + + //confirm we are not withing the last fetch window + if($dbRecord['QueuedTask']['fetched']) { + $maxTime = new DateTime('-1 minutes'); + $fetchDate = new DateTime($dbRecord['QueuedTask']['fetched']); + if($maxTime < $fetchDate) { + //not timed out yet, ignore + $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); + return []; + } + } + + + //claim record against database + $key = $this->key(); + $date = date("Y-m-d H:i:s"); + + $db = $this->getDataSource(); + $dateString = $db->value($date, 'string'); + $workerKey = $db->value($key, 'string'); + + $this->updateAll([ + 'fetched' => $dateString, + 'workerkey' => $workerKey, + ],[ + 'id' => $dbRecord['QueuedTask']['id'], + 'fetched' => $dbRecord['QueuedTask']['fetched'], + 'workerkey' => $dbRecord['QueuedTask']['workerkey'], + ]); + + $confirmRecord = $this->find('first', [ + 'conditions' => [ + 'id' => $data['id'] + ], + 'contain' => false, + ]); + + if( + !$confirmRecord || + $confirmRecord['QueuedTask']['workerkey'] !== $key || + $confirmRecord['QueuedTask']['fetched'] !== $date + ) { + //did not claim + $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); + return []; + } + + $confirmRecord['QueuedTask']['sqsReceiptHandle'] = $message['ReceiptHandle']; + + return $confirmRecord['QueuedTask']; + } + + public function markJobDoneSqs($dbRecord, $sqsQueueUrl) { + $fields = [ + $this->alias . '.completed' => "'" . date('Y-m-d H:i:s') . "'" + ]; + $conditions = [ + $this->alias . '.id' => $dbRecord['id'] + ]; + $this->updateAll($fields, $conditions); + + $this->deleteSqsMessage($sqsQueueUrl, $dbRecord['sqsReceiptHandle']); + } + + /** + * Mark a job as Failed, Incrementing the failed-counter and Requeueing it. + * + * @param int $id ID of task + * @param string $failureMessage Optional message to append to the failure_message field. + * @return bool Success + */ + public function markJobFailedSqs($dbRecord, $sqsQueueUrl, $failureMessage = null) { + $db = $this->getDataSource(); + $fields = [ + $this->alias . '.failed' => $this->alias . '.failed + 1', + $this->alias . '.failure_message' => $db->value($failureMessage), + ]; + $conditions = [ + $this->alias . '.id' => $dbRecord['id'] + ]; + $this->updateAll($fields, $conditions); + + $this->deleteSqsMessage($sqsQueueUrl, $dbRecord['sqsReceiptHandle']); + + } + + public function deleteSqsMessage($queueUrl, $receiptHandle) { + try { + $this->sqsClient->deleteMessage([ + 'QueueUrl' => $queueUrl, + 'ReceiptHandle' => $receiptHandle + ]); + } catch(\Exception $e) { + + } + } + /** * Look for a new job that can be processed with the current abilities and * from the specified group (or any if null). From d6d97190b774b12865ddb9b59febc36ebd35edd8 Mon Sep 17 00:00:00 2001 From: Chris Renfrow Date: Fri, 6 Nov 2020 07:01:52 -0700 Subject: [PATCH 18/35] adding some queue worker output to debug intercom sync jobs --- Model/QueuedTask.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 016b301f..7baca6d7 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -210,6 +210,7 @@ public function requestSqsJob($queueUrl, $waitTime=20) { if(!$result || !$result->get('Messages')) { + echo "\nNo messages\n"; return []; } $message = $result->get('Messages')[0]; @@ -218,7 +219,7 @@ public function requestSqsJob($queueUrl, $waitTime=20) { try { $data = json_decode($message['Body'], true); } catch(\Exception $e) { - + echo "\n JSON decode failed\n"; return []; } @@ -231,6 +232,8 @@ public function requestSqsJob($queueUrl, $waitTime=20) { ]); if(!$dbRecord || $dbRecord['QueuedTask']['completed']) { + echo "\n DB record lookup failed\n"; + print_r($dbRecord); //doesn't exist or is completed $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); return []; @@ -277,6 +280,8 @@ public function requestSqsJob($queueUrl, $waitTime=20) { $confirmRecord['QueuedTask']['workerkey'] !== $key || $confirmRecord['QueuedTask']['fetched'] !== $date ) { + echo "\n claim failed\n"; + print_r($confirmRecord); //did not claim $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); return []; From 3bcac80e68566d34c29cfa31c541add38aaec888 Mon Sep 17 00:00:00 2001 From: Chris Renfrow Date: Fri, 6 Nov 2020 07:23:57 -0700 Subject: [PATCH 19/35] adding new retry logic for when the DB record isn't ready but SQS triggered --- Model/QueuedTask.php | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 7baca6d7..1b863cbb 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -171,7 +171,7 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu return $result; } - public function triggerSqsMessage($jobName, $taskId) { + public function triggerSqsMessage($jobName, $taskId, $retryCount=0) { try { $queues = Configure::read('Queue.sqs_queues'); if(array_key_exists($jobName, $queues)) { @@ -183,7 +183,9 @@ public function triggerSqsMessage($jobName, $taskId) { $response = $this->sqsClient->sendMessage(array( 'QueueUrl' => $queueUrl, 'MessageBody' => json_encode([ - 'id' => $taskId + 'id' => $taskId, + 'retryCount' => $retryCount, + 'jobtype' => $jobName ]) )); } catch(Exception $e) { @@ -233,7 +235,14 @@ public function requestSqsJob($queueUrl, $waitTime=20) { if(!$dbRecord || $dbRecord['QueuedTask']['completed']) { echo "\n DB record lookup failed\n"; + print_r($data); + echo "\ndb record\n"; print_r($dbRecord); + if(!$data['retryCount'] || $data['retryCount'] < 3) { + echo "\nAttempting a retry\n"; + sleep(1); + $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount']++); + } //doesn't exist or is completed $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); return []; From 4e74dcdc9cbd36ecd6f7803e122175887a8695dc Mon Sep 17 00:00:00 2001 From: Chris Renfrow Date: Fri, 6 Nov 2020 08:09:59 -0700 Subject: [PATCH 20/35] default delay of 2 seconds for SQS jobs to let the databse catch up --- Model/QueuedTask.php | 1 + 1 file changed, 1 insertion(+) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 1b863cbb..95445a9a 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -182,6 +182,7 @@ public function triggerSqsMessage($jobName, $taskId, $retryCount=0) { $response = $this->sqsClient->sendMessage(array( 'QueueUrl' => $queueUrl, + 'DelaySeconds' => 2, 'MessageBody' => json_encode([ 'id' => $taskId, 'retryCount' => $retryCount, From d787a45c79ade0e2474fbebeef01272010bd4713 Mon Sep 17 00:00:00 2001 From: Chris Renfrow Date: Wed, 11 Nov 2020 03:40:17 -0700 Subject: [PATCH 21/35] supporting delayed queue jobs --- Model/QueuedTask.php | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 95445a9a..a84883ed 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -166,12 +166,28 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu $result = $this->save($data); //send sqs message - $this->triggerSqsMessage($jobName, $this->id); + if ($notBefore !== null) { + $delaySeconds = 2; + try { + $now = strtotime('now'); + $targetDate = strtotime($notBefore); + $delaySeconds = abs($now - $targetDate); + if($delaySeconds > 900){ + $delaySeconds = 890; + } + } catch(Exception $e) { + + } + $this->triggerSqsMessage($jobName, $this->id, 0, $delaySeconds); + } else { + $this->triggerSqsMessage($jobName, $this->id); + } + return $result; } - public function triggerSqsMessage($jobName, $taskId, $retryCount=0) { + public function triggerSqsMessage($jobName, $taskId, $retryCount=0, $delaySeconds=2) { try { $queues = Configure::read('Queue.sqs_queues'); if(array_key_exists($jobName, $queues)) { @@ -182,7 +198,7 @@ public function triggerSqsMessage($jobName, $taskId, $retryCount=0) { $response = $this->sqsClient->sendMessage(array( 'QueueUrl' => $queueUrl, - 'DelaySeconds' => 2, + 'DelaySeconds' => $delaySeconds, 'MessageBody' => json_encode([ 'id' => $taskId, 'retryCount' => $retryCount, @@ -234,26 +250,32 @@ public function requestSqsJob($queueUrl, $waitTime=20) { 'contain' => false, ]); - if(!$dbRecord || $dbRecord['QueuedTask']['completed']) { + if(!$dbRecord) { echo "\n DB record lookup failed\n"; print_r($data); echo "\ndb record\n"; print_r($dbRecord); if(!$data['retryCount'] || $data['retryCount'] < 3) { echo "\nAttempting a retry\n"; - sleep(1); - $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount']++); + $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount']++, 10); } //doesn't exist or is completed $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); return []; } + if($dbRecord['QueuedTask']['completed']) { + //already done + $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); + return []; + } + //confirm we are not withing the last fetch window if($dbRecord['QueuedTask']['fetched']) { $maxTime = new DateTime('-1 minutes'); $fetchDate = new DateTime($dbRecord['QueuedTask']['fetched']); if($maxTime < $fetchDate) { + $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount'], 60); //not timed out yet, ignore $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); return []; From 327075c61401c0c1cda4afd6c1eb86494a69823c Mon Sep 17 00:00:00 2001 From: Chris Renfrow Date: Thu, 12 Nov 2020 10:22:24 -0700 Subject: [PATCH 22/35] adding some error logging and reschedule queued tasks logic --- Model/QueuedTask.php | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index a84883ed..268e2895 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -4,6 +4,7 @@ App::uses('QueueAppModel', 'Queue.Model'); App::uses('Hash', 'Utility'); + /** * QueuedTask for queued tasks. * @@ -187,7 +188,24 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu return $result; } + public function findAndRescheduleTasks() { + $backDate = date('Y-m-d H:i:s', strtotime('-5 minutes')); + + foreach($this->find('all', [ + 'conditions' => [ + 'created < ' => $backDate, + 'fetched IS NULL', + 'notbefore IS NULL', + + ], + 'contain' => false + ]) as $queuedTask) { + $this->triggerSqsMessage($queuedTask['QueuedTask']['jobtype'], $queuedTask['QueuedTask']['id']); + } + } + public function triggerSqsMessage($jobName, $taskId, $retryCount=0, $delaySeconds=2) { + $queueUrl = false; try { $queues = Configure::read('Queue.sqs_queues'); if(array_key_exists($jobName, $queues)) { @@ -206,7 +224,15 @@ public function triggerSqsMessage($jobName, $taskId, $retryCount=0, $delaySecond ]) )); } catch(Exception $e) { + try { + CakeLog::write( 'QUEUE_SQS_TRIGGER', json_encode([ + 'error' => $e->getMessage(), + 'queueUrl' => $queueUrl, + 'jobtype' => $jobName + ]) ); + } catch (Exception $e) { + } } } @@ -258,6 +284,11 @@ public function requestSqsJob($queueUrl, $waitTime=20) { if(!$data['retryCount'] || $data['retryCount'] < 3) { echo "\nAttempting a retry\n"; $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount']++, 10); + } else { + CakeLog::write( 'SQS_RETRY_OUT_' . $data['id'], json_encode([ + 'data' => $data, + 'queueUrl' => $queueUrl + ]) ); } //doesn't exist or is completed $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); @@ -314,6 +345,11 @@ public function requestSqsJob($queueUrl, $waitTime=20) { ) { echo "\n claim failed\n"; print_r($confirmRecord); + CakeLog::write( 'SQS_CLAIM_FAILED_' . $data['id'], json_encode([ + 'data' => $data, + 'queueUrl' => $queueUrl, + 'confirmRecord' => $confirmRecord + ]) ); //did not claim $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); return []; From 3d653ef4e71c74b950a5f9fca3861992d8912176 Mon Sep 17 00:00:00 2001 From: Chris Renfrow Date: Thu, 10 Dec 2020 07:12:10 -0700 Subject: [PATCH 23/35] Queue retry logic now correctly increments the retry counter --- Model/QueuedTask.php | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 268e2895..8e0b86dd 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -282,8 +282,10 @@ public function requestSqsJob($queueUrl, $waitTime=20) { echo "\ndb record\n"; print_r($dbRecord); if(!$data['retryCount'] || $data['retryCount'] < 3) { - echo "\nAttempting a retry\n"; - $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount']++, 10); + $data['retryCount']++; + echo "\nAttempting a retry: " . $data['retryCount'] . "\n"; + $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount'], 10); + } else { CakeLog::write( 'SQS_RETRY_OUT_' . $data['id'], json_encode([ 'data' => $data, From b2d520258dc0e9b4a25f6873d8ebca55e2889e21 Mon Sep 17 00:00:00 2001 From: Chris Renfrow Date: Mon, 4 Jan 2021 12:15:56 -0700 Subject: [PATCH 24/35] reducing how many time we retry for db records not found to 1 retry attempt --- Model/QueuedTask.php | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 8e0b86dd..c303c05f 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -275,22 +275,20 @@ public function requestSqsJob($queueUrl, $waitTime=20) { ], 'contain' => false, ]); + if(!$data['retryCount']) { + $data['retryCount'] = 0; + } if(!$dbRecord) { echo "\n DB record lookup failed\n"; print_r($data); echo "\ndb record\n"; print_r($dbRecord); - if(!$data['retryCount'] || $data['retryCount'] < 3) { + if($data['retryCount'] < 1) { $data['retryCount']++; echo "\nAttempting a retry: " . $data['retryCount'] . "\n"; $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount'], 10); - } else { - CakeLog::write( 'SQS_RETRY_OUT_' . $data['id'], json_encode([ - 'data' => $data, - 'queueUrl' => $queueUrl - ]) ); } //doesn't exist or is completed $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); From 48eff1260ee053868ac4d01c962e454fdd1892ab Mon Sep 17 00:00:00 2001 From: Ron Mercado Date: Wed, 7 May 2025 23:25:18 +0800 Subject: [PATCH 25/35] fix: add additional dupekey to prevent overriding queue - jira issues: RES-5227 --- Model/QueuedTask.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index c303c05f..51eb0f38 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -114,10 +114,12 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; break; case 'SaveConnection': - $dupeKey = $data['bidId']; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $dupeKey = $data['bidId'] . '.' . $additionalKey; break; case 'SaveSingleConnection': - $dupeKey = $data['provider'] . '.' . $data['bidId']; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; break; case 'SyncIntercomCompany': $dupeKey = $data['company_id']; @@ -195,8 +197,6 @@ public function findAndRescheduleTasks() { 'conditions' => [ 'created < ' => $backDate, 'fetched IS NULL', - 'notbefore IS NULL', - ], 'contain' => false ]) as $queuedTask) { From 86c460cbda057aa112d9274f90d0987354b9aa57 Mon Sep 17 00:00:00 2001 From: Ron Mercado Date: Thu, 8 May 2025 00:10:23 +0800 Subject: [PATCH 26/35] fix: revert changes on findAndReschedule --- Model/QueuedTask.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 51eb0f38..d26d21e7 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -119,7 +119,7 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu break; case 'SaveSingleConnection': $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; - $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; + $dupeKey = $data['provider'] . '.' . $data['bidId']; break; case 'SyncIntercomCompany': $dupeKey = $data['company_id']; @@ -197,6 +197,7 @@ public function findAndRescheduleTasks() { 'conditions' => [ 'created < ' => $backDate, 'fetched IS NULL', + 'notbefore IS NULL', ], 'contain' => false ]) as $queuedTask) { From ec4dba95212b693a1d968f6e8628c4a5738b36ff Mon Sep 17 00:00:00 2001 From: Ron Mercado Date: Thu, 8 May 2025 00:10:59 +0800 Subject: [PATCH 27/35] fix: spacing --- Model/QueuedTask.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index d26d21e7..98a21d2c 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -197,7 +197,7 @@ public function findAndRescheduleTasks() { 'conditions' => [ 'created < ' => $backDate, 'fetched IS NULL', - 'notbefore IS NULL', + 'notbefore IS NULL', ], 'contain' => false ]) as $queuedTask) { From 9f26f71d40b7d93f7c3873c647090b2baf4a8952 Mon Sep 17 00:00:00 2001 From: Ron Mercado Date: Thu, 8 May 2025 04:50:24 +0800 Subject: [PATCH 28/35] fix: missing additional key on save single connection --- Model/QueuedTask.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 98a21d2c..8d1dbbc5 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -119,7 +119,7 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu break; case 'SaveSingleConnection': $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; - $dupeKey = $data['provider'] . '.' . $data['bidId']; + $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; break; case 'SyncIntercomCompany': $dupeKey = $data['company_id']; From 37ae0d121b5bc823aaaa01f9a33f6d9ede57bd38 Mon Sep 17 00:00:00 2001 From: xrompdev Date: Mon, 19 May 2025 23:50:05 +0800 Subject: [PATCH 29/35] feat: add workermaxmemorytimeout option --- Console/Command/QueueShell.php | 55 ++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 9e812f84..0e28c50b 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -206,6 +206,26 @@ public function runworker() { if ($this->QueuedTask->exit === true) { $this->_exit = true; } else { + // check if we are over the memory limit and end processing if so + $memoryLimit = Configure::read('Queue.workermaxmemory'); + $workerMaxMemoryTimeout = Configure::read('Queue.workermaxmemorytimeout'); + if ($memoryLimit) { + $memoryUsage = $this->_humanReadableBytes(memory_get_usage(true)); + $this->out('Memory usage: ' . $memoryUsage); + if ($memoryUsage >= $memoryLimit) { + $this->out('Reached memory limit of ' . $memoryUsage . ' (Max ' . $memoryLimit . 'MB), skipping this job.'); + // Mark job as failed due to memory constraints + $this->QueuedTask->markJobFailed($data['id'], 'Not enough memory to run this job. Worker memory usage hit ' . $memoryUsage . 'MB, over the ' . $memoryLimit . 'MB max limit.'); + if ($workerMaxMemoryTimeout) { + $this->out('Exiting in ' . $workerMaxMemoryTimeout . ' seconds due to memory limit.'); + sleep($workerMaxMemoryTimeout); + $this->_exit = true; + } + sleep(Configure::read('Queue.sleeptime')); + $this->hr(); + continue; + } + } if ($data) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); $taskname = 'Queue' . $data['jobtype']; @@ -333,6 +353,26 @@ public function runworkersqs() { if ($this->QueuedTask->exit === true) { $this->_exit = true; } else { + // check if we are over the memory limit and end processing if so + $memoryLimit = Configure::read('Queue.workermaxmemory'); + $workerMaxMemoryTimeout = Configure::read('Queue.workermaxmemorytimeout'); + if ($memoryLimit) { + $memoryUsage = $this->_humanReadableBytes(memory_get_usage(true)); + $this->out('Memory usage: ' . $memoryUsage); + if ($memoryUsage >= $memoryLimit) { + $this->out('Reached memory limit of ' . $memoryUsage . ' (Max ' . $memoryLimit . 'MB), skipping this job.'); + // Mark job as failed due to memory constraints + $this->QueuedTask->markJobFailed($data['id'], 'Not enough memory to run this job. Worker memory usage hit ' . $memoryUsage . 'MB, over the ' . $memoryLimit . 'MB max limit.'); + if ($workerMaxMemoryTimeout) { + $this->out('Exiting in ' . $workerMaxMemoryTimeout . ' seconds due to memory limit.'); + sleep($workerMaxMemoryTimeout); + $this->_exit = true; + } + sleep(Configure::read('Queue.sleeptime')); + $this->hr(); + continue; + } + } if ($data) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); $taskname = 'Queue' . $data['jobtype']; @@ -621,4 +661,19 @@ public function __destruct() { } } +/** + * Format bytes into human readable format + * + * @param int $bytes Number of bytes + * @return string Human readable bytes format + */ + protected function _humanReadableBytes($bytes) { + $units = ['B', 'KB', 'MB', 'GB', 'TB']; + $bytes = max($bytes, 0); + $pow = floor(($bytes ? log($bytes) : 0) / log(1024)); + $pow = min($pow, count($units) - 1); + $bytes /= pow(1024, $pow); + return round($bytes, 2) . ' ' . $units[$pow]; + } + } From d6f96fb9f3b51d4aed556a81c38ce9f75dd6064b Mon Sep 17 00:00:00 2001 From: xrompdev Date: Mon, 3 Nov 2025 02:08:00 +0800 Subject: [PATCH 30/35] feat: add ECS mode support and enhance job processing logic - Introduced ECS mode in QueueShell for conditional message processing. - Updated output messages to reflect the current mode (ECS or EC2). - Added isCompanyForEcsQueue method in QueuedTask to determine ECS eligibility based on company settings. - Enhanced job handling in QueuedTask to support new job names for ECS-enabled companies. --- Console/Command/QueueShell.php | 31 +++++++++++++++++++++++++++++-- Model/QueuedTask.php | 31 +++++++++++++++++++++++++++++-- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 0e28c50b..652ee9e9 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -297,6 +297,14 @@ public function runworkersqs() { return; } + // Check if running in ECS mode + $enableEcs = !empty($this->params['enable-ecs']); + if ($enableEcs) { + $this->out('[ECS MODE] Only processing messages'); + } else { + $this->out('[EC2 MODE] Only processing messages'); + } + if ($pidFilePath = Configure::read('Queue.pidfilepath')) { if (!file_exists($pidFilePath)) { @@ -346,7 +354,7 @@ public function runworkersqs() { touch($pidFilePath . $pidFileName); } //$this->_log('runworker', isset($pid) ? $pid : null); - $this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...'); + $this->out('[' . date('Y-m-d H:i:s') . '] Looking for ' . ($enableEcs ? 'ECS' : 'EC2') . ' Job ...'); $data = $this->QueuedTask->requestSqsJob($queueUrl); //$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); @@ -375,6 +383,11 @@ public function runworkersqs() { } if ($data) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); + // For ECS consumers, allow tasks suffixed with "-ECS" to map to their base task + // Remove the "-ECS" suffix to get the base task name + if ($enableEcs) { + $data['jobtype'] = preg_replace('/-ECS$/i', '', $data['jobtype']); + } $taskname = 'Queue' . $data['jobtype']; if ($this->{$taskname}->autoUnserialize) { @@ -527,6 +540,16 @@ public function getOptionParser() { 'default' => '' ]; + $subcommandParserSqs = [ + 'options' => [ + 'enable-ecs' => [ + 'help' => 'Enable ECS mode - only process messages', + 'boolean' => true, + 'default' => false + ] + ] + ]; + return parent::getOptionParser() ->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system.")) ->addSubcommand('clean', [ @@ -548,7 +571,11 @@ public function getOptionParser() { ->addSubcommand('runworker', [ 'help' => 'Run Worker', 'parser' => $subcommandParserFull - ]); + ]) + ->addSubcommand('runworkersqs', [ + 'help' => 'Run Worker SQS', + 'parser' => $subcommandParserSqs + ]); } /** diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 8d1dbbc5..c5ce4854 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -87,6 +87,27 @@ public function nextPriority($priority) return $this; } + public function isCompanyForEcsQueue($bidId) { + $companyBid = ClassRegistry::init('CompanyBid')->find('first', [ + 'conditions' => [ + 'CompanyBid.id' => $bidId, + ], + 'contain' => false, + 'fields' => ['company_id'] + ]); + if (empty($companyBid)) { + return false; + } + $companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [ + 'conditions' => [ + 'CompanySetting.company_id' => $companyBid['CompanyBid']['company_id'], + 'CompanySetting.slug' => 'ecs_queue_enabled', + ], + 'contain' => false, + ]); + return !empty($companySetting['CompanySetting']['value']); + } + /** * Add a new Job to the Queue. * @@ -114,12 +135,18 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; break; case 'SaveConnection': - $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['bidId'] . '.' . $additionalKey; + if ($this->isCompanyForEcsQueue($data['bidId'])) { + $jobName = 'SaveConnection-ECS'; + } break; case 'SaveSingleConnection': - $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; + if ($this->isCompanyForEcsQueue($data['bidId'])) { + $jobName = 'SaveSingleConnection-ECS'; + } break; case 'SyncIntercomCompany': $dupeKey = $data['company_id']; From f3fbeb4b87af83dbd576d279ae301a796c535a78 Mon Sep 17 00:00:00 2001 From: xrompdev Date: Wed, 5 Nov 2025 01:22:47 +0800 Subject: [PATCH 31/35] feat: enforce ecs flag for all company --- Model/QueuedTask.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index c5ce4854..57249d16 100644 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -88,6 +88,9 @@ public function nextPriority($priority) } public function isCompanyForEcsQueue($bidId) { + if (Configure::read('Queue.enforceEcsForAll.connection')) { + return true; + } $companyBid = ClassRegistry::init('CompanyBid')->find('first', [ 'conditions' => [ 'CompanyBid.id' => $bidId, From f56eb5cabdb580f14fbe8d8b676c07c8b72cf63e Mon Sep 17 00:00:00 2001 From: rom Date: Mon, 1 Dec 2025 16:13:40 +0800 Subject: [PATCH 32/35] feat: queue enrtypoint for reinit default --- Model/QueuedTask.php | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) mode change 100644 => 100755 Model/QueuedTask.php diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php old mode 100644 new mode 100755 index 57249d16..1b9ae510 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -111,6 +111,19 @@ public function isCompanyForEcsQueue($bidId) { return !empty($companySetting['CompanySetting']['value']); } + private function isCompanyForReInitEcsQueue($companyId) { + if (Configure::read('Queue.enforceEcsForAll.reinit')) { + return true; + } + $companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [ + 'conditions' => [ + 'CompanySetting.company_id' => $companyId, + 'CompanySetting.slug' => 'ecs_reinit_queue_enabled', + ], + 'contain' => false, + ]); + return !empty($companySetting['CompanySetting']['value']); + } /** * Add a new Job to the Queue. * @@ -133,6 +146,9 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu break; case 'ReinitFollowUpInstance': $dupeKey = $data['instance_id']; + if ($this->isCompanyForReInitEcsQueue($data['company_id'])) { + $jobName = 'ReinitFollowUpInstance-ECS'; + } break; case 'ReleaseModule': $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; @@ -303,7 +319,7 @@ public function requestSqsJob($queueUrl, $waitTime=20) { $dbRecord = $this->find('first', [ 'conditions' => [ 'id' => $data['id'] - ], + ], 'contain' => false, ]); if(!$data['retryCount']) { From 9ff956c22106a05d498fc9d7271d84fd9a6611b1 Mon Sep 17 00:00:00 2001 From: rom Date: Sat, 13 Dec 2025 15:58:48 +0800 Subject: [PATCH 33/35] feat: completely enable ecs in default --- Model/QueuedTask.php | 9 --------- 1 file changed, 9 deletions(-) diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php index 1b9ae510..0f6874db 100755 --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -146,9 +146,6 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu break; case 'ReinitFollowUpInstance': $dupeKey = $data['instance_id']; - if ($this->isCompanyForReInitEcsQueue($data['company_id'])) { - $jobName = 'ReinitFollowUpInstance-ECS'; - } break; case 'ReleaseModule': $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; @@ -156,16 +153,10 @@ public function createJob($jobName, $data = null, $notBefore = null, $group = nu case 'SaveConnection': $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['bidId'] . '.' . $additionalKey; - if ($this->isCompanyForEcsQueue($data['bidId'])) { - $jobName = 'SaveConnection-ECS'; - } break; case 'SaveSingleConnection': $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; - if ($this->isCompanyForEcsQueue($data['bidId'])) { - $jobName = 'SaveSingleConnection-ECS'; - } break; case 'SyncIntercomCompany': $dupeKey = $data['company_id']; From 0c043bdecb29f907bcd7e062541b94e7a1c18464 Mon Sep 17 00:00:00 2001 From: rom Date: Mon, 9 Feb 2026 16:16:51 +0800 Subject: [PATCH 34/35] feat: add worker max message process --- Console/Command/QueueShell.php | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index 652ee9e9..a0908522 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -33,6 +33,8 @@ class QueueShell extends AppShell { protected $_exit; + protected $_messagesProcessed = 0; + /** * Overwrite shell initialize to dynamically load all Queue Related Tasks. * @@ -256,6 +258,7 @@ public function runworker() { $this->QueuedTask->markJobFailed($data['id'], $failureMessage); $this->out('Job did not finish, requeued.'); } + $this->_messagesProcessed++; } elseif (Configure::read('Queue.exitwhennothingtodo')) { $this->out('nothing to do, exiting.'); $this->_exit = true; @@ -269,6 +272,11 @@ public function runworker() { $this->_exit = true; $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('Queue.workermaxruntime') . '), terminating.'); } + // check if we have processed the maximum number of messages + if (Configure::read('Queue.workermaxmessages') && $this->_messagesProcessed >= Configure::read('Queue.workermaxmessages')) { + $this->_exit = true; + $this->out('Processed ' . $this->_messagesProcessed . ' messages (Max ' . Configure::read('Queue.workermaxmessages') . '), exiting gracefully.'); + } if ($this->_exit || rand(0, 100) > (100 - Configure::read('Queue.gcprob'))) { $this->out('Performing Old job cleanup.'); $this->QueuedTask->cleanOldJobs(); @@ -417,6 +425,7 @@ public function runworkersqs() { $this->QueuedTask->markJobFailedSqs($data, $queueUrl, $failureMessage); $this->out('Job did not finish, requeued.'); } + $this->_messagesProcessed++; } elseif (Configure::read('Queue.exitwhennothingtodo')) { $this->out('nothing to do, exiting.'); $this->_exit = true; @@ -430,6 +439,11 @@ public function runworkersqs() { $this->_exit = true; $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('Queue.workermaxruntime') . '), terminating.'); } + // check if we have processed the maximum number of messages + if (Configure::read('Queue.workermaxmessages') && $this->_messagesProcessed >= Configure::read('Queue.workermaxmessages')) { + $this->_exit = true; + $this->out('Processed ' . $this->_messagesProcessed . ' messages (Max ' . Configure::read('Queue.workermaxmessages') . '), exiting gracefully.'); + } if ($this->_exit || rand(0, 100) > (100 - Configure::read('Queue.gcprob'))) { $this->out('Performing Old job cleanup.'); $this->QueuedTask->cleanOldJobs(); From 474f1a2f91bc33db1c61f78ed543144ae36b4507 Mon Sep 17 00:00:00 2001 From: xrompdev Date: Wed, 22 Apr 2026 23:35:08 +0800 Subject: [PATCH 35/35] feat: add jitter to worker max runtime to prevent thundering herd Reads Queue.workermaxruntimejitter config and adds a random offset to each worker's max runtime so they don't all terminate simultaneously. --- Console/Command/QueueShell.php | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index a0908522..7caa98f4 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -188,6 +188,9 @@ public function runworker() { $this->_exit = false; $starttime = time(); + $baseRuntime = Configure::read('Queue.workermaxruntime'); + $jitter = (int) Configure::read('Queue.workermaxruntimejitter'); + $maxRuntime = $baseRuntime ? $baseRuntime + rand(0, $jitter) : 0; $group = null; if (!empty($this->params['group'])) { $group = $this->params['group']; @@ -268,9 +271,9 @@ public function runworker() { } // check if we are over the maximum runtime and end processing if so. - if (Configure::read('Queue.workermaxruntime') && (time() - $starttime) >= Configure::read('Queue.workermaxruntime')) { + if ($maxRuntime && (time() - $starttime) >= $maxRuntime) { $this->_exit = true; - $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('Queue.workermaxruntime') . '), terminating.'); + $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . $maxRuntime . '), terminating.'); } // check if we have processed the maximum number of messages if (Configure::read('Queue.workermaxmessages') && $this->_messagesProcessed >= Configure::read('Queue.workermaxmessages')) { @@ -348,6 +351,9 @@ public function runworkersqs() { $this->_exit = false; $starttime = time(); + $baseRuntime = Configure::read('Queue.workermaxruntime'); + $jitter = (int) Configure::read('Queue.workermaxruntimejitter'); + $maxRuntime = $baseRuntime ? $baseRuntime + rand(0, $jitter) : 0; $group = null; if (!empty($this->params['group'])) { $group = $this->params['group']; @@ -435,9 +441,9 @@ public function runworkersqs() { } // check if we are over the maximum runtime and end processing if so. - if (Configure::read('Queue.workermaxruntime') && (time() - $starttime) >= Configure::read('Queue.workermaxruntime')) { + if ($maxRuntime && (time() - $starttime) >= $maxRuntime) { $this->_exit = true; - $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('Queue.workermaxruntime') . '), terminating.'); + $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . $maxRuntime . '), terminating.'); } // check if we have processed the maximum number of messages if (Configure::read('Queue.workermaxmessages') && $this->_messagesProcessed >= Configure::read('Queue.workermaxmessages')) {