From 17376a1a47070f2fddd2bc955d8b07d316d40e31 Mon Sep 17 00:00:00 2001 From: Vibhav Singamshetty Date: Wed, 15 Apr 2026 14:00:02 -0700 Subject: [PATCH] fix(asyncio): Avoid InvalidStateError on late callbacks after cancel --- pulsar/asyncio.py | 2 ++ tests/asyncio_test.py | 21 +++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 7fa7c3d7..2db4935f 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -828,6 +828,8 @@ async def close(self) -> None: def _set_future(future: asyncio.Future, result: _pulsar.Result, value: Any): def complete(): + if future.done(): + return if result == _pulsar.Result.Ok: future.set_result(value) else: diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 8a441c44..3cc1078c 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -39,6 +39,7 @@ Consumer, Producer, PulsarException, + _set_future, ) from pulsar.schema import ( # pylint: disable=import-error AvroSchema, @@ -484,5 +485,25 @@ class ExampleRecord(Record): # pylint: disable=too-few-public-methods self.assertEqual(msg.value().int_field, 42) +class AsyncioSetFutureTest(IsolatedAsyncioTestCase): + """Tests for asyncio bridge helpers (no live Pulsar broker).""" + + async def test_set_future_noop_when_future_cancelled(self): + loop = asyncio.get_running_loop() + fut = loop.create_future() + fut.cancel() + _set_future(fut, _pulsar.Result.Ok, None) + await asyncio.sleep(0) + self.assertTrue(fut.cancelled()) + + async def test_set_future_noop_when_future_already_resolved(self): + loop = asyncio.get_running_loop() + fut = loop.create_future() + fut.set_result("first") + _set_future(fut, _pulsar.Result.Ok, "late") + await asyncio.sleep(0) + self.assertEqual(fut.result(), "first") + + if __name__ == '__main__': main()