From 6d4ad9270caa71c58dd8bfa2f12b3b571bbbae04 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Fri, 17 Apr 2026 13:23:50 -0700 Subject: [PATCH] initial commit --- azurefunctions/build.gradle | 2 +- azuremanaged/build.gradle | 2 +- client/build.gradle | 2 +- .../durabletask/EntityInstanceId.java | 37 +++ .../microsoft/durabletask/EntityMetadata.java | 7 + .../TaskOrchestrationExecutor.java | 24 ++ .../durabletask/TypedEntityMetadata.java | 5 + .../durabletask/EntityInstanceIdTest.java | 58 +++++ .../TaskOrchestrationEntityEventTest.java | 67 ++++++ .../durabletask/TypedEntityMetadataTest.java | 61 +++++ .../PROTO_SOURCE_COMMIT_HASH | 2 +- .../protos/orchestrator_service.proto | 13 +- .../functions/entities/AggregatorEntity.java | 53 +++++ .../functions/entities/BankAccountEntity.java | 62 +++++ .../entities/BankAccountFunctions.java | 214 ++++++++++++++++++ .../functions/entities/CounterFunctions.java | 2 +- .../functions/entities/LifetimeFunctions.java | 2 +- .../com/functions/entities/SensorEntity.java | 43 ++++ .../functions/entities/SensorFunctions.java | 157 +++++++++++++ .../com/functions/entities/SensorState.java | 27 +++ .../com/functions/entities/bankaccounts.http | 29 +++ .../java/com/functions/entities/sensors.http | 32 +++ samples/build.gradle | 56 +++++ .../samples/BankAccountSample.java | 9 +- .../samples/CounterEntitySample.java | 4 +- .../samples/EntityCommunicationSample.java | 4 +- .../samples/EntityQuerySample.java | 4 +- .../samples/EntityReentrantSample.java | 4 +- .../samples/EntityTimeoutSample.java | 4 +- .../samples/LowLevelEntitySample.java | 4 +- .../io/durabletask/samples/SampleUtils.java | 56 +++++ .../samples/TypedEntityProxySample.java | 4 +- 32 files changed, 1026 insertions(+), 24 deletions(-) create mode 100644 samples-azure-functions/src/main/java/com/functions/entities/AggregatorEntity.java create mode 100644 samples-azure-functions/src/main/java/com/functions/entities/BankAccountEntity.java create mode 100644 samples-azure-functions/src/main/java/com/functions/entities/BankAccountFunctions.java create mode 100644 samples-azure-functions/src/main/java/com/functions/entities/SensorEntity.java create mode 100644 samples-azure-functions/src/main/java/com/functions/entities/SensorFunctions.java create mode 100644 samples-azure-functions/src/main/java/com/functions/entities/SensorState.java create mode 100644 samples-azure-functions/src/main/java/com/functions/entities/bankaccounts.http create mode 100644 samples-azure-functions/src/main/java/com/functions/entities/sensors.http create mode 100644 samples/src/main/java/io/durabletask/samples/SampleUtils.java diff --git a/azurefunctions/build.gradle b/azurefunctions/build.gradle index 3f6014ad..4d991038 100644 --- a/azurefunctions/build.gradle +++ b/azurefunctions/build.gradle @@ -6,7 +6,7 @@ plugins { } group 'com.microsoft' -version = '1.8.0' +version = '1.9.0' archivesBaseName = 'durabletask-azure-functions' def protocVersion = '3.25.8' diff --git a/azuremanaged/build.gradle b/azuremanaged/build.gradle index 1db00530..25842223 100644 --- a/azuremanaged/build.gradle +++ b/azuremanaged/build.gradle @@ -17,7 +17,7 @@ plugins { archivesBaseName = 'durabletask-azuremanaged' group 'com.microsoft' -version = '1.8.0' +version = '1.9.0' def grpcVersion = '1.78.0' def azureCoreVersion = '1.57.1' diff --git a/client/build.gradle b/client/build.gradle index a40d1039..d2e348d8 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -10,7 +10,7 @@ plugins { } group 'com.microsoft' -version = '1.8.0' +version = '1.9.0' archivesBaseName = 'durabletask-client' def grpcVersion = '1.78.0' diff --git a/client/src/main/java/com/microsoft/durabletask/EntityInstanceId.java b/client/src/main/java/com/microsoft/durabletask/EntityInstanceId.java index ad8153d8..66d1d215 100644 --- a/client/src/main/java/com/microsoft/durabletask/EntityInstanceId.java +++ b/client/src/main/java/com/microsoft/durabletask/EntityInstanceId.java @@ -2,7 +2,17 @@ // Licensed under the MIT License. package com.microsoft.durabletask; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + import javax.annotation.Nonnull; +import java.io.IOException; import java.util.Locale; import java.util.Objects; @@ -11,7 +21,12 @@ *

* The name typically corresponds to the entity class/type name, and the key identifies the specific * entity instance (e.g., a user ID or account number). + *

+ * Serializes to and deserializes from a compact string format {@code @{name}@{key}}, + * matching the .NET SDK's {@code EntityInstanceId} JSON representation. */ +@JsonSerialize(using = EntityInstanceId.Serializer.class) +@JsonDeserialize(using = EntityInstanceId.Deserializer.class) public final class EntityInstanceId implements Comparable { private final String name; private final String key; @@ -116,4 +131,26 @@ public int compareTo(@Nonnull EntityInstanceId other) { } return this.key.compareTo(other.key); } + + /** + * Jackson serializer that writes an {@code EntityInstanceId} as a compact {@code "@name@key"} string. + */ + static class Serializer extends JsonSerializer { + @Override + public void serialize(EntityInstanceId value, JsonGenerator gen, SerializerProvider serializers) + throws IOException { + gen.writeString(value.toString()); + } + } + + /** + * Jackson deserializer that reads an {@code EntityInstanceId} from a compact {@code "@name@key"} string. + */ + static class Deserializer extends JsonDeserializer { + @Override + public EntityInstanceId deserialize(JsonParser p, DeserializationContext ctxt) + throws IOException { + return EntityInstanceId.fromString(p.getText()); + } + } } diff --git a/client/src/main/java/com/microsoft/durabletask/EntityMetadata.java b/client/src/main/java/com/microsoft/durabletask/EntityMetadata.java index 822c84b2..f094b0b5 100644 --- a/client/src/main/java/com/microsoft/durabletask/EntityMetadata.java +++ b/client/src/main/java/com/microsoft/durabletask/EntityMetadata.java @@ -2,6 +2,9 @@ // Licensed under the MIT License. package com.microsoft.durabletask; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + import javax.annotation.Nullable; import java.time.Instant; @@ -18,8 +21,10 @@ public class EntityMetadata { private final Instant lastModifiedTime; private final int backlogQueueSize; private final String lockedBy; + @JsonIgnore private final String serializedState; private final boolean includesState; + @JsonIgnore private final DataConverter dataConverter; private volatile EntityInstanceId cachedEntityInstanceId; @@ -56,6 +61,7 @@ public class EntityMetadata { * * @return the instance ID */ + @JsonIgnore public String getInstanceId() { return this.instanceId; } @@ -65,6 +71,7 @@ public String getInstanceId() { * * @return the parsed entity instance ID */ + @JsonProperty("entityId") public EntityInstanceId getEntityInstanceId() { if (this.cachedEntityInstanceId == null) { this.cachedEntityInstanceId = EntityInstanceId.fromString(this.instanceId); diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index ce7ed416..b5206765 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -1066,6 +1066,30 @@ private void handleEventRaised(HistoryEvent e) { rawResult != null ? rawResult : "(null)")); } this.handleEntityResponseFromEventRaised(matchingTaskRecord, rawResult); + } else if (matchingTaskRecord.getDataType() == AutoCloseable.class) { + // In the Azure Functions trigger binding code path, entity lock grants arrive as + // EventRaised events (not EntityLockGranted proto events). The lock task's data type + // is AutoCloseable, which Jackson cannot instantiate because it's an interface. + // The lock handle carries no meaningful state — the actual AutoCloseable is created + // via thenApply in lockEntities() — so we complete with null and set critical section + // state here, mirroring handleEntityLockGranted(). + String criticalSectionId = eventName; + this.isInCriticalSection = true; + this.lockedEntityIds = this.pendingLockSets.remove(criticalSectionId); + if (this.lockedEntityIds == null) { + throw new NonDeterministicOrchestratorException( + "Lock granted via EventRaised for criticalSectionId=" + criticalSectionId + + " but no pending lock set was found. This indicates a non-deterministic orchestration."); + } + + if (!this.isReplaying) { + this.logger.fine(() -> String.format( + "%s: Entity lock granted via EventRaised for criticalSectionId=%s", + this.instanceId, + criticalSectionId)); + } + + task.complete(null); } else { try { Object result = this.dataConverter.deserialize( diff --git a/client/src/main/java/com/microsoft/durabletask/TypedEntityMetadata.java b/client/src/main/java/com/microsoft/durabletask/TypedEntityMetadata.java index e80dbd2e..ff059a9e 100644 --- a/client/src/main/java/com/microsoft/durabletask/TypedEntityMetadata.java +++ b/client/src/main/java/com/microsoft/durabletask/TypedEntityMetadata.java @@ -2,6 +2,9 @@ // Licensed under the MIT License. package com.microsoft.durabletask; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + import javax.annotation.Nullable; /** @@ -60,6 +63,7 @@ public final class TypedEntityMetadata extends EntityMetadata { * @throws IllegalStateException if state was not included in this metadata * (i.e., {@link #isIncludesState()} returns {@code false}) */ + @JsonProperty("state") @Nullable public T getState() { if (!this.isIncludesState()) { @@ -75,6 +79,7 @@ public T getState() { * * @return the state type class */ + @JsonIgnore public Class getStateType() { return this.stateType; } diff --git a/client/src/test/java/com/microsoft/durabletask/EntityInstanceIdTest.java b/client/src/test/java/com/microsoft/durabletask/EntityInstanceIdTest.java index c5083a6e..edda0860 100644 --- a/client/src/test/java/com/microsoft/durabletask/EntityInstanceIdTest.java +++ b/client/src/test/java/com/microsoft/durabletask/EntityInstanceIdTest.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.microsoft.durabletask; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -212,4 +213,61 @@ void compareTo_sortsList() { assertEquals("counter", ids.get(3).getName()); assertEquals("3", ids.get(3).getKey()); } + + // region Jackson serialization tests + + @Test + void jacksonSerialization_serializesToCompactString() throws Exception { + EntityInstanceId id = new EntityInstanceId("Counter", "myKey"); + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(id); + assertEquals("\"@counter@myKey\"", json); + } + + @Test + void jacksonDeserialization_deserializesFromCompactString() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + EntityInstanceId id = mapper.readValue("\"@counter@myKey\"", EntityInstanceId.class); + assertEquals("counter", id.getName()); + assertEquals("myKey", id.getKey()); + } + + @Test + void jacksonRoundTrip_preservesIdentity() throws Exception { + EntityInstanceId original = new EntityInstanceId("BankAccount", "acct-123"); + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(original); + EntityInstanceId deserialized = mapper.readValue(json, EntityInstanceId.class); + assertEquals(original, deserialized); + } + + @Test + void jacksonDeserialization_inPojo_works() throws Exception { + // Simulates the CounterPayload scenario where EntityInstanceId is a field + String json = "{\"entityId\":\"@counter@c1\",\"value\":42}"; + ObjectMapper mapper = new ObjectMapper(); + TestPayload payload = mapper.readValue(json, TestPayload.class); + assertEquals("counter", payload.entityId.getName()); + assertEquals("c1", payload.entityId.getKey()); + assertEquals(42, payload.value); + } + + @Test + void jacksonSerialization_inPojo_works() throws Exception { + TestPayload payload = new TestPayload(); + payload.entityId = new EntityInstanceId("Counter", "c1"); + payload.value = 42; + ObjectMapper mapper = new ObjectMapper(); + String json = mapper.writeValueAsString(payload); + assertTrue(json.contains("\"@counter@c1\"")); + assertTrue(json.contains("\"value\":42")); + } + + /** Test POJO that embeds an EntityInstanceId, mirroring CounterPayload. */ + public static class TestPayload { + public EntityInstanceId entityId; + public int value; + } + + // endregion } diff --git a/client/src/test/java/com/microsoft/durabletask/TaskOrchestrationEntityEventTest.java b/client/src/test/java/com/microsoft/durabletask/TaskOrchestrationEntityEventTest.java index f56c0e8b..b07ac430 100644 --- a/client/src/test/java/com/microsoft/durabletask/TaskOrchestrationEntityEventTest.java +++ b/client/src/test/java/com/microsoft/durabletask/TaskOrchestrationEntityEventTest.java @@ -1331,6 +1331,73 @@ void getLockedEntities_outsideCriticalSection_returnsEmpty() { assertTrue(hasComplete, "Expected orchestration to complete"); } + /** + * Regression test: In the Azure Functions trigger binding code path, entity lock grants + * arrive as EventRaised events (not EntityLockGranted proto events). The lock task's data + * type is AutoCloseable, which Jackson cannot instantiate because it's an interface. + * This test verifies that the orchestration completes successfully when the lock grant + * arrives via EventRaised (simulating the Azure Functions path). + */ + @Test + void lockEntities_lockGrantedViaEventRaised_succeeds() { + final String orchestratorName = "LockGrantedViaEventRaisedTest"; + EntityInstanceId entityId = new EntityInstanceId("Counter", "c1"); + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, ctx -> { + AutoCloseable lock = ctx.lockEntities(Arrays.asList(entityId)).await(); + assertTrue(ctx.isInCriticalSection()); + assertFalse(ctx.getLockedEntities().isEmpty()); + try { + lock.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + ctx.complete("lock-via-event-raised"); + }); + + // First execution: orchestrator calls lockEntities, which produces a lock request action + List pastEvents1 = Arrays.asList( + orchestratorStarted(), + executionStarted(orchestratorName, "null")); + List newEvents1 = Collections.singletonList(orchestratorCompleted()); + + TaskOrchestratorResult result1 = executor.execute(pastEvents1, newEvents1, null); + + // Extract the criticalSectionId from the lock request action + String criticalSectionId = null; + try { + criticalSectionId = extractLockCriticalSectionId(result1.getActions()); + } catch (Exception e) { + fail("Failed to extract criticalSectionId: " + e.getMessage()); + } + assertNotNull(criticalSectionId, "Expected a lock request action with criticalSectionId"); + + // Second execution: replay with lock request in past, lock grant arrives as EventRaised + // (simulating the Azure Functions trigger binding path where DTFx sends lock grants + // as named events rather than proto EntityLockGranted events) + List pastEvents2 = Arrays.asList( + orchestratorStarted(), + executionStarted(orchestratorName, "null"), + eventSentEvent(0), + orchestratorCompleted()); + List newEvents2 = Arrays.asList( + orchestratorStarted(), + eventRaisedEvent(criticalSectionId, "null"), + orchestratorCompleted()); + + TaskOrchestratorResult result2 = executor.execute(pastEvents2, newEvents2, null); + + boolean hasComplete = false; + for (OrchestratorAction action : result2.getActions()) { + if (action.hasCompleteOrchestration()) { + String output = action.getCompleteOrchestration().getResult().getValue(); + assertEquals("\"lock-via-event-raised\"", output); + hasComplete = true; + } + } + assertTrue(hasComplete, "Expected orchestration to complete after lock granted via EventRaised"); + } + @Test void lockEntities_varargs_producesLockAction() { final String orchestratorName = "VarargsLockTest"; diff --git a/client/src/test/java/com/microsoft/durabletask/TypedEntityMetadataTest.java b/client/src/test/java/com/microsoft/durabletask/TypedEntityMetadataTest.java index 4d384f7a..3ed71e0e 100644 --- a/client/src/test/java/com/microsoft/durabletask/TypedEntityMetadataTest.java +++ b/client/src/test/java/com/microsoft/durabletask/TypedEntityMetadataTest.java @@ -2,6 +2,8 @@ // Licensed under the MIT License. package com.microsoft.durabletask; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import java.time.Instant; @@ -108,4 +110,63 @@ void readStateAs_stillWorksOnTypedInstance() { Integer state = typed.readStateAs(Integer.class); assertEquals(42, state); } + + // region Jackson serialization tests + + @Test + void jacksonSerialization_typedEntityMetadata_hidesInternalFields() throws Exception { + Instant now = Instant.parse("2026-01-15T10:30:00Z"); + EntityMetadata base = new EntityMetadata( + "@counter@myKey", now, 3, "orch-lock-123", "42", true, dataConverter); + + TypedEntityMetadata typed = new TypedEntityMetadata<>(base, Integer.class); + + ObjectMapper mapper = new ObjectMapper(); + mapper.findAndRegisterModules(); + String json = mapper.writeValueAsString(typed); + JsonNode root = mapper.readTree(json); + + // Should include public API fields + assertTrue(root.has("entityId"), "Should have entityId field"); + assertTrue(root.has("lastModifiedTime"), "Should have lastModifiedTime field"); + assertTrue(root.has("backlogQueueSize"), "Should have backlogQueueSize field"); + assertTrue(root.has("lockedBy"), "Should have lockedBy field"); + assertTrue(root.has("includesState"), "Should have includesState field"); + assertTrue(root.has("state"), "Should have state field"); + + // Should NOT include internal fields + assertFalse(root.has("serializedState"), "Should not expose serializedState"); + assertFalse(root.has("dataConverter"), "Should not expose dataConverter"); + assertFalse(root.has("stateType"), "Should not expose stateType"); + assertFalse(root.has("instanceId"), "Should not expose raw instanceId (use entityId instead)"); + + // Verify field values + assertEquals("@counter@myKey", root.get("entityId").asText()); + assertEquals(3, root.get("backlogQueueSize").asInt()); + assertEquals("orch-lock-123", root.get("lockedBy").asText()); + assertTrue(root.get("includesState").asBoolean()); + assertEquals(42, root.get("state").asInt()); + } + + @Test + void jacksonSerialization_entityMetadata_hidesInternalFields() throws Exception { + EntityMetadata base = new EntityMetadata( + "@counter@c1", Instant.EPOCH, 0, null, "99", true, dataConverter); + + ObjectMapper mapper = new ObjectMapper(); + mapper.findAndRegisterModules(); + String json = mapper.writeValueAsString(base); + JsonNode root = mapper.readTree(json); + + // Should include public API fields + assertTrue(root.has("entityId"), "Should have entityId field"); + assertTrue(root.has("lastModifiedTime"), "Should have lastModifiedTime field"); + + // Should NOT include internal fields + assertFalse(root.has("serializedState"), "Should not expose serializedState"); + assertFalse(root.has("dataConverter"), "Should not expose dataConverter"); + assertFalse(root.has("instanceId"), "Should not expose raw instanceId"); + } + + // endregion } diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH index 0ef1ed22..dcc8b2f3 100644 --- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH +++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH @@ -1 +1 @@ -1caadbd7ecfdf5f2309acbeac28a3e36d16aa156 \ No newline at end of file +98e138452d57586e3109545b94055448f2f6cc24 \ No newline at end of file diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto index 0c34d986..e7e12524 100644 --- a/internal/durabletask-protobuf/protos/orchestrator_service.proto +++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto @@ -25,6 +25,7 @@ message ActivityRequest { OrchestrationInstance orchestrationInstance = 4; int32 taskId = 5; TraceContext parentTraceContext = 6; + map tags = 7; } message ActivityResponse { @@ -320,6 +321,10 @@ message SendEntityMessageAction { } } +message RewindOrchestrationAction { + repeated HistoryEvent newHistory = 1; +} + message OrchestratorAction { int32 id = 1; oneof orchestratorActionType { @@ -330,6 +335,7 @@ message OrchestratorAction { CompleteOrchestrationAction completeOrchestration = 6; TerminateOrchestrationAction terminateOrchestration = 7; SendEntityMessageAction sendEntityMessage = 8; + RewindOrchestrationAction rewindOrchestration = 9; } } @@ -364,12 +370,14 @@ message OrchestratorResponse { // Whether or not a history is required to complete the original OrchestratorRequest and none was provided. bool requiresHistory = 7; + /* Chunking logic has since been deprecated and fields related to it are marked as such */ + // True if this is a partial (chunked) completion. The backend must keep the work item open until the final chunk (isPartial=false). - bool isPartial = 8; + bool isPartial = 8 [deprecated=true]; // Zero-based position of the current chunk within a chunked completion sequence. // This field is omitted for non-chunked completions. - google.protobuf.Int32Value chunkIndex = 9; + google.protobuf.Int32Value chunkIndex = 9 [deprecated=true];; } message CreateInstanceRequest { @@ -517,6 +525,7 @@ message PurgeInstanceFilter { google.protobuf.Timestamp createdTimeFrom = 1; google.protobuf.Timestamp createdTimeTo = 2; repeated OrchestrationStatus runtimeStatus = 3; + google.protobuf.Duration timeout = 4; } message PurgeInstancesResponse { diff --git a/samples-azure-functions/src/main/java/com/functions/entities/AggregatorEntity.java b/samples-azure-functions/src/main/java/com/functions/entities/AggregatorEntity.java new file mode 100644 index 00000000..119de2be --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/entities/AggregatorEntity.java @@ -0,0 +1,53 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.functions.entities; + +import com.microsoft.durabletask.AbstractTaskEntity; +import com.microsoft.durabletask.TaskEntityContext; +import com.microsoft.durabletask.TaskEntityOperation; + +/** + * Aggregator entity that computes running averages and starts an alert orchestration + * when a reading exceeds a threshold. + *

+ * Demonstrates entity starting an orchestration via + * {@link TaskEntityContext#startNewOrchestration}. + *

+ * This mirrors the standalone {@code EntityCommunicationSample.AggregatorEntity}. + */ +public class AggregatorEntity extends AbstractTaskEntity { + private static final double ALERT_THRESHOLD = 30.0; + + /** + * Adds a reading to the running total. If the reading exceeds the threshold, + * starts a {@code TemperatureAlert} orchestration. + *

+ * The {@link TaskEntityContext} parameter is automatically injected by the dispatch engine. + */ + public void addReading(double reading, TaskEntityContext ctx) { + this.state.sum += reading; + this.state.count++; + + if (reading > ALERT_THRESHOLD) { + // Entity starting an orchestration when a threshold is breached + ctx.startNewOrchestration("TemperatureAlert", reading); + } + } + + /** + * Returns the current aggregator state. + */ + public AggregatorState get() { + return this.state; + } + + @Override + protected AggregatorState initializeState(TaskEntityOperation operation) { + return new AggregatorState(); + } + + @Override + protected Class getStateType() { + return AggregatorState.class; + } +} diff --git a/samples-azure-functions/src/main/java/com/functions/entities/BankAccountEntity.java b/samples-azure-functions/src/main/java/com/functions/entities/BankAccountEntity.java new file mode 100644 index 00000000..f93c1987 --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/entities/BankAccountEntity.java @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.functions.entities; + +import com.microsoft.durabletask.AbstractTaskEntity; +import com.microsoft.durabletask.TaskEntityOperation; + +/** + * A bank account entity that stores a balance and supports deposit, withdraw, and get operations. + *

+ * Used by {@link BankAccountFunctions} to demonstrate entity locking (critical sections) + * for atomic multi-entity operations such as fund transfers. + *

+ * This mirrors the standalone {@code BankAccountSample.BankAccountEntity}. + */ +public class BankAccountEntity extends AbstractTaskEntity { + + /** + * Adds the given amount to the account balance and returns the new balance. + */ + public double deposit(double amount) { + if (amount <= 0) { + throw new IllegalArgumentException("Deposit amount must be positive."); + } + this.state += amount; + return this.state; + } + + /** + * Withdraws the given amount from the account balance and returns the new balance. + * + * @throws IllegalStateException if the account has insufficient funds + */ + public double withdraw(double amount) { + if (amount <= 0) { + throw new IllegalArgumentException("Withdrawal amount must be positive."); + } + if (amount > this.state) { + throw new IllegalStateException( + String.format("Insufficient funds. Balance: %.2f, Requested: %.2f", this.state, amount)); + } + this.state -= amount; + return this.state; + } + + /** + * Returns the current account balance. + */ + public double get() { + return this.state; + } + + @Override + protected Double initializeState(TaskEntityOperation operation) { + return 0.0; + } + + @Override + protected Class getStateType() { + return Double.class; + } +} diff --git a/samples-azure-functions/src/main/java/com/functions/entities/BankAccountFunctions.java b/samples-azure-functions/src/main/java/com/functions/entities/BankAccountFunctions.java new file mode 100644 index 00000000..fd6aac94 --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/entities/BankAccountFunctions.java @@ -0,0 +1,214 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.functions.entities; + +import com.microsoft.azure.functions.*; +import com.microsoft.azure.functions.annotation.*; +import com.microsoft.durabletask.*; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableEntityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; +import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; +import com.microsoft.durabletask.interruption.OrchestratorBlockedException; + +import java.util.Arrays; +import java.util.Optional; + +/** + * Azure Functions for the BankAccount entity sample. + *

+ * Demonstrates: + *

+ *

+ * This mirrors the standalone {@code BankAccountSample} adapted for Azure Functions. + *

+ * See {@code bankaccounts.http} for example HTTP requests. + * + *

APIs: + *

+ */ +public class BankAccountFunctions { + + // ─── Entity trigger function ─── + + @FunctionName("BankAccount") + public String bankAccountEntity( + @DurableEntityTrigger(name = "req") String req) { + return EntityRunner.loadAndRun(req, BankAccountEntity::new); + } + + // ─── Orchestration ─── + + /** + * Orchestration that performs an atomic fund transfer between two bank account entities. + *

+ * Uses {@code ctx.lockEntities()} to acquire exclusive locks on both accounts before + * performing the withdraw and deposit. This ensures no concurrent operations can modify + * either account during the transfer. + */ + @FunctionName("TransferFunds") + public String transferFunds( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + TransferRequest request = ctx.getInput(TransferRequest.class); + EntityInstanceId source = new EntityInstanceId("BankAccount", request.sourceAccount); + EntityInstanceId dest = new EntityInstanceId("BankAccount", request.destAccount); + + // Lock both accounts to ensure atomic transfer (critical section) + try (AutoCloseable lock = ctx.lockEntities(Arrays.asList(source, dest)).await()) { + // Withdraw from source + double sourceBalance = ctx.callEntity( + source, "withdraw", request.amount, Double.class).await(); + + // Deposit to destination + double destBalance = ctx.callEntity( + dest, "deposit", request.amount, Double.class).await(); + + return String.format( + "Transferred %.2f from %s (balance: %.2f) to %s (balance: %.2f)", + request.amount, request.sourceAccount, sourceBalance, + request.destAccount, destBalance); + } catch (OrchestratorBlockedException | ContinueAsNewInterruption e) { + // These are framework control-flow signals and must never be swallowed + throw e; + } catch (Exception e) { + return "Transfer failed: " + e.getMessage(); + } + } + + // ─── HTTP API functions ─── + + /** + * POST /api/bankaccounts/{id}/deposit/{amount} + *

+ * Signals the bank account entity to deposit the given amount. + */ + @FunctionName("BankAccount_Deposit") + public HttpResponseMessage deposit( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, + route = "bankaccounts/{id}/deposit/{amount}", + authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + @BindingName("id") String id, + @BindingName("amount") double amount) { + EntityInstanceId entityId = new EntityInstanceId("BankAccount", id); + DurableTaskClient client = durableContext.getClient(); + client.getEntities().signalEntity(entityId, "deposit", amount); + return request.createResponseBuilder(HttpStatus.ACCEPTED).build(); + } + + /** + * POST /api/bankaccounts/{id}/withdraw/{amount} + *

+ * Signals the bank account entity to withdraw the given amount. + */ + @FunctionName("BankAccount_Withdraw") + public HttpResponseMessage withdraw( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, + route = "bankaccounts/{id}/withdraw/{amount}", + authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + @BindingName("id") String id, + @BindingName("amount") double amount) { + EntityInstanceId entityId = new EntityInstanceId("BankAccount", id); + DurableTaskClient client = durableContext.getClient(); + client.getEntities().signalEntity(entityId, "withdraw", amount); + return request.createResponseBuilder(HttpStatus.ACCEPTED).build(); + } + + /** + * GET /api/bankaccounts/{id} + *

+ * Gets the current balance of the bank account entity. + */ + @FunctionName("BankAccount_Get") + public HttpResponseMessage getBalance( + @HttpTrigger(name = "req", methods = {HttpMethod.GET}, + route = "bankaccounts/{id}", + authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + @BindingName("id") String id) { + EntityInstanceId entityId = new EntityInstanceId("BankAccount", id); + DurableTaskClient client = durableContext.getClient(); + + TypedEntityMetadata entity = client.getEntities().getEntityMetadata(entityId, Double.class); + if (entity == null) { + return request.createResponseBuilder(HttpStatus.NOT_FOUND).build(); + } + + return request.createResponseBuilder(HttpStatus.OK) + .header("Content-Type", "application/json") + .body(new JacksonDataConverter().serialize(entity)) + .build(); + } + + /** + * POST /api/bankaccounts/transfer?from={id}&to={id}&amount={amount} + *

+ * Starts a TransferFunds orchestration that atomically transfers funds between two accounts + * using entity locking. + */ + @FunctionName("BankAccount_Transfer") + public HttpResponseMessage transfer( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, + route = "bankaccounts/transfer", + authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + final ExecutionContext context) { + String from = request.getQueryParameters().get("from"); + String to = request.getQueryParameters().get("to"); + String amountStr = request.getQueryParameters().get("amount"); + + if (from == null || to == null || amountStr == null) { + return request.createResponseBuilder(HttpStatus.BAD_REQUEST) + .body("Query parameters 'from', 'to', and 'amount' are required.") + .build(); + } + + double amount; + try { + amount = Double.parseDouble(amountStr); + } catch (NumberFormatException e) { + return request.createResponseBuilder(HttpStatus.BAD_REQUEST) + .body("Amount must be a valid number.") + .build(); + } + + DurableTaskClient client = durableContext.getClient(); + TransferRequest payload = new TransferRequest(from, to, amount); + String instanceId = client.scheduleNewOrchestrationInstance("TransferFunds", payload); + context.getLogger().info("Started TransferFunds orchestration: " + instanceId); + return durableContext.createCheckStatusResponse(request, instanceId); + } + + // ─── Payload classes ─── + + /** + * Represents a request to transfer funds between two bank accounts. + */ + public static class TransferRequest { + public String sourceAccount; + public String destAccount; + public double amount; + + public TransferRequest() { + } + + public TransferRequest(String sourceAccount, String destAccount, double amount) { + this.sourceAccount = sourceAccount; + this.destAccount = destAccount; + this.amount = amount; + } + } +} diff --git a/samples-azure-functions/src/main/java/com/functions/entities/CounterFunctions.java b/samples-azure-functions/src/main/java/com/functions/entities/CounterFunctions.java index 42b79ae8..83e698c8 100644 --- a/samples-azure-functions/src/main/java/com/functions/entities/CounterFunctions.java +++ b/samples-azure-functions/src/main/java/com/functions/entities/CounterFunctions.java @@ -133,7 +133,7 @@ public HttpResponseMessage counterGet( return request.createResponseBuilder(HttpStatus.OK) .header("Content-Type", "application/json") - .body(entity) + .body(new JacksonDataConverter().serialize(entity)) .build(); } diff --git a/samples-azure-functions/src/main/java/com/functions/entities/LifetimeFunctions.java b/samples-azure-functions/src/main/java/com/functions/entities/LifetimeFunctions.java index 8923a846..64a37ba8 100644 --- a/samples-azure-functions/src/main/java/com/functions/entities/LifetimeFunctions.java +++ b/samples-azure-functions/src/main/java/com/functions/entities/LifetimeFunctions.java @@ -57,7 +57,7 @@ public HttpResponseMessage lifetimeGet( return request.createResponseBuilder(HttpStatus.OK) .header("Content-Type", "application/json") - .body(entity) + .body(new JacksonDataConverter().serialize(entity)) .build(); } diff --git a/samples-azure-functions/src/main/java/com/functions/entities/SensorEntity.java b/samples-azure-functions/src/main/java/com/functions/entities/SensorEntity.java new file mode 100644 index 00000000..ea71459f --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/entities/SensorEntity.java @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.functions.entities; + +import com.microsoft.durabletask.AbstractTaskEntity; +import com.microsoft.durabletask.EntityInstanceId; +import com.microsoft.durabletask.TaskEntityContext; +import com.microsoft.durabletask.TaskEntityOperation; + +/** + * Sensor entity that records temperature readings and forwards them to an aggregator entity. + *

+ * Demonstrates entity-to-entity signaling via {@link TaskEntityContext#signalEntity}. + * Each recorded reading is forwarded to an {@link AggregatorEntity} with the same key. + *

+ * This mirrors the standalone {@code EntityCommunicationSample.SensorEntity}. + */ +public class SensorEntity extends AbstractTaskEntity { + + /** + * Records a temperature reading and forwards it to the aggregator entity. + *

+ * The {@link TaskEntityContext} parameter is automatically injected by the dispatch engine. + */ + public void record(double temperature, TaskEntityContext ctx) { + this.state.lastReading = temperature; + this.state.totalReadings++; + + // Entity-to-entity signaling: forward the reading to the aggregator + EntityInstanceId aggregatorId = new EntityInstanceId("Aggregator", ctx.getId().getKey()); + ctx.signalEntity(aggregatorId, "addReading", temperature); + } + + @Override + protected SensorState initializeState(TaskEntityOperation operation) { + return new SensorState(); + } + + @Override + protected Class getStateType() { + return SensorState.class; + } +} diff --git a/samples-azure-functions/src/main/java/com/functions/entities/SensorFunctions.java b/samples-azure-functions/src/main/java/com/functions/entities/SensorFunctions.java new file mode 100644 index 00000000..7510807f --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/entities/SensorFunctions.java @@ -0,0 +1,157 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.functions.entities; + +import com.microsoft.azure.functions.*; +import com.microsoft.azure.functions.annotation.*; +import com.microsoft.durabletask.*; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableEntityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.util.Optional; + +/** + * Azure Functions for the Sensor/Aggregator entity communication sample. + *

+ * Demonstrates two advanced entity communication patterns: + *

+ *

+ * This mirrors the standalone {@code EntityCommunicationSample} adapted for Azure Functions. + *

+ * See {@code sensors.http} for example HTTP requests. + * + *

APIs: + *

+ */ +public class SensorFunctions { + + // ─── Entity trigger functions ─── + + @FunctionName("Sensor") + public String sensorEntity( + @DurableEntityTrigger(name = "req") String req) { + return EntityRunner.loadAndRun(req, SensorEntity::new); + } + + @FunctionName("Aggregator") + public String aggregatorEntity( + @DurableEntityTrigger(name = "req") String req) { + return EntityRunner.loadAndRun(req, AggregatorEntity::new); + } + + // ─── Alert orchestration & activity ─── + + /** + * Orchestration triggered by the {@link AggregatorEntity} when a temperature reading + * exceeds the threshold. Calls an activity to handle the alert. + */ + @FunctionName("TemperatureAlert") + public String temperatureAlert( + @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) { + double temperature = ctx.getInput(Double.class); + return ctx.callActivity("SendAlert", temperature, String.class).await(); + } + + /** + * Activity that sends a temperature alert. + */ + @FunctionName("SendAlert") + public String sendAlert( + @DurableActivityTrigger(name = "temperature") double temperature, + final ExecutionContext context) { + String message = String.format( + "ALERT: Temperature %.1f°C exceeds threshold!", temperature); + context.getLogger().warning(message); + return message; + } + + // ─── HTTP API functions ─── + + /** + * POST /api/sensors/{id}/record/{temperature} + *

+ * Signals the sensor entity to record a temperature reading. + * The sensor entity will forward the reading to the aggregator entity (entity-to-entity signaling). + */ + @FunctionName("Sensor_Record") + public HttpResponseMessage recordReading( + @HttpTrigger(name = "req", methods = {HttpMethod.POST}, + route = "sensors/{id}/record/{temperature}", + authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + @BindingName("id") String id, + @BindingName("temperature") double temperature) { + EntityInstanceId entityId = new EntityInstanceId("Sensor", id); + DurableTaskClient client = durableContext.getClient(); + client.getEntities().signalEntity(entityId, "record", temperature); + return request.createResponseBuilder(HttpStatus.ACCEPTED).build(); + } + + /** + * GET /api/sensors/{id} + *

+ * Gets the current state of the sensor entity. + */ + @FunctionName("Sensor_Get") + public HttpResponseMessage getSensor( + @HttpTrigger(name = "req", methods = {HttpMethod.GET}, + route = "sensors/{id}", + authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + @BindingName("id") String id) { + EntityInstanceId entityId = new EntityInstanceId("Sensor", id); + DurableTaskClient client = durableContext.getClient(); + + TypedEntityMetadata entity = client.getEntities() + .getEntityMetadata(entityId, SensorState.class); + if (entity == null) { + return request.createResponseBuilder(HttpStatus.NOT_FOUND).build(); + } + + return request.createResponseBuilder(HttpStatus.OK) + .header("Content-Type", "application/json") + .body(entity.getState()) + .build(); + } + + /** + * GET /api/sensors/{id}/aggregator + *

+ * Gets the current state of the aggregator entity associated with the sensor. + */ + @FunctionName("Aggregator_Get") + public HttpResponseMessage getAggregator( + @HttpTrigger(name = "req", methods = {HttpMethod.GET}, + route = "sensors/{id}/aggregator", + authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage> request, + @DurableClientInput(name = "durableContext") DurableClientContext durableContext, + @BindingName("id") String id) { + EntityInstanceId entityId = new EntityInstanceId("Aggregator", id); + DurableTaskClient client = durableContext.getClient(); + + TypedEntityMetadata entity = client.getEntities() + .getEntityMetadata(entityId, AggregatorState.class); + if (entity == null) { + return request.createResponseBuilder(HttpStatus.NOT_FOUND).build(); + } + + return request.createResponseBuilder(HttpStatus.OK) + .header("Content-Type", "application/json") + .body(entity.getState()) + .build(); + } +} diff --git a/samples-azure-functions/src/main/java/com/functions/entities/SensorState.java b/samples-azure-functions/src/main/java/com/functions/entities/SensorState.java new file mode 100644 index 00000000..b718a53a --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/entities/SensorState.java @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.functions.entities; + +import com.microsoft.durabletask.AbstractTaskEntity; +import com.microsoft.durabletask.TaskEntityContext; +import com.microsoft.durabletask.TaskEntityOperation; + +/** + * State for the {@link SensorEntity} — tracks the last recorded temperature and total readings. + */ +class SensorState { + public double lastReading; + public int totalReadings; +} + +/** + * State for the {@link AggregatorEntity} — tracks sum and count for computing averages. + */ +class AggregatorState { + public double sum; + public int count; + + public double getAverage() { + return count > 0 ? sum / count : 0; + } +} diff --git a/samples-azure-functions/src/main/java/com/functions/entities/bankaccounts.http b/samples-azure-functions/src/main/java/com/functions/entities/bankaccounts.http new file mode 100644 index 00000000..8411e5d9 --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/entities/bankaccounts.http @@ -0,0 +1,29 @@ +@host = http://localhost:7071/api + +# Initialize account A with $1000 +POST {{host}}/bankaccounts/account-A/deposit/1000 + +### + +# Initialize account B with $500 +POST {{host}}/bankaccounts/account-B/deposit/500 + +### + +# Get balance of account A +GET {{host}}/bankaccounts/account-A + +### + +# Get balance of account B +GET {{host}}/bankaccounts/account-B + +### + +# Transfer $250 from account A to account B (uses entity locking) +POST {{host}}/bankaccounts/transfer?from=account-A&to=account-B&amount=250 + +### + +# Withdraw $100 from account A +POST {{host}}/bankaccounts/account-A/withdraw/100 diff --git a/samples-azure-functions/src/main/java/com/functions/entities/sensors.http b/samples-azure-functions/src/main/java/com/functions/entities/sensors.http new file mode 100644 index 00000000..ad1e1e45 --- /dev/null +++ b/samples-azure-functions/src/main/java/com/functions/entities/sensors.http @@ -0,0 +1,32 @@ +@host = http://localhost:7071/api + +# Record normal temperature readings for sensor-1 +# The sensor entity forwards each reading to the aggregator entity (entity-to-entity signaling) +POST {{host}}/sensors/sensor-1/record/22.5 + +### + +POST {{host}}/sensors/sensor-1/record/24.0 + +### + +POST {{host}}/sensors/sensor-1/record/26.5 + +### + +# Record a high temperature — the aggregator will start a TemperatureAlert orchestration +POST {{host}}/sensors/sensor-1/record/35.0 + +### + +POST {{host}}/sensors/sensor-1/record/23.0 + +### + +# Get the sensor state (last reading, total readings) +GET {{host}}/sensors/sensor-1 + +### + +# Get the aggregator state (sum, count, average) +GET {{host}}/sensors/sensor-1/aggregator diff --git a/samples/build.gradle b/samples/build.gradle index e6fd9d68..5f58d861 100644 --- a/samples/build.gradle +++ b/samples/build.gradle @@ -27,6 +27,62 @@ task runWorkItemFilterSample(type: JavaExec) { mainClass = 'io.durabletask.samples.WorkItemFilterSample' } +// --- Entity samples (require a Durable Task sidecar / DTS emulator on localhost:4001) --- +// When DTS_ENDPOINT is set (or the default applies), these tasks connect to a Durable Task +// Scheduler such as the DTS emulator. Override by setting ENDPOINT/TASKHUB env vars. +def dtsEnv = [ + ENDPOINT: System.getenv('ENDPOINT') ?: 'http://localhost:4001', + TASKHUB : System.getenv('TASKHUB') ?: 'default', +] + +task runCounterEntitySample(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.CounterEntitySample' + environment dtsEnv +} + +task runBankAccountSample(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.BankAccountSample' + environment dtsEnv +} + +task runEntityCommunicationSample(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.EntityCommunicationSample' + environment dtsEnv +} + +task runEntityQuerySample(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.EntityQuerySample' + environment dtsEnv +} + +task runEntityReentrantSample(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.EntityReentrantSample' + environment dtsEnv +} + +task runEntityTimeoutSample(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.EntityTimeoutSample' + environment dtsEnv +} + +task runLowLevelEntitySample(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.LowLevelEntitySample' + environment dtsEnv +} + +task runTypedEntityProxySample(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.TypedEntityProxySample' + environment dtsEnv +} + task printClasspath { doLast { println sourceSets.main.runtimeClasspath.asPath diff --git a/samples/src/main/java/io/durabletask/samples/BankAccountSample.java b/samples/src/main/java/io/durabletask/samples/BankAccountSample.java index f32f5414..86eae801 100644 --- a/samples/src/main/java/io/durabletask/samples/BankAccountSample.java +++ b/samples/src/main/java/io/durabletask/samples/BankAccountSample.java @@ -3,6 +3,8 @@ package io.durabletask.samples; import com.microsoft.durabletask.*; +import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; +import com.microsoft.durabletask.interruption.OrchestratorBlockedException; import java.io.IOException; import java.time.Duration; @@ -32,7 +34,7 @@ final class BankAccountSample { public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // Build the worker with bank account entity and transfer orchestration - DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + DurableTaskGrpcWorker worker = SampleUtils.newWorkerBuilder() .addEntity("BankAccount", BankAccountEntity::new) .addOrchestration(new TaskOrchestrationFactory() { @Override @@ -78,6 +80,9 @@ public TaskOrchestration create() { request.amount, request.sourceAccount, sourceBalance, request.destAccount, destBalance); ctx.complete(result); + } catch (OrchestratorBlockedException | ContinueAsNewInterruption e) { + // These are framework control-flow signals and must never be swallowed + throw e; } catch (Exception e) { ctx.complete("Transfer failed: " + e.getMessage()); } @@ -89,7 +94,7 @@ public TaskOrchestration create() { worker.start(); System.out.println("Worker started. BankAccount entity and TransferFunds orchestration registered."); - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + DurableTaskClient client = SampleUtils.newClientBuilder().build(); // Step 1: Initialize accounts String setupId = client.scheduleNewOrchestrationInstance("SetupAccounts"); diff --git a/samples/src/main/java/io/durabletask/samples/CounterEntitySample.java b/samples/src/main/java/io/durabletask/samples/CounterEntitySample.java index c6737c14..d0c892c3 100644 --- a/samples/src/main/java/io/durabletask/samples/CounterEntitySample.java +++ b/samples/src/main/java/io/durabletask/samples/CounterEntitySample.java @@ -28,7 +28,7 @@ final class CounterEntitySample { public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // Build the worker with the counter entity registered - DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + DurableTaskGrpcWorker worker = SampleUtils.newWorkerBuilder() .addEntity("Counter", CounterEntity::new) .addOrchestration(new TaskOrchestrationFactory() { @Override @@ -57,7 +57,7 @@ public TaskOrchestration create() { System.out.println("Worker started. Counter entity registered."); // Use the client to schedule an orchestration that interacts with the entity - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + DurableTaskClient client = SampleUtils.newClientBuilder().build(); String instanceId = client.scheduleNewOrchestrationInstance("CounterOrchestration"); System.out.printf("Started orchestration: %s%n", instanceId); diff --git a/samples/src/main/java/io/durabletask/samples/EntityCommunicationSample.java b/samples/src/main/java/io/durabletask/samples/EntityCommunicationSample.java index 4b9d03b9..80093b3d 100644 --- a/samples/src/main/java/io/durabletask/samples/EntityCommunicationSample.java +++ b/samples/src/main/java/io/durabletask/samples/EntityCommunicationSample.java @@ -33,7 +33,7 @@ final class EntityCommunicationSample { public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { - DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + DurableTaskGrpcWorker worker = SampleUtils.newWorkerBuilder() .addEntity("Sensor", SensorEntity::new) .addEntity("Aggregator", AggregatorEntity::new) .addOrchestration(new TaskOrchestrationFactory() { @@ -74,7 +74,7 @@ public TaskOrchestration create() { worker.start(); System.out.println("Worker started. Sensor and Aggregator entities registered."); - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + DurableTaskClient client = SampleUtils.newClientBuilder().build(); // Send readings through an orchestration String instanceId = client.scheduleNewOrchestrationInstance("SendReadings"); diff --git a/samples/src/main/java/io/durabletask/samples/EntityQuerySample.java b/samples/src/main/java/io/durabletask/samples/EntityQuerySample.java index 7456405c..de67b065 100644 --- a/samples/src/main/java/io/durabletask/samples/EntityQuerySample.java +++ b/samples/src/main/java/io/durabletask/samples/EntityQuerySample.java @@ -31,7 +31,7 @@ final class EntityQuerySample { public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // Build the worker with a simple counter entity - DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + DurableTaskGrpcWorker worker = SampleUtils.newWorkerBuilder() .addEntity("Counter", CounterEntity::new) .addOrchestration(new TaskOrchestrationFactory() { @Override @@ -54,7 +54,7 @@ public TaskOrchestration create() { worker.start(); System.out.println("Worker started. Counter entity registered."); - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + DurableTaskClient client = SampleUtils.newClientBuilder().build(); // Step 1: Create several counter entities via an orchestration String instanceId = client.scheduleNewOrchestrationInstance("CreateCounters"); diff --git a/samples/src/main/java/io/durabletask/samples/EntityReentrantSample.java b/samples/src/main/java/io/durabletask/samples/EntityReentrantSample.java index af2b9f90..9b08e5a6 100644 --- a/samples/src/main/java/io/durabletask/samples/EntityReentrantSample.java +++ b/samples/src/main/java/io/durabletask/samples/EntityReentrantSample.java @@ -106,7 +106,7 @@ protected Class getStateType() { } public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { - DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + DurableTaskGrpcWorker worker = SampleUtils.newWorkerBuilder() .addEntity("RewardsAccount", RewardsAccountEntity::new) .addOrchestration(new TaskOrchestrationFactory() { @Override @@ -136,7 +136,7 @@ public TaskOrchestration create() { worker.start(); System.out.println("Worker started. RewardsAccount entity registered."); - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + DurableTaskClient client = SampleUtils.newClientBuilder().build(); String instanceId = client.scheduleNewOrchestrationInstance("RewardsDemo"); System.out.printf("Started orchestration: %s%n", instanceId); diff --git a/samples/src/main/java/io/durabletask/samples/EntityTimeoutSample.java b/samples/src/main/java/io/durabletask/samples/EntityTimeoutSample.java index 040fd9e9..d7cc1187 100644 --- a/samples/src/main/java/io/durabletask/samples/EntityTimeoutSample.java +++ b/samples/src/main/java/io/durabletask/samples/EntityTimeoutSample.java @@ -30,7 +30,7 @@ final class EntityTimeoutSample { public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { - DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + DurableTaskGrpcWorker worker = SampleUtils.newWorkerBuilder() .addEntity("SlowCounter", SlowCounterEntity::new) .addOrchestration(new TaskOrchestrationFactory() { @Override @@ -85,7 +85,7 @@ public TaskOrchestration create() { worker.start(); System.out.println("Worker started. SlowCounter entity registered."); - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + DurableTaskClient client = SampleUtils.newClientBuilder().build(); // --- Demo 1: Successful call with generous timeout --- System.out.println("\n--- Demo 1: callEntity with generous timeout ---"); diff --git a/samples/src/main/java/io/durabletask/samples/LowLevelEntitySample.java b/samples/src/main/java/io/durabletask/samples/LowLevelEntitySample.java index 32b2f1e5..0c5ac182 100644 --- a/samples/src/main/java/io/durabletask/samples/LowLevelEntitySample.java +++ b/samples/src/main/java/io/durabletask/samples/LowLevelEntitySample.java @@ -29,7 +29,7 @@ final class LowLevelEntitySample { public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { - DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + DurableTaskGrpcWorker worker = SampleUtils.newWorkerBuilder() // Demo 1: TaskEntity — manual dispatch .addEntity("KeyValue", KeyValueEntity::new) // Demo 2: TaskEntity with POJO state and state dispatch @@ -99,7 +99,7 @@ public TaskOrchestration create() { worker.start(); System.out.println("Worker started. KeyValue and ShoppingCart entities registered."); - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + DurableTaskClient client = SampleUtils.newClientBuilder().build(); // --- Demo 1: Low-level TaskEntity --- System.out.println("\n--- Demo 1: TaskEntity with manual dispatch ---"); diff --git a/samples/src/main/java/io/durabletask/samples/SampleUtils.java b/samples/src/main/java/io/durabletask/samples/SampleUtils.java new file mode 100644 index 00000000..3b278fdd --- /dev/null +++ b/samples/src/main/java/io/durabletask/samples/SampleUtils.java @@ -0,0 +1,56 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package io.durabletask.samples; + +import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; +import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder; +import com.microsoft.durabletask.azuremanaged.DurableTaskSchedulerClientExtensions; +import com.microsoft.durabletask.azuremanaged.DurableTaskSchedulerWorkerExtensions; + +/** + * Helpers for the standalone entity samples. + *

+ * By default, these samples connect to a plain Durable Task sidecar on {@code localhost:4001}. + * When one of the following environment variables is set, they instead connect to a + * Durable Task Scheduler (e.g. the DTS emulator), which requires a task hub header: + *

+ */ +final class SampleUtils { + private SampleUtils() {} + + static DurableTaskGrpcWorkerBuilder newWorkerBuilder() { + String cs = connectionString(); + return cs != null + ? DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(cs) + : new DurableTaskGrpcWorkerBuilder(); + } + + static DurableTaskGrpcClientBuilder newClientBuilder() { + String cs = connectionString(); + return cs != null + ? DurableTaskSchedulerClientExtensions.createClientBuilder(cs) + : new DurableTaskGrpcClientBuilder(); + } + + private static String connectionString() { + String cs = System.getenv("DURABLE_TASK_CONNECTION_STRING"); + if (cs != null && !cs.isEmpty()) { + return cs; + } + String endpoint = System.getenv("ENDPOINT"); + if (endpoint == null || endpoint.isEmpty()) { + return null; + } + String taskHub = System.getenv("TASKHUB"); + if (taskHub == null || taskHub.isEmpty()) { + taskHub = "default"; + } + String authType = endpoint.startsWith("http://localhost") || endpoint.startsWith("http://127.") + ? "None" : "DefaultAzure"; + return String.format("Endpoint=%s;TaskHub=%s;Authentication=%s", endpoint, taskHub, authType); + } +} diff --git a/samples/src/main/java/io/durabletask/samples/TypedEntityProxySample.java b/samples/src/main/java/io/durabletask/samples/TypedEntityProxySample.java index 7716235d..47a8b9c1 100644 --- a/samples/src/main/java/io/durabletask/samples/TypedEntityProxySample.java +++ b/samples/src/main/java/io/durabletask/samples/TypedEntityProxySample.java @@ -87,7 +87,7 @@ protected Class> getStateType() { } public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { - DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder() + DurableTaskGrpcWorker worker = SampleUtils.newWorkerBuilder() .addEntity("ShoppingCart", ShoppingCartEntity::new) .addOrchestration(new TaskOrchestrationFactory() { @Override @@ -124,7 +124,7 @@ public TaskOrchestration create() { worker.start(); System.out.println("Worker started. ShoppingCart entity registered."); - DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + DurableTaskClient client = SampleUtils.newClientBuilder().build(); String instanceId = client.scheduleNewOrchestrationInstance("ShoppingWorkflow"); System.out.printf("Started orchestration: %s%n", instanceId);