diff --git a/packages/durabletask-js/src/testing/in-memory-backend.ts b/packages/durabletask-js/src/testing/in-memory-backend.ts index 66cb406..47d07a2 100644 --- a/packages/durabletask-js/src/testing/in-memory-backend.ts +++ b/packages/durabletask-js/src/testing/in-memory-backend.ts @@ -172,10 +172,18 @@ export class InMemoryOrchestrationBackend { throw new Error(`Orchestration instance '${instanceId}' not found`); } + if (this.isTerminalStatus(instance.status)) { + return; // Cannot suspend a completed/failed/terminated instance + } + if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED) { return; } + // Update status immediately to match real sidecar behavior, where the + // suspend RPC transitions the orchestration to SUSPENDED right away. + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED; + const event = pbh.newSuspendEvent(); instance.pendingEvents.push(event); instance.lastUpdatedAt = new Date(); @@ -183,6 +191,8 @@ export class InMemoryOrchestrationBackend { if (!this.orchestrationQueueSet.has(instanceId)) { this.enqueueOrchestration(instanceId); } + + this.notifyWaiters(instanceId); } /** @@ -194,6 +204,18 @@ export class InMemoryOrchestrationBackend { throw new Error(`Orchestration instance '${instanceId}' not found`); } + // No-op for terminal or non-suspended instances + if (this.isTerminalStatus(instance.status)) { + return; + } + + if (instance.status !== pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED) { + return; + } + + // Transition from SUSPENDED back to RUNNING to match real sidecar behavior. + instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING; + const event = pbh.newResumeEvent(); instance.pendingEvents.push(event); instance.lastUpdatedAt = new Date(); @@ -201,6 +223,8 @@ export class InMemoryOrchestrationBackend { if (!this.orchestrationQueueSet.has(instanceId)) { this.enqueueOrchestration(instanceId); } + + this.notifyWaiters(instanceId); } /** diff --git a/packages/durabletask-js/test/in-memory-backend.spec.ts b/packages/durabletask-js/test/in-memory-backend.spec.ts index 81a82a8..936fece 100644 --- a/packages/durabletask-js/test/in-memory-backend.spec.ts +++ b/packages/durabletask-js/test/in-memory-backend.spec.ts @@ -530,4 +530,185 @@ describe("In-Memory Backend", () => { await expect(waitPromise).rejects.toThrow("Backend was reset"); }); + + describe("suspend and resume status", () => { + it("should update status to SUSPENDED when suspend is called", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + await client.suspendOrchestration(id); + + const state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED); + }); + + it("should update status to RUNNING when resume is called after suspend", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + await client.suspendOrchestration(id); + let state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED); + + await client.resumeOrchestration(id); + state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.RUNNING); + }); + + it("should complete successfully after suspend and resume", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + const val: number = yield ctx.waitForExternalEvent("proceed"); + return val * 2; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + // Suspend the orchestration + await client.suspendOrchestration(id); + let state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED); + + // Send an event while suspended (will be buffered) + await client.raiseOrchestrationEvent(id, "proceed", 21); + + // Resume the orchestration + await client.resumeOrchestration(id); + + // Wait for completion — the buffered event should be processed + state = await client.waitForOrchestrationCompletion(id, true, 10); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + expect(state?.serializedOutput).toEqual(JSON.stringify(42)); + }); + + it("should be idempotent when suspend is called twice", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + // Call suspend twice — should not throw + await client.suspendOrchestration(id); + await client.suspendOrchestration(id); + + const state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED); + }); + + it("should notify state waiters on resume", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + await client.suspendOrchestration(id); + + // Set up a waiter for RUNNING status, then resume + const runningPromise = backend.waitForState( + id, + (inst) => backend.toClientStatus(inst.status) === OrchestrationStatus.RUNNING, + 5000, + ); + + await client.resumeOrchestration(id); + + const runningInstance = await runningPromise; + expect(runningInstance).toBeDefined(); + expect(backend.toClientStatus(runningInstance!.status)).toEqual(OrchestrationStatus.RUNNING); + }); + + it("should be a no-op when suspending a completed instance", async () => { + // eslint-disable-next-line require-yield + const orchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext): any { + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + const state = await client.waitForOrchestrationCompletion(id, true, 10); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + + // Suspend on a completed instance should be a no-op + await client.suspendOrchestration(id); + const afterSuspend = await client.getOrchestrationState(id); + expect(afterSuspend?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED); + }); + + it("should be a no-op when resuming a non-suspended instance", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + // Resume on a RUNNING (non-suspended) instance should be a no-op + await client.resumeOrchestration(id); + const state = await client.getOrchestrationState(id); + expect(state?.runtimeStatus).toEqual(OrchestrationStatus.RUNNING); + }); + + it("should notify state waiters on suspend", async () => { + const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any { + yield ctx.waitForExternalEvent("proceed"); + return "done"; + }; + + worker.addOrchestrator(orchestrator); + await worker.start(); + + const id = await client.scheduleNewOrchestration(orchestrator); + await client.waitForOrchestrationStart(id, false, 10); + + // Set up a waiter for SUSPENDED status, then suspend + const suspendedPromise = backend.waitForState( + id, + (inst) => backend.toClientStatus(inst.status) === OrchestrationStatus.SUSPENDED, + 5000, + ); + + await client.suspendOrchestration(id); + + const suspendedInstance = await suspendedPromise; + expect(suspendedInstance).toBeDefined(); + expect(backend.toClientStatus(suspendedInstance!.status)).toEqual(OrchestrationStatus.SUSPENDED); + }); + }); });