diff --git a/core/src/main/java/io/temporal/samples/hello/HelloDynamic.java b/core/src/main/java/io/temporal/samples/hello/HelloDynamic.java index c88a79b5e..53d181c62 100644 --- a/core/src/main/java/io/temporal/samples/hello/HelloDynamic.java +++ b/core/src/main/java/io/temporal/samples/hello/HelloDynamic.java @@ -19,21 +19,27 @@ package io.temporal.samples.hello; +import com.google.protobuf.InvalidProtocolBufferException; import io.temporal.activity.Activity; import io.temporal.activity.ActivityOptions; import io.temporal.activity.DynamicActivity; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.common.v1.Payloads; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowUpdateException; import io.temporal.common.converter.EncodedValues; +import io.temporal.common.converter.GlobalDataConverter; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; -import io.temporal.workflow.ActivityStub; -import io.temporal.workflow.DynamicSignalHandler; -import io.temporal.workflow.DynamicWorkflow; -import io.temporal.workflow.Workflow; +import io.temporal.workflow.*; +import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; public class HelloDynamic { // Define the task queue name @@ -44,7 +50,7 @@ public class HelloDynamic { // Dynamic Workflow Implementation public static class DynamicGreetingWorkflowImpl implements DynamicWorkflow { - private String name; + private List names = new ArrayList<>(); @Override public Object execute(EncodedValues args) { @@ -54,18 +60,57 @@ public Object execute(EncodedValues args) { // Register dynamic signal handler Workflow.registerListener( (DynamicSignalHandler) - (signalName, encodedArgs) -> name = encodedArgs.get(0, String.class)); - + (signalName, encodedArgs) -> names.add(encodedArgs.get(0, String.class))); + + // Register dynamic update handler + DynamicUpdateHandler updateHandler = + new DynamicUpdateHandler() { + @Override + public EncodedValues handleExecute(String updateName, EncodedValues args) { + names.add(args.get(0, String.class)); + try { + EncodedValues encodedValues = + new EncodedValues( + Optional.of( + Payloads.newBuilder() + .addPayloads( + Payload.parseFrom( + ("Update Result: " + args.get(0, String.class)) + .getBytes(StandardCharsets.UTF_8))) + .build()), + GlobalDataConverter.get()); + return encodedValues; + } catch (InvalidProtocolBufferException e) { + return new EncodedValues("Update Exception: " + e.getMessage()); + } + } + + @Override + public void handleValidate(String updateName, EncodedValues args) { + String name = args.get(0, String.class); + if (name == null || name.equals("Invalid Name")) { + throw new IllegalStateException("Invalid name provided."); + } + } + }; + Workflow.registerListener(updateHandler); + + // Wait until we received both the signal and the update request + Workflow.await(() -> names.size() == 2); // Define activity options and get ActivityStub ActivityStub activity = Workflow.newUntypedActivityStub( ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()); - // Execute the dynamic Activity. Note that the provided Activity name is not + // Process the signal and update names + // Execute the dynamic Activity for signal and update. Note that the provided Activity name is + // not // explicitly registered with the Worker - String greetingResult = activity.execute("DynamicACT", String.class, greeting, name, type); - + String result = ""; + for (String name : names) { + result += activity.execute("DynamicACT", String.class, greeting, name, type) + "\n"; + } // Return results - return greetingResult; + return result; } } @@ -136,8 +181,19 @@ public static void main(String[] arg) { WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(WORKFLOW_ID).build(); WorkflowStub workflow = client.newUntypedWorkflowStub("DynamicWF", workflowOptions); - // Start workflow execution and signal right after Pass in the workflow args and signal args - workflow.signalWithStart("greetingSignal", new Object[] {"John"}, new Object[] {"Hello"}); + // Start execution + workflow.start(new Object[] {"Hello"}); + // Send signal to execution with first name + workflow.signal("greetingSignal", new Object[] {"John"}); + // Send invalid name via update + try { + workflow.update("greetingUpdate", String.class, new Object[] {"Invalid Name"}); + } catch (WorkflowUpdateException e) { + // Just print as rejecting invalid name update is expected here + System.out.println("Update Rejected: " + e.getCause().getMessage()); + } + // Send valid name via update + workflow.update("greetingUpdate", Object.class, new Object[] {"Mary"}); // Wait for workflow to finish getting the results String result = workflow.getResult(String.class); diff --git a/core/src/test/java/io/temporal/samples/hello/HelloDynamicTest.java b/core/src/test/java/io/temporal/samples/hello/HelloDynamicTest.java index 96e3513b9..1381f2f31 100644 --- a/core/src/test/java/io/temporal/samples/hello/HelloDynamicTest.java +++ b/core/src/test/java/io/temporal/samples/hello/HelloDynamicTest.java @@ -23,7 +23,9 @@ import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowUpdateException; import io.temporal.testing.TestWorkflowRule; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -49,13 +51,23 @@ public void testActivityImpl() { WorkflowStub workflow = testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("DynamicWF", workflowOptions); - // Start workflow execution and signal right after Pass in the workflow args and signal args - workflow.signalWithStart("greetingSignal", new Object[] {"John"}, new Object[] {"Hello"}); + // Start execution + workflow.start(new Object[] {"Hello"}); + // Send signal to execution with first name + workflow.signal("greetingSignal", new Object[] {"John"}); + // Send invalid name via update + WorkflowUpdateException workflowUpdateException = + Assert.assertThrows( + WorkflowUpdateException.class, + () -> workflow.update("greetingUpdate", String.class, new Object[] {"Invalid Name"})); + // Send valid name via update + workflow.update("greetingUpdate", Object.class, new Object[] {"Mary"}); // Wait for workflow to finish getting the results String result = workflow.getResult(String.class); assertNotNull(result); - assertEquals("DynamicACT: Hello John from: DynamicWF", result); + assertEquals( + "DynamicACT: Hello John from: DynamicWF\nDynamicACT: Hello Mary from: DynamicWF\n", result); } }