Fix thread safety issues in Communicator, Session, and AsyncPromiseFulfillerDecorator#433
Fix thread safety issues in Communicator, Session, and AsyncPromiseFulfillerDecorator#433neinhart wants to merge 1 commit intoChargeTimeEU:masterfrom
Conversation
…lfillerDecorator - Replace non-thread-safe ArrayDeque with ConcurrentLinkedDeque in Communicator transaction queue to fix race condition where RetryRunner thread and sendCall() concurrently access the queue without synchronization - Replace boolean failedFlag with AtomicBoolean for cross-thread visibility between RetryRunner and EventHandler - Replace unbounded CachedThreadPool with bounded ThreadPoolExecutor in AsyncPromiseFulfillerDecorator (core=CPUs, max=CPUs*2, queue=1000, CallerRunsPolicy) to prevent thread explosion when many chargers send requests simultaneously - Remove unnecessary synchronized from Session.onCall() — all internal state is already thread-safe (ConcurrentHashMap, stateless Gson, immutable FeatureRepository). The lock serialized all incoming CALL messages per session despite OCPP supporting concurrent requests via unique message IDs - Add automatic cleanup of pendingPromises via whenComplete callback to prevent unbounded map growth
|
Hi @neinhart, I haven't fully looked into all issues, but this one: | Additionally, pendingPromises entries are never removed after completion, causing unbounded map growth. does not seem correct to me. See Session#completePendingPromise() which removes pendingPromises when completing. |
|
Hi @robert-s-ubi, thanks for the careful review and the correction — you're right that my wording in the PR description was inaccurate. Let me trace through the code more precisely: Normal success path — you're correct:
So in the successful path, entries are removed correctly as you pointed out. Exception path — where I think there is still a leak: In } catch (Exception ex) {
logger.warn(\"fulfillPromise() failed\", ex);
if (promise != null) {
promise.completeExceptionally(ex); // <-- bypasses completePendingPromise
}
}In this case, The That said, I agree the PR description overstates the issue — it's an edge case in the exception path, not a general leak. Would you prefer:
Happy to go either way. Thanks again for taking the time to review! |
@neinhart I'd prefer option 2, and put the promise leak in a separate PR. Something similar was already fixed with the promises for outgoing requests in this commit: 32903d4 - and that change replaced the existing promise removals with a single whenComplete() removal. I'd like to solve this issue the same way, if possible. |
robert-s-ubi
left a comment
There was a problem hiding this comment.
I haven't looked at the tests yet. I suppose not all will apply when the completion leak fix is removed?
| "An error occurred. Sending this information: uniqueId {}: action: {}, errorCode: {}," | ||
| + " errorDescription: {}", |
There was a problem hiding this comment.
Please revert this unnecessary formatting change.
| "Payload for Action is syntactically correct but at least one of the fields violates" | ||
| + " occurrence constraints"; |
There was a problem hiding this comment.
Please revert this unnecessary formatting change.
| "An internal error occurred and the receiver was not able to process the requested Action" | ||
| + " successfully"; |
There was a problem hiding this comment.
Please revert this unnecessary formatting change.
| if (request.validate()) { | ||
| CompletableFuture<Confirmation> promise = new CompletableFuture<>(); | ||
| promise.whenComplete(new ConfirmationHandler(id, action, communicator)); | ||
| promise.whenComplete((result, error) -> pendingPromises.remove(id)); |
There was a problem hiding this comment.
Please revert and make this change in a different PR.
| int coreSize = Runtime.getRuntime().availableProcessors(); | ||
| int maxSize = coreSize * 2; |
There was a problem hiding this comment.
I'm not sure about the effects of this. Note that this also affects the request handling on the charging station side, and I've seen CSMS "bomb" charging stations with parallel ChangeConfigurationRequests for all read-write configuration settings. Would this still work the same as before?
Also, I gather that once the threads are exhausted (which may be only 2 on a single-core charging station), the executor switches to executing the request handler within the onCall() caller's thread. But if then an exception is thrown in the request handler, it would not result in the same response as when it was thrown in a separate thread, would it?
Problem
Several thread safety issues exist in
ocpp-commonthat can cause data corruptionor resource exhaustion under concurrent load:
Communicator:
transactionQueueuses non-thread-safeArrayDeque, but isaccessed concurrently by
sendCall()and theRetryRunnerthread withoutsynchronization.
failedFlagis a plainbooleanwith no cross-threadvisibility guarantee.
AsyncPromiseFulfillerDecorator: Default executor is
Executors.newCachedThreadPool()which creates threads without bound. When many chargers send requests simultaneously,
this can exhaust memory with
OutOfMemoryError: unable to create new native thread.Session.onCall(): The
synchronizedkeyword serializes all incoming CALL messagesper session, but all internal state is already thread-safe (
ConcurrentHashMap,stateless Gson, immutable
FeatureRepository). Additionally,pendingPromisesentries are never removed after completion, causing unbounded map growth.
Changes
ArrayDequewithConcurrentLinkedDequeandbooleanwithAtomicBooleanin
CommunicatorCachedThreadPoolwith boundedThreadPoolExecutorinAsyncPromiseFulfillerDecorator(core=CPUs, max=CPUs×2, queue=1000,CallerRunsPolicyfor natural backpressure). ExistingsetExecutor()APIis preserved for custom configuration.
synchronizedfromSession.onCall()whenCompletecallback to auto-removependingPromisesentriesTests
Added 3 new test classes (10 tests total):
AsyncPromiseFulfillerDecoratorTest— async delegation, non-blocking, executor resilienceCommunicatorConcurrencyTest— concurrent sendCall, concurrent sendCall + reconnectSessionConcurrencyTest— concurrent onCall, pendingPromises cleanup, concurrent completion