From 68c69df42f1c83939c847c259676f017b1403077 Mon Sep 17 00:00:00 2001 From: Aaron Chung Date: Mon, 9 Mar 2026 18:58:55 -0700 Subject: [PATCH 1/4] Async event bus publishing in AsyncJobManagerImpl to reduce API thread contention publishOnEventBus() was calling _messageBus.publish() synchronously on the request thread, which blocks on MessageBusBase$Gate (an exclusive mutex). JFR analysis showed this causing up to 107ms waits on Jetty request threads, contributing to 502 errors from the upstream load balancer. Move event bus publishing to a dedicated single-threaded executor so API request threads are no longer blocked by Gate contention. Event ordering is preserved by the single-threaded executor. --- .../framework/jobs/impl/AsyncJobManagerImpl.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 7672b9dc6f97..faab23a50118 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -184,6 +184,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private volatile long _executionRunNumber = 1; private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat")); + private final ExecutorService _eventBusPublisher = Executors.newSingleThreadExecutor(new NamedThreadFactory("AsyncJobMgr-EventBus")); private ExecutorService _apiJobExecutor; private ExecutorService _workerJobExecutor; @@ -1378,6 +1379,7 @@ public boolean start() { @Override public boolean stop() { _heartbeatScheduler.shutdown(); + _eventBusPublisher.shutdown(); _apiJobExecutor.shutdown(); _workerJobExecutor.shutdown(); 
return true; @@ -1397,8 +1399,13 @@ protected AsyncJobManagerImpl() { } private void publishOnEventBus(AsyncJob job, String jobEvent) { - _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, - new Pair(job, jobEvent)); + try { + _eventBusPublisher.submit(() -> + _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, + new Pair(job, jobEvent))); + } catch (RejectedExecutionException e) { + s_logger.warn("Failed to publish async job event, event bus publisher is shut down", e); + } } @Override From 26e5d3c726df033339f7c3a8d42fd0b63da37761 Mon Sep 17 00:00:00 2001 From: Aaron Chung Date: Mon, 9 Mar 2026 19:21:31 -0700 Subject: [PATCH 2/4] Wrap async event bus publish in ManagedContextRunnable for DB connection safety The sole subscriber (ApiServer.handleAsyncJobPublishEvent) performs DAO reads (getUserIncludingRemoved, getAccount, findById) inside its callback. Without ManagedContextRunnable, the EventBus thread would not have proper TransactionLegacy lifecycle management, risking DB connection leaks. 
--- .../framework/jobs/impl/AsyncJobManagerImpl.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index faab23a50118..dd351a3830b4 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -1400,9 +1400,13 @@ protected AsyncJobManagerImpl() { private void publishOnEventBus(AsyncJob job, String jobEvent) { try { - _eventBusPublisher.submit(() -> - _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, - new Pair(job, jobEvent))); + _eventBusPublisher.submit(new ManagedContextRunnable() { + @Override + protected void runInContext() { + _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, + new Pair(job, jobEvent)); + } + }); } catch (RejectedExecutionException e) { s_logger.warn("Failed to publish async job event, event bus publisher is shut down", e); } From 8f996acdeb467dce0abf59bdc9fa823bd261f445 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Anaparti Date: Tue, 7 Apr 2026 16:09:45 +0530 Subject: [PATCH 3/4] Fix logger reference: use logger instead of s_logger --- .../cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index dd351a3830b4..312bd805db91 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -1408,7 +1408,7 @@ protected void runInContext() { } }); } 
catch (RejectedExecutionException e) { - s_logger.warn("Failed to publish async job event, event bus publisher is shut down", e); + logger.warn("Failed to publish async job event, event bus publisher is shut down", e); } } From 97b15fc2af799902eb3697792f28583e77f55a91 Mon Sep 17 00:00:00 2001 From: Suresh Kumar Anaparti Date: Tue, 7 Apr 2026 17:16:01 +0530 Subject: [PATCH 4/4] Address review comments: log publish failures, use Pair<> diamond --- .../framework/jobs/impl/AsyncJobManagerImpl.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 312bd805db91..05cd763d2f2a 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -1403,8 +1403,13 @@ private void publishOnEventBus(AsyncJob job, String jobEvent) { _eventBusPublisher.submit(new ManagedContextRunnable() { @Override protected void runInContext() { - _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, - new Pair(job, jobEvent)); + try { + _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, + new Pair<>(job, jobEvent)); + } catch (Throwable t) { + logger.warn("Failed to publish async job event on message bus. jobId={}, jobEvent={}", + job != null ? job.getId() : null, jobEvent, t); + } } }); } catch (RejectedExecutionException e) {