Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions packages/durabletask-js/src/testing/in-memory-backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,27 @@ export class InMemoryOrchestrationBackend {
throw new Error(`Orchestration instance '${instanceId}' not found`);
}

if (this.isTerminalStatus(instance.status)) {
return; // Cannot suspend a completed/failed/terminated instance
}

if (instance.status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED) {
return;
}

// Update status immediately to match real sidecar behavior, where the
// suspend RPC transitions the orchestration to SUSPENDED right away.
instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED;

const event = pbh.newSuspendEvent();
instance.pendingEvents.push(event);
instance.lastUpdatedAt = new Date();

if (!this.orchestrationQueueSet.has(instanceId)) {
this.enqueueOrchestration(instanceId);
}

this.notifyWaiters(instanceId);
}

/**
Expand All @@ -194,13 +204,27 @@ export class InMemoryOrchestrationBackend {
throw new Error(`Orchestration instance '${instanceId}' not found`);
}

// No-op for terminal or non-suspended instances
if (this.isTerminalStatus(instance.status)) {
return;
}

if (instance.status !== pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED) {
return;
}

// Transition from SUSPENDED back to RUNNING to match real sidecar behavior.
instance.status = pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING;

const event = pbh.newResumeEvent();
instance.pendingEvents.push(event);
instance.lastUpdatedAt = new Date();

if (!this.orchestrationQueueSet.has(instanceId)) {
this.enqueueOrchestration(instanceId);
}

this.notifyWaiters(instanceId);
}

/**
Expand Down
181 changes: 181 additions & 0 deletions packages/durabletask-js/test/in-memory-backend.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -530,4 +530,185 @@ describe("In-Memory Backend", () => {

await expect(waitPromise).rejects.toThrow("Backend was reset");
});

describe("suspend and resume status", () => {
it("should update status to SUSPENDED when suspend is called", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield ctx.waitForExternalEvent("proceed");
return "done";
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
await client.waitForOrchestrationStart(id, false, 10);

await client.suspendOrchestration(id);

const state = await client.getOrchestrationState(id);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED);
});

it("should update status to RUNNING when resume is called after suspend", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield ctx.waitForExternalEvent("proceed");
return "done";
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
await client.waitForOrchestrationStart(id, false, 10);

await client.suspendOrchestration(id);
let state = await client.getOrchestrationState(id);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED);

await client.resumeOrchestration(id);
state = await client.getOrchestrationState(id);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.RUNNING);
});
Comment on lines +553 to +572

it("should complete successfully after suspend and resume", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
const val: number = yield ctx.waitForExternalEvent("proceed");
return val * 2;
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
await client.waitForOrchestrationStart(id, false, 10);

// Suspend the orchestration
await client.suspendOrchestration(id);
let state = await client.getOrchestrationState(id);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED);

// Send an event while suspended (will be buffered)
await client.raiseOrchestrationEvent(id, "proceed", 21);

// Resume the orchestration
await client.resumeOrchestration(id);

// Wait for completion — the buffered event should be processed
state = await client.waitForOrchestrationCompletion(id, true, 10);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED);
expect(state?.serializedOutput).toEqual(JSON.stringify(42));
});

it("should be idempotent when suspend is called twice", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield ctx.waitForExternalEvent("proceed");
return "done";
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
await client.waitForOrchestrationStart(id, false, 10);

// Call suspend twice — should not throw
await client.suspendOrchestration(id);
await client.suspendOrchestration(id);

const state = await client.getOrchestrationState(id);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.SUSPENDED);
});

it("should notify state waiters on resume", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield ctx.waitForExternalEvent("proceed");
return "done";
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
await client.waitForOrchestrationStart(id, false, 10);

await client.suspendOrchestration(id);

// Set up a waiter for RUNNING status, then resume
const runningPromise = backend.waitForState(
id,
(inst) => backend.toClientStatus(inst.status) === OrchestrationStatus.RUNNING,
5000,
);

await client.resumeOrchestration(id);

const runningInstance = await runningPromise;
expect(runningInstance).toBeDefined();
expect(backend.toClientStatus(runningInstance!.status)).toEqual(OrchestrationStatus.RUNNING);
});

it("should be a no-op when suspending a completed instance", async () => {
// eslint-disable-next-line require-yield
const orchestrator: TOrchestrator = async function* (_ctx: OrchestrationContext): any {
return "done";
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
const state = await client.waitForOrchestrationCompletion(id, true, 10);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED);

// Suspend on a completed instance should be a no-op
await client.suspendOrchestration(id);
const afterSuspend = await client.getOrchestrationState(id);
expect(afterSuspend?.runtimeStatus).toEqual(OrchestrationStatus.COMPLETED);
});

it("should be a no-op when resuming a non-suspended instance", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield ctx.waitForExternalEvent("proceed");
return "done";
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
await client.waitForOrchestrationStart(id, false, 10);

// Resume on a RUNNING (non-suspended) instance should be a no-op
await client.resumeOrchestration(id);
const state = await client.getOrchestrationState(id);
expect(state?.runtimeStatus).toEqual(OrchestrationStatus.RUNNING);
});

it("should notify state waiters on suspend", async () => {
const orchestrator: TOrchestrator = async function* (ctx: OrchestrationContext): any {
yield ctx.waitForExternalEvent("proceed");
return "done";
};

worker.addOrchestrator(orchestrator);
await worker.start();

const id = await client.scheduleNewOrchestration(orchestrator);
await client.waitForOrchestrationStart(id, false, 10);

// Set up a waiter for SUSPENDED status, then suspend
const suspendedPromise = backend.waitForState(
id,
(inst) => backend.toClientStatus(inst.status) === OrchestrationStatus.SUSPENDED,
5000,
);

await client.suspendOrchestration(id);

const suspendedInstance = await suspendedPromise;
expect(suspendedInstance).toBeDefined();
expect(backend.toClientStatus(suspendedInstance!.status)).toEqual(OrchestrationStatus.SUSPENDED);
});
});
});
Loading