diff --git a/.travis.yml b/.travis.yml index ec075d36..49d117db 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,56 +6,38 @@ php: - 5.6 env: - - CAKE_VERSION=2.6 - - CAKE_VERSION=2.7 + global: + - REPO_NAME=cakephp-queue + - PLUGIN_NAME=Queue + - REQUIRE="" + + matrix: + - DB=mysql CAKE_VERSION=2.7 + - DB=mysql CAKE_VERSION=2.8 matrix: include: - php: 5.4 env: - - PHPCS=1 - - allow_failures: - - php: 5.4 - env: - - PHPCS=1 + - DB=mysql CAKE_VERSION=2.7 COVERALLS=1 before_script: - - sh -c "if [ '$PHPCS' != '1' ]; then git clone --depth 1 --branch $CAKE_VERSION git://github.com/cakephp/cakephp ../cakephp; else git clone --depth 1 --branch 2.6 git://github.com/cakephp/cakephp ../cakephp; fi" && cd ../cakephp - - rm -R app - - git clone --depth 1 --branch master git://github.com/dereuromark/tools-app app - - git clone --depth 1 --branch master git://github.com/dereuromark/cakephp-tools plugins/Tools - - git clone --depth 1 --branch master git://github.com/dereuromark/cakephp-shim plugins/Shim - - cp -R ../cakephp-queue plugins/Queue - - sh -c "mysql -e 'CREATE DATABASE cakephp_test;'" - - mkdir ./app/tmp - - mkdir ./app/tmp/logs - - mkdir ./app/tmp/queue - - mkdir ./app/tmp/tests - - mkdir ./app/tmp/cache - - mkdir ./app/tmp/cache/persistent - - mkdir ./app/tmp/cache/models - - chmod -R 0777 ./app/tmp - - sh -c "if [ '$PHPCS' = '1' ]; then pear channel-discover pear.cakephp.org; fi" - - sh -c "if [ '$PHPCS' = '1' ]; then pear install --alldeps cakephp/CakePHP_CodeSniffer; fi" - - phpenv rehash - - echo " 'Database/Mysql', - 'database' => 'cakephp_test', - 'host' => '0.0.0.0', - 'login' => 'travis', - 'persistent' => false, - ); - }" > app/Config/database.php - - sh -c "if [ '$TRAVIS_PHP_VERSION' != '5.2' ]; then composer global require 'phpunit/phpunit=3.7.33'; fi" - - sh -c "if [ '$TRAVIS_PHP_VERSION' != '5.2' ]; then ln -s ~/.composer/vendor/phpunit/phpunit/PHPUnit ./vendors/PHPUnit; fi" - - cd app + - git clone -b master https://github.com/FriendsOfCake/travis.git --depth 1 ../travis + - ../travis/before_script.sh + - cd ../cakephp/app + - composer require --dev --no-interaction --prefer-source dereuromark/cakephp-tools:0.* + - echo " Model/AppModel.php + - echo " Controller/AppController.php + - echo " Config/email.php + - echo "Configure::write('Security.salt', 'AycG93b0qyJfIxfs2guVoUubWwvniR2G0FgaC9mi');" >> Config/bootstrap.php + - echo "Configure::write('Security.cipherSeed', '16659201697453542496749683615');" >> Config/bootstrap.php + - cd .. script: - - sh -c "if [ '$PHPCS' != '1' ]; then ../lib/Cake/Console/cake test Queue AllQueue --stderr; fi" - - sh -c "if [ '$PHPCS' = '1' ]; then cd ..; phpcs -p --extensions=php --standard=CakePHP ./plugins/Queue; fi" + - ../travis/script.sh + +after_success: + - ../travis/after_success.sh notifications: email: false diff --git a/Config/Migration/1453822680_adding_priority_field_to_queue_tasks.php b/Config/Migration/1453822680_adding_priority_field_to_queue_tasks.php new file mode 100644 index 00000000..0226a984 --- /dev/null +++ b/Config/Migration/1453822680_adding_priority_field_to_queue_tasks.php @@ -0,0 +1,60 @@ + array( + 'create_field' => [ + 'queued_tasks' => [ + 'priority' => [ + 'type' => 'integer', + 'null' => false, + 'default' => 5, + 'length' => 4 + ], + 'indexes' => array( + 'priority' => array('column' => 'priority'), + ), + ], + ] + ), + 'down' => array( + 'drop_field' => [ + 'queued_tasks' => [ + 'priority' + ], + ] + ), + ); + +/** + * Before migration callback + * + * @param string $direction Direction of migration process (up or down) + * @return bool Should process continue + */ + public function before($direction) { + return true; + } + +/** + * After migration callback + * + * @param string $direction Direction of migration process (up or down) + * @return bool Should process continue + */ + public function after($direction) { + return true; + } +} diff --git a/Config/queue.php b/Config/queue.php index fc033948..f8f6a3ec 100644 --- a/Config/queue.php +++ b/Config/queue.php @@ -37,5 +37,5 @@ 'log' => true, // set to false to disable (tmp = file in TMP dir) - 'notify' => 'tmp' + 'notify' => 'tmp', ]; diff --git a/Console/Command/CronShell.php b/Console/Command/CronShell.php index 4ab8ec1f..cbf3c03a 100644 --- a/Console/Command/CronShell.php +++ b/Console/Command/CronShell.php @@ -110,7 +110,7 @@ public function main() { } /** - * Look for a Queue Task of hte passed name and try to call add() on it. + * Look for a Queue Task of the passed name and try to call add() on it. * A QueueTask may provide an add function to enable the user to create new jobs via commandline. * * @return void diff --git a/Console/Command/QueueShell.php b/Console/Command/QueueShell.php index b77a0996..7caa98f4 100644 --- a/Console/Command/QueueShell.php +++ b/Console/Command/QueueShell.php @@ -33,6 +33,8 @@ class QueueShell extends AppShell { protected $_exit; + protected $_messagesProcessed = 0; + /** * Overwrite shell initialize to dynamically load all Queue Related Tasks. * @@ -43,7 +45,7 @@ public function initialize() { foreach ($paths as $path) { $Folder = new Folder($path); - $res = array_merge($this->tasks, $Folder->find('Queue.*\.php')); + $res = array_merge($this->tasks, $Folder->find('Queue.+\.php')); foreach ($res as &$r) { $r = basename($r, 'Task.php'); } @@ -54,7 +56,7 @@ public function initialize() { $pluginPaths = App::path('Console/Command/Task', $plugin); foreach ($pluginPaths as $pluginPath) { $Folder = new Folder($pluginPath); - $res = $Folder->find('Queue.*Task\.php'); + $res = $Folder->find('Queue.+Task\.php'); foreach ($res as &$r) { $r = $plugin . '.' . basename($r, 'Task.php'); } @@ -186,6 +188,9 @@ public function runworker() { $this->_exit = false; $starttime = time(); + $baseRuntime = Configure::read('Queue.workermaxruntime'); + $jitter = (int) Configure::read('Queue.workermaxruntimejitter'); + $maxRuntime = $baseRuntime ? $baseRuntime + rand(0, $jitter) : 0; $group = null; if (!empty($this->params['group'])) { $group = $this->params['group']; @@ -200,11 +205,32 @@ public function runworker() { touch($pidFilePath . $pidFileName); } $this->_log('runworker', isset($pid) ? $pid : null); - $this->out('Looking for Job....'); + $this->out('[' . date('Y-m-d H:i:s') . '] Looking for Job ...'); + $data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); if ($this->QueuedTask->exit === true) { $this->_exit = true; } else { + // check if we are over the memory limit and end processing if so + $memoryLimit = Configure::read('Queue.workermaxmemory'); + $workerMaxMemoryTimeout = Configure::read('Queue.workermaxmemorytimeout'); + if ($memoryLimit) { + $memoryUsage = $this->_humanReadableBytes(memory_get_usage(true)); + $this->out('Memory usage: ' . $memoryUsage); + if ($memoryUsage >= $memoryLimit) { + $this->out('Reached memory limit of ' . $memoryUsage . ' (Max ' . $memoryLimit . 'MB), skipping this job.'); + // Mark job as failed due to memory constraints + $this->QueuedTask->markJobFailed($data['id'], 'Not enough memory to run this job. Worker memory usage hit ' . $memoryUsage . 'MB, over the ' . $memoryLimit . 'MB max limit.'); + if ($workerMaxMemoryTimeout) { + $this->out('Exiting in ' . $workerMaxMemoryTimeout . ' seconds due to memory limit.'); + sleep($workerMaxMemoryTimeout); + $this->_exit = true; + } + sleep(Configure::read('Queue.sleeptime')); + $this->hr(); + continue; + } + } if ($data) { $this->out('Running Job of type "' . $data['jobtype'] . '"'); $taskname = 'Queue' . $data['jobtype']; @@ -212,7 +238,18 @@ public function runworker() { if ($this->{$taskname}->autoUnserialize) { $data['data'] = unserialize($data['data']); } - $return = $this->{$taskname}->run($data['data'], $data['id']); + //prevent tasks that don't catch their own errors from killing this worker + try { + $return = $this->{$taskname}->run($data['data'], $data['id']); + } catch ( Exception $e) + { + //assume job failed + $return = false; + + //log the exception + $this->_logError( $taskname ."\n\n". $e->getMessage() ."\n\n". $e->getTraceAsString() ); + } + if ($return) { $this->QueuedTask->markJobDone($data['id']); $this->out('Job Finished.'); @@ -224,6 +261,7 @@ public function runworker() { $this->QueuedTask->markJobFailed($data['id'], $failureMessage); $this->out('Job did not finish, requeued.'); } + $this->_messagesProcessed++; } elseif (Configure::read('Queue.exitwhennothingtodo')) { $this->out('nothing to do, exiting.'); $this->_exit = true; @@ -233,9 +271,14 @@ public function runworker() { } // check if we are over the maximum runtime and end processing if so. - if (Configure::read('Queue.workermaxruntime') && (time() - $starttime) >= Configure::read('Queue.workermaxruntime')) { + if ($maxRuntime && (time() - $starttime) >= $maxRuntime) { + $this->_exit = true; + $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . $maxRuntime . '), terminating.'); + } + // check if we have processed the maximum number of messages + if (Configure::read('Queue.workermaxmessages') && $this->_messagesProcessed >= Configure::read('Queue.workermaxmessages')) { $this->_exit = true; - $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('Queue.workermaxruntime') . '), terminating.'); + $this->out('Processed ' . $this->_messagesProcessed . ' messages (Max ' . Configure::read('Queue.workermaxmessages') . '), exiting gracefully.'); } if ($this->_exit || rand(0, 100) > (100 - Configure::read('Queue.gcprob'))) { $this->out('Performing Old job cleanup.'); @@ -249,6 +292,176 @@ public function runworker() { } } + /** + * Run a QueueWorker loop. + * Runs a Queue Worker process which will try to find unassigned jobs in the queue + * which it may run and try to fetch and execute them. + * + * @return void + */ + public function runworkersqs() { + //queue url is passed in thru URL + $queueUrl = $this->args[0]; + + if(!$queueUrl) { + $this->out('sqs queue url as first parameter is required'); + return; + } + + // Check if running in ECS mode + $enableEcs = !empty($this->params['enable-ecs']); + if ($enableEcs) { + $this->out('[ECS MODE] Only processing messages'); + } else { + $this->out('[EC2 MODE] Only processing messages'); + } + + + if ($pidFilePath = Configure::read('Queue.pidfilepath')) { + if (!file_exists($pidFilePath)) { + mkdir($pidFilePath, 0755, true); + } + if (function_exists('posix_getpid')) { + $pid = posix_getpid(); + } else { + $pid = $this->QueuedTask->key(); + } + # global file + $fp = fopen($pidFilePath . 'queue.pid', "w"); + fwrite($fp, $pid); + fclose($fp); + # specific pid file + if (function_exists('posix_getpid')) { + $pid = posix_getpid(); + } else { + $pid = $this->QueuedTask->key(); + } + $pidFileName = 'queue_' . $pid . '.pid'; + $fp = fopen($pidFilePath . $pidFileName, "w"); + fwrite($fp, $pid); + fclose($fp); + } + // Enable Garbage Collector (PHP >= 5.3) + if (function_exists('gc_enable')) { + gc_enable(); + } + if (function_exists('pcntl_signal')) { + pcntl_signal(SIGTERM, [&$this, "_exit"]); + } + $this->_exit = false; + + $starttime = time(); + $baseRuntime = Configure::read('Queue.workermaxruntime'); + $jitter = (int) Configure::read('Queue.workermaxruntimejitter'); + $maxRuntime = $baseRuntime ? $baseRuntime + rand(0, $jitter) : 0; + $group = null; + if (!empty($this->params['group'])) { + $group = $this->params['group']; + } + while (!$this->_exit) { + // make sure accidental overriding isnt possible + set_time_limit(0); + if (!empty($pidFilePath)) { + touch($pidFilePath . 'queue.pid'); + } + if (!empty($pidFileName)) { + touch($pidFilePath . $pidFileName); + } + //$this->_log('runworker', isset($pid) ? $pid : null); + $this->out('[' . date('Y-m-d H:i:s') . '] Looking for ' . ($enableEcs ? 'ECS' : 'EC2') . ' Job ...'); + + $data = $this->QueuedTask->requestSqsJob($queueUrl); + //$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $group); + if ($this->QueuedTask->exit === true) { + $this->_exit = true; + } else { + // check if we are over the memory limit and end processing if so + $memoryLimit = Configure::read('Queue.workermaxmemory'); + $workerMaxMemoryTimeout = Configure::read('Queue.workermaxmemorytimeout'); + if ($memoryLimit) { + $memoryUsage = $this->_humanReadableBytes(memory_get_usage(true)); + $this->out('Memory usage: ' . $memoryUsage); + if ($memoryUsage >= $memoryLimit) { + $this->out('Reached memory limit of ' . $memoryUsage . ' (Max ' . $memoryLimit . 'MB), skipping this job.'); + // Mark job as failed due to memory constraints + $this->QueuedTask->markJobFailed($data['id'], 'Not enough memory to run this job. Worker memory usage hit ' . $memoryUsage . 'MB, over the ' . $memoryLimit . 'MB max limit.'); + if ($workerMaxMemoryTimeout) { + $this->out('Exiting in ' . $workerMaxMemoryTimeout . ' seconds due to memory limit.'); + sleep($workerMaxMemoryTimeout); + $this->_exit = true; + } + sleep(Configure::read('Queue.sleeptime')); + $this->hr(); + continue; + } + } + if ($data) { + $this->out('Running Job of type "' . $data['jobtype'] . '"'); + // For ECS consumers, allow tasks suffixed with "-ECS" to map to their base task + // Remove the "-ECS" suffix to get the base task name + if ($enableEcs) { + $data['jobtype'] = preg_replace('/-ECS$/i', '', $data['jobtype']); + } + $taskname = 'Queue' . $data['jobtype']; + + if ($this->{$taskname}->autoUnserialize) { + $data['data'] = unserialize($data['data']); + } + //prevent tasks that don't catch their own errors from killing this worker + + try { + $return = $this->{$taskname}->run($data['data'], $data['id']); + } catch ( Exception $e) + { + //assume job failed + $return = false; + + //log the exception + $this->_logError( $taskname ."\n\n". $e->getMessage() ."\n\n". $e->getTraceAsString() ); + } + + if ($return) { + $this->QueuedTask->markJobDoneSqs($data, $queueUrl); + $this->out('Job Finished.'); + } else { + $failureMessage = null; + if (isset($this->{$taskname}->failureMessage) && !empty($this->{$taskname}->failureMessage)) { + $failureMessage = $this->{$taskname}->failureMessage; + } + $this->QueuedTask->markJobFailedSqs($data, $queueUrl, $failureMessage); + $this->out('Job did not finish, requeued.'); + } + $this->_messagesProcessed++; + } elseif (Configure::read('Queue.exitwhennothingtodo')) { + $this->out('nothing to do, exiting.'); + $this->_exit = true; + } else { + $this->out('nothing to do, sleeping.'); + sleep(Configure::read('Queue.sleeptime')); + } + + // check if we are over the maximum runtime and end processing if so. + if ($maxRuntime && (time() - $starttime) >= $maxRuntime) { + $this->_exit = true; + $this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . $maxRuntime . '), terminating.'); + } + // check if we have processed the maximum number of messages + if (Configure::read('Queue.workermaxmessages') && $this->_messagesProcessed >= Configure::read('Queue.workermaxmessages')) { + $this->_exit = true; + $this->out('Processed ' . $this->_messagesProcessed . ' messages (Max ' . Configure::read('Queue.workermaxmessages') . '), exiting gracefully.'); + } + if ($this->_exit || rand(0, 100) > (100 - Configure::read('Queue.gcprob'))) { + $this->out('Performing Old job cleanup.'); + $this->QueuedTask->cleanOldJobs(); + } + $this->hr(); + } + } + if (!empty($pidFilePath)) { + unlink($pidFilePath . 'queue_' . $pid . '.pid'); + } + } + /** * Manually trigger a Finished job cleanup. * @@ -329,12 +542,12 @@ public function getOptionParser() { /* 'dry-run'=> array( 'short' => 'd', - 'help' => __d('cake_console', 'Dry run the update, no jobs will actually be added.'), + 'help' => 'Dry run the update, no jobs will actually be added.', 'boolean' => true ), 'log'=> array( 'short' => 'l', - 'help' => __d('cake_console', 'Log all ouput to file log.txt in TMP dir'), + 'help' => 'Log all ouput to file log.txt in TMP dir', 'boolean' => true ), */ @@ -343,32 +556,46 @@ public function getOptionParser() { $subcommandParserFull = $subcommandParser; $subcommandParserFull['options']['group'] = [ 'short' => 'g', - 'help' => __d('cake_console', 'Group'), + 'help' => 'Group', 'default' => '' ]; + $subcommandParserSqs = [ + 'options' => [ + 'enable-ecs' => [ + 'help' => 'Enable ECS mode - only process messages', + 'boolean' => true, + 'default' => false + ] + ] + ]; + return parent::getOptionParser() - ->description(__d('cake_console', "...")) + ->description(__d('cake_console', "Simple and minimalistic job queue (or deferred-task) system.")) ->addSubcommand('clean', [ - 'help' => __d('cake_console', 'Remove old jobs (cleanup)'), + 'help' => 'Remove old jobs (cleanup)', 'parser' => $subcommandParser ]) ->addSubcommand('add', [ - 'help' => __d('cake_console', 'Add Job'), + 'help' => 'Add Job', 'parser' => $subcommandParser ]) ->addSubcommand('install', [ - 'help' => __d('cake_console', 'Install info'), + 'help' => 'Install info', 'parser' => $subcommandParser ]) ->addSubcommand('uninstall', [ - 'help' => __d('cake_console', 'Uninstall info'), + 'help' => 'Uninstall info', 'parser' => $subcommandParser ]) ->addSubcommand('runworker', [ - 'help' => __d('cake_console', 'Run Worker'), + 'help' => 'Run Worker', 'parser' => $subcommandParserFull - ]); + ]) + ->addSubcommand('runworkersqs', [ + 'help' => 'Run Worker SQS', + 'parser' => $subcommandParserSqs + ]); } /** @@ -392,6 +619,21 @@ protected function _log($type, $pid = null) { } } + /** + * Timestamped log. + * + * @param string $message + * + * @internal param string $type Log type + * @internal param int $pid PID of the process + */ + protected function _logError($message = '') { + # log? + if (Configure::read('Queue.log')) { + CakeLog::write('queue-error', $message); + } + } + /** * Timestamped notification. * @@ -466,4 +708,19 @@ public function __destruct() { } } +/** + * Format bytes into human readable format + * + * @param int $bytes Number of bytes + * @return string Human readable bytes format + */ + protected function _humanReadableBytes($bytes) { + $units = ['B', 'KB', 'MB', 'GB', 'TB']; + $bytes = max($bytes, 0); + $pow = floor(($bytes ? log($bytes) : 0) / log(1024)); + $pow = min($pow, count($units) - 1); + $bytes /= pow(1024, $pow); + return round($bytes, 2) . ' ' . $units[$pow]; + } + } diff --git a/Console/Command/QueueTestShell.php b/Console/Command/QueueTestShell.php index bb386e67..262f3635 100644 --- a/Console/Command/QueueTestShell.php +++ b/Console/Command/QueueTestShell.php @@ -1,5 +1,7 @@ to('markscherer@gmx.de', 'Mark Test'); diff --git a/Controller/CronTasksController.php b/Controller/CronTasksController.php index bef03d86..f0471933 100644 --- a/Controller/CronTasksController.php +++ b/Controller/CronTasksController.php @@ -20,7 +20,6 @@ public function beforeFilter() { * @return void */ public function index() { - $this->CronTask->recursive = 0; $cronTasks = $this->paginate(); $this->set(compact('cronTasks')); } @@ -111,7 +110,6 @@ public function delete($id = null) { * @return void */ public function admin_index() { - $this->CronTask->recursive = 0; $cronTasks = $this->paginate(); $this->set(compact('cronTasks')); } diff --git a/Controller/QueueController.php b/Controller/QueueController.php index 364e8d60..173127c9 100644 --- a/Controller/QueueController.php +++ b/Controller/QueueController.php @@ -43,7 +43,7 @@ public function admin_index() { public function admin_reset() { $this->request->allowMethod('post'); $res = $this->QueuedTask->truncate(); - + if ($res) { $message = __d('queue', 'OK'); $class = 'success'; @@ -51,9 +51,9 @@ public function admin_reset() { $message = __d('queue', 'Error'); $class = 'error'; } - + if (isset($this->Flash)) { - $this->Flash->message($message, $class); + $this->Flash->$class($message); } else { $this->Session->setFlash($message, 'default', ['class' => $class]); } diff --git a/Model/CronTask.php b/Model/CronTask.php index 5c5b0057..552b5ee5 100644 --- a/Model/CronTask.php +++ b/Model/CronTask.php @@ -21,14 +21,14 @@ class CronTask extends QueueAppModel { public $validate = [ 'jobtype' => [ - 'notempty' => [ - 'rule' => ['notempty'], + 'notEmpty' => [ + 'rule' => ['notEmpty'], 'message' => 'valErrMandatoryField', ], ], 'name' => [ - 'notempty' => [ - 'rule' => ['notempty'], + 'notEmpty' => [ + 'rule' => ['notEmpty'], 'message' => 'valErrMandatoryField', 'last' => true ], @@ -39,8 +39,8 @@ class CronTask extends QueueAppModel { ], ], 'title' => [ - 'notempty' => [ - 'rule' => ['notempty'], + 'notEmpty' => [ + 'rule' => ['notEmpty'], 'message' => 'valErrMandatoryField', 'last' => true ], @@ -132,6 +132,7 @@ public function requestJob($capabilities, $group = null) { 'timediff(NOW(),notbefore) AS age' ], 'order' => [ + 'priority ASC', 'age DESC', 'id ASC' ], diff --git a/Model/QueuedTask.php b/Model/QueuedTask.php old mode 100644 new mode 100755 index b1c44daa..0f6874db --- a/Model/QueuedTask.php +++ b/Model/QueuedTask.php @@ -1,7 +1,10 @@ virtualFields['status'] = '(CASE WHEN ' . $this->alias . '.notbefore > NOW() THEN \'NOT_READY\' WHEN ' . $this->alias . '.fetched IS null THEN \'NOT_STARTED\' WHEN ' . $this->alias . '.fetched IS NOT null AND ' . $this->alias . '.completed IS null AND ' . $this->alias . '.failed = 0 THEN \'IN_PROGRESS\' WHEN ' . $this->alias . '.fetched IS NOT null AND ' . $this->alias . '.completed IS null AND ' . $this->alias . '.failed > 0 THEN \'FAILED\' WHEN ' . $this->alias . '.fetched IS NOT null AND ' . $this->alias . '.completed IS NOT null THEN \'COMPLETED\' ELSE \'UNKNOWN\' END)'; + + $Settings = ClassRegistry::init('Symphosize.Setting'); + $settings = $Settings->getValues([ + 'aws_key', + 'aws_secret_key' + ]); + + //set up the flySystem + $this->sqsClient = SqsClient::factory([ + 'key' => $settings['aws_key'], + 'secret' => $settings['aws_secret_key'], + 'region' => 'us-east-1' + ]); } /** @@ -61,6 +81,49 @@ public function initConfig() { Configure::write('Queue', $conf); } + public function nextPriority($priority) + { + $this->_next_priority = $priority; + return $this; + } + + public function isCompanyForEcsQueue($bidId) { + if (Configure::read('Queue.enforceEcsForAll.connection')) { + return true; + } + $companyBid = ClassRegistry::init('CompanyBid')->find('first', [ + 'conditions' => [ + 'CompanyBid.id' => $bidId, + ], + 'contain' => false, + 'fields' => ['company_id'] + ]); + if (empty($companyBid)) { + return false; + } + $companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [ + 'conditions' => [ + 'CompanySetting.company_id' => $companyBid['CompanyBid']['company_id'], + 'CompanySetting.slug' => 'ecs_queue_enabled', + ], + 'contain' => false, + ]); + return !empty($companySetting['CompanySetting']['value']); + } + + private function isCompanyForReInitEcsQueue($companyId) { + if (Configure::read('Queue.enforceEcsForAll.reinit')) { + return true; + } + $companySetting = ClassRegistry::init('Symphosize.CompanySetting')->find('first', [ + 'conditions' => [ + 'CompanySetting.company_id' => $companyId, + 'CompanySetting.slug' => 'ecs_reinit_queue_enabled', + ], + 'contain' => false, + ]); + return !empty($companySetting['CompanySetting']['value']); + } /** * Add a new Job to the Queue. * @@ -72,19 +135,146 @@ public function initConfig() { * @return array Created Job array containing id, data, ... */ public function createJob($jobName, $data = null, $notBefore = null, $group = null, $reference = null) { + //to try and prevent duplicate jobs for already scheduled jobs we will try a "by dupekey" lookup for certain job types + $dupeKey = false; + switch($jobName) { + case 'CrewCalNotifyCalendarUpdate': + $dupeKey = $data['calendar_id']; + break; + case 'GoogleCalendarChannelKeepAlive': + $dupeKey = $data['id']; + break; + case 'ReinitFollowUpInstance': + $dupeKey = $data['instance_id']; + break; + case 'ReleaseModule': + $dupeKey = $data['company_id'] . '.' . $data['sub_service_id']; + break; + case 'SaveConnection': + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $dupeKey = $data['bidId'] . '.' . $additionalKey; + break; + case 'SaveSingleConnection': + $additionalKey = isset($data['isStatusChanged']) ? (int) $data['isStatusChanged'] : 0; + $dupeKey = $data['provider'] . '.' . $data['bidId'] . '.' . $additionalKey; + break; + case 'SyncIntercomCompany': + $dupeKey = $data['company_id']; + break; + case 'UpdateModuleService': + $dupeKey = $data['sub_service_id']; + break; + } + + //check for duplicate + if($dupeKey) { + $dupe = false; + try { + $dupe = $this->find('first', [ + 'conditions' => [ + 'jobtype' => $jobName, + 'dupekey' => $dupeKey, + 'fetched IS NULL' + + ], + 'contain' => false, + ]); + } catch(Exception $e) { + + } + if($dupe) { + return false; + } + } + $data = [ 'jobtype' => $jobName, 'data' => serialize($data), 'group' => $group, - 'reference' => $reference + 'reference' => $reference, + 'priority' => $this->_next_priority ]; + //save dupeKey if we have one + if($dupeKey) { + $data['dupekey'] = $dupeKey; + } + $this->_next_priority = 5; // reset to default; if ($notBefore !== null) { $data['notbefore'] = date('Y-m-d H:i:s', strtotime($notBefore)); } $this->create(); - return $this->save($data); + $result = $this->save($data); + + //send sqs message + if ($notBefore !== null) { + $delaySeconds = 2; + try { + $now = strtotime('now'); + $targetDate = strtotime($notBefore); + $delaySeconds = abs($now - $targetDate); + if($delaySeconds > 900){ + $delaySeconds = 890; + } + } catch(Exception $e) { + + } + $this->triggerSqsMessage($jobName, $this->id, 0, $delaySeconds); + } else { + $this->triggerSqsMessage($jobName, $this->id); + } + + + return $result; } + public function findAndRescheduleTasks() { + $backDate = date('Y-m-d H:i:s', strtotime('-5 minutes')); + + foreach($this->find('all', [ + 'conditions' => [ + 'created < ' => $backDate, + 'fetched IS NULL', + 'notbefore IS NULL', + ], + 'contain' => false + ]) as $queuedTask) { + $this->triggerSqsMessage($queuedTask['QueuedTask']['jobtype'], $queuedTask['QueuedTask']['id']); + } + } + + public function triggerSqsMessage($jobName, $taskId, $retryCount=0, $delaySeconds=2) { + $queueUrl = false; + try { + $queues = Configure::read('Queue.sqs_queues'); + if(array_key_exists($jobName, $queues)) { + $queueUrl = $queues[$jobName]; + } else { + $queueUrl = $queues['DEFAULT']; + } + + $response = $this->sqsClient->sendMessage(array( + 'QueueUrl' => $queueUrl, + 'DelaySeconds' => $delaySeconds, + 'MessageBody' => json_encode([ + 'id' => $taskId, + 'retryCount' => $retryCount, + 'jobtype' => $jobName + ]) + )); + } catch(Exception $e) { + try { + CakeLog::write( 'QUEUE_SQS_TRIGGER', json_encode([ + 'error' => $e->getMessage(), + 'queueUrl' => $queueUrl, + 'jobtype' => $jobName + ]) ); + } catch (Exception $e) { + + } + } + + } + /** * Set exit to true on error * @@ -94,6 +284,165 @@ public function onError() { $this->exit = true; } + public function requestSqsJob($queueUrl, $waitTime=20) { + //get record from SQS + $result = $this->sqsClient->receiveMessage(array( + 'QueueUrl' => $queueUrl, + 'WaitTimeSeconds' => $waitTime + )); + + + if(!$result || !$result->get('Messages')) { + echo "\nNo messages\n"; + return []; + } + $message = $result->get('Messages')[0]; + + + try { + $data = json_decode($message['Body'], true); + } catch(\Exception $e) { + echo "\n JSON decode failed\n"; + return []; + } + + //read record from database + $dbRecord = $this->find('first', [ + 'conditions' => [ + 'id' => $data['id'] + ], + 'contain' => false, + ]); + if(!$data['retryCount']) { + $data['retryCount'] = 0; + } + + if(!$dbRecord) { + echo "\n DB record lookup failed\n"; + print_r($data); + echo "\ndb record\n"; + print_r($dbRecord); + if($data['retryCount'] < 1) { + $data['retryCount']++; + echo "\nAttempting a retry: " . $data['retryCount'] . "\n"; + $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount'], 10); + + } + //doesn't exist or is completed + $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); + return []; + } + + if($dbRecord['QueuedTask']['completed']) { + //already done + $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); + return []; + } + + //confirm we are not withing the last fetch window + if($dbRecord['QueuedTask']['fetched']) { + $maxTime = new DateTime('-1 minutes'); + $fetchDate = new DateTime($dbRecord['QueuedTask']['fetched']); + if($maxTime < $fetchDate) { + $this->triggerSqsMessage($data['jobtype'], $data['id'], $data['retryCount'], 60); + //not timed out yet, ignore + $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); + return []; + } + } + + + //claim record against database + $key = $this->key(); + $date = date("Y-m-d H:i:s"); + + $db = $this->getDataSource(); + $dateString = $db->value($date, 'string'); + $workerKey = $db->value($key, 'string'); + + $this->updateAll([ + 'fetched' => $dateString, + 'workerkey' => $workerKey, + ],[ + 'id' => $dbRecord['QueuedTask']['id'], + 'fetched' => $dbRecord['QueuedTask']['fetched'], + 'workerkey' => $dbRecord['QueuedTask']['workerkey'], + ]); + + $confirmRecord = $this->find('first', [ + 'conditions' => [ + 'id' => $data['id'] + ], + 'contain' => false, + ]); + + if( + !$confirmRecord || + $confirmRecord['QueuedTask']['workerkey'] !== $key || + $confirmRecord['QueuedTask']['fetched'] !== $date + ) { + echo "\n claim failed\n"; + print_r($confirmRecord); + CakeLog::write( 'SQS_CLAIM_FAILED_' . $data['id'], json_encode([ + 'data' => $data, + 'queueUrl' => $queueUrl, + 'confirmRecord' => $confirmRecord + ]) ); + //did not claim + $this->deleteSqsMessage($queueUrl, $message['ReceiptHandle']); + return []; + } + + $confirmRecord['QueuedTask']['sqsReceiptHandle'] = $message['ReceiptHandle']; + + return $confirmRecord['QueuedTask']; + } + + public function markJobDoneSqs($dbRecord, $sqsQueueUrl) { + $fields = [ + $this->alias . '.completed' => "'" . date('Y-m-d H:i:s') . "'" + ]; + $conditions = [ + $this->alias . '.id' => $dbRecord['id'] + ]; + $this->updateAll($fields, $conditions); + + $this->deleteSqsMessage($sqsQueueUrl, $dbRecord['sqsReceiptHandle']); + } + + /** + * Mark a job as Failed, Incrementing the failed-counter and Requeueing it. + * + * @param int $id ID of task + * @param string $failureMessage Optional message to append to the failure_message field. + * @return bool Success + */ + public function markJobFailedSqs($dbRecord, $sqsQueueUrl, $failureMessage = null) { + $db = $this->getDataSource(); + $fields = [ + $this->alias . '.failed' => $this->alias . '.failed + 1', + $this->alias . '.failure_message' => $db->value($failureMessage), + ]; + $conditions = [ + $this->alias . '.id' => $dbRecord['id'] + ]; + $this->updateAll($fields, $conditions); + + $this->deleteSqsMessage($sqsQueueUrl, $dbRecord['sqsReceiptHandle']); + + } + + public function deleteSqsMessage($queueUrl, $receiptHandle) { + try { + $this->sqsClient->deleteMessage([ + 'QueueUrl' => $queueUrl, + 'ReceiptHandle' => $receiptHandle + ]); + } catch(\Exception $e) { + + } + } + /** * Look for a new job that can be processed with the current abilities and * from the specified group (or any if null). @@ -119,6 +468,7 @@ public function requestJob($capabilities, $group = null) { 'age', ], 'order' => [ + 'priority ASC', 'age ASC', 'id ASC' ], @@ -175,7 +525,7 @@ public function requestJob($capabilities, $group = null) { //debug($key);ob_flush(); // try to update one of the found tasks with the key of this worker. - $this->query('UPDATE ' . $this->tablePrefix . $this->table . ' SET workerkey = "' . $key . '", fetched = "' . date('Y-m-d H:i:s') . '" WHERE ' . implode(' OR ', $whereClause) . ' ORDER BY ' . $this->virtualFields['age'] . ' ASC, id ASC LIMIT 1'); + $this->query('UPDATE ' . $this->tablePrefix . $this->table . ' SET workerkey = "' . $key . '", fetched = "' . date('Y-m-d H:i:s') . '" WHERE ' . implode(' OR ', $whereClause) . ' ORDER BY priority ASC, ' . $this->virtualFields['age'] . ' ASC, id ASC LIMIT 1'); // Read which one actually got updated, which is the job we are supposed to execute. $data = $this->find('first', [ diff --git a/README.md b/README.md index 3c551b49..0d6254fb 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,10 @@ # CakePHP Queue Plugin -[![Build Status](https://api.travis-ci.org/dereuromark/cakephp-queue.png)](https://travis-ci.org/dereuromark/cakephp-queue) +[![Build Status](https://api.travis-ci.org/dereuromark/cakephp-queue.svg?branch=2.x)](https://travis-ci.org/dereuromark/cakephp-queue) [![Minimum PHP Version](http://img.shields.io/badge/php-%3E%3D%205.4-8892BF.svg)](https://php.net/) -[![License](https://poser.pugx.org/dereuromark/cakephp-queue/license.png)](https://packagist.org/packages/dereuromark/cakephp-queue) +[![License](https://poser.pugx.org/dereuromark/cakephp-queue/license)](https://packagist.org/packages/dereuromark/cakephp-queue) [![Total Downloads](https://poser.pugx.org/dereuromark/cakephp-queue/d/total.png)](https://packagist.org/packages/dereuromark/cakephp-queue) -Modified by Mark Scherer ([dereuromark](https://github.com/dereuromark)) -- CakePHP2.x support -- Some minor fixes -- Added crontasks (as a different approach on specific problems) -- Possible (optional) Tools Plugin dependencies for frontend access via /admin/queue -- Config key "queue" is now "Queue" ($config['Queue'][...]) - -Added by Christian Charukiewicz ([charukiewicz](https://github.com/charukiewicz)): -- Configuration option 'gcprop' is now 'gcprob' -- Fixed typo in README and variable name (Propability -> Probability) -- Added a few lines about createJob() usage to README -- Added comments to queue.php explaining configuration options - +This branch is for use with **CakePHP 2**. ## Background: @@ -164,3 +152,19 @@ Play around with it, but just don't shoot over the top. * Add priority * Cleanup and better test coverage + +### History + +Modified by Mark Scherer ([dereuromark](https://github.com/dereuromark)) +- CakePHP2.x support +- Some minor fixes +- Added crontasks (as a different approach on specific problems) +- Possible (optional) Tools Plugin dependencies for frontend access via /admin/queue +- Config key "queue" is now "Queue" ($config['Queue'][...]) + +Added by Christian Charukiewicz ([charukiewicz](https://github.com/charukiewicz)): +- Configuration option 'gcprop' is now 'gcprob' +- Fixed typo in README and variable name (Propability -> Probability) +- Added a few lines about createJob() usage to README +- Added comments to queue.php explaining configuration options + diff --git a/Test/Case/Controller/CronTasksControllerTest.php b/Test/Case/Controller/CronTasksControllerTest.php deleted file mode 100644 index 9eb129e6..00000000 --- a/Test/Case/Controller/CronTasksControllerTest.php +++ /dev/null @@ -1,31 +0,0 @@ -CronTasks = new TestCronTasksController(new CakeRequest, new CakeResponse); - $this->CronTasks->constructClasses(); - } - - public function testObject() { - $this->assertInstanceOf('CronTasksController', $this->CronTasks); - } - -} - -class TestCronTasksController extends CronTasksController { - - public $autoRender = false; - - public function redirect($url, $status = null, $exit = true) { - $this->redirectUrl = $url; - } - -} diff --git a/View/CronTasks/admin_index.ctp b/View/CronTasks/admin_index.ctp index a8171f76..9b9e9aaa 100644 --- a/View/CronTasks/admin_index.ctp +++ b/View/CronTasks/admin_index.ctp @@ -42,7 +42,7 @@ foreach ($cronTasks as $cronTask):
date(FORMAT_DB_DATETIME)) { - echo $this->Format->cIcon(ICON_WARNING, 'Achtung'); + echo $this->Format->cIcon(ICON_WARNING, ['title' => __('Warning')]); } ?> @@ -56,7 +56,7 @@ foreach ($cronTasks as $cronTask):
-element('pagination', [], ['plugin'=>'tools']); ?>
+element('Tools.pagination'); ?> @@ -66,4 +66,4 @@ foreach ($cronTasks as $cronTask): - \ No newline at end of file +