Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 69 additions & 13 deletions core/src/main/java/io/temporal/samples/hello/HelloDynamic.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,27 @@

package io.temporal.samples.hello;

import com.google.protobuf.InvalidProtocolBufferException;
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityOptions;
import io.temporal.activity.DynamicActivity;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.client.WorkflowUpdateException;
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.converter.GlobalDataConverter;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.ActivityStub;
import io.temporal.workflow.DynamicSignalHandler;
import io.temporal.workflow.DynamicWorkflow;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.*;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class HelloDynamic {
// Define the task queue name
Expand All @@ -44,7 +50,7 @@ public class HelloDynamic {

// Dynamic Workflow Implementation
public static class DynamicGreetingWorkflowImpl implements DynamicWorkflow {
private String name;
private List<String> names = new ArrayList<>();

@Override
public Object execute(EncodedValues args) {
Expand All @@ -54,18 +60,57 @@ public Object execute(EncodedValues args) {
// Register dynamic signal handler
Workflow.registerListener(
(DynamicSignalHandler)
(signalName, encodedArgs) -> name = encodedArgs.get(0, String.class));

(signalName, encodedArgs) -> names.add(encodedArgs.get(0, String.class)));

// Register dynamic update handler
DynamicUpdateHandler updateHandler =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the DynamicUpdateHandler API here is flawed, we should change the API before adding a sample

new DynamicUpdateHandler() {
@Override
public EncodedValues handleExecute(String updateName, EncodedValues args) {
names.add(args.get(0, String.class));
try {
EncodedValues encodedValues =
new EncodedValues(
Optional.of(
Payloads.newBuilder()
.addPayloads(
Payload.parseFrom(
("Update Result: " + args.get(0, String.class))
.getBytes(StandardCharsets.UTF_8)))
.build()),
GlobalDataConverter.get());
return encodedValues;
} catch (InvalidProtocolBufferException e) {
return new EncodedValues("Update Exception: " + e.getMessage());
}
}

@Override
public void handleValidate(String updateName, EncodedValues args) {
String name = args.get(0, String.class);
if (name == null || name.equals("Invalid Name")) {
throw new IllegalStateException("Invalid name provided.");
}
}
};
Workflow.registerListener(updateHandler);

// Wait until we received both the signal and the update request
Workflow.await(() -> names.size() == 2);
// Define activity options and get ActivityStub
ActivityStub activity =
Workflow.newUntypedActivityStub(
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build());
// Execute the dynamic Activity. Note that the provided Activity name is not
// Process the signal and update names
// Execute the dynamic Activity for signal and update. Note that the provided Activity name is
// not
// explicitly registered with the Worker
String greetingResult = activity.execute("DynamicACT", String.class, greeting, name, type);

String result = "";
for (String name : names) {
result += activity.execute("DynamicACT", String.class, greeting, name, type) + "\n";
}
// Return results
return greetingResult;
return result;
}
}

Expand Down Expand Up @@ -136,8 +181,19 @@ public static void main(String[] arg) {
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId(WORKFLOW_ID).build();
WorkflowStub workflow = client.newUntypedWorkflowStub("DynamicWF", workflowOptions);

// Start workflow execution and signal right after Pass in the workflow args and signal args
workflow.signalWithStart("greetingSignal", new Object[] {"John"}, new Object[] {"Hello"});
// Start execution
workflow.start(new Object[] {"Hello"});
// Send signal to execution with first name
workflow.signal("greetingSignal", new Object[] {"John"});
// Send invalid name via update
try {
workflow.update("greetingUpdate", String.class, new Object[] {"Invalid Name"});
} catch (WorkflowUpdateException e) {
// Just print as rejecting invalid name update is expected here
System.out.println("Update Rejected: " + e.getCause().getMessage());
}
// Send valid name via update
workflow.update("greetingUpdate", Object.class, new Object[] {"Mary"});

// Wait for workflow to finish getting the results
String result = workflow.getResult(String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.client.WorkflowUpdateException;
import io.temporal.testing.TestWorkflowRule;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -49,13 +51,23 @@ public void testActivityImpl() {
WorkflowStub workflow =
testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("DynamicWF", workflowOptions);

// Start workflow execution and signal right after Pass in the workflow args and signal args
workflow.signalWithStart("greetingSignal", new Object[] {"John"}, new Object[] {"Hello"});
// Start execution
workflow.start(new Object[] {"Hello"});
// Send signal to execution with first name
workflow.signal("greetingSignal", new Object[] {"John"});
// Send invalid name via update
WorkflowUpdateException workflowUpdateException =
Assert.assertThrows(
WorkflowUpdateException.class,
() -> workflow.update("greetingUpdate", String.class, new Object[] {"Invalid Name"}));
// Send valid name via update
workflow.update("greetingUpdate", Object.class, new Object[] {"Mary"});

// Wait for workflow to finish getting the results
String result = workflow.getResult(String.class);

assertNotNull(result);
assertEquals("DynamicACT: Hello John from: DynamicWF", result);
assertEquals(
"DynamicACT: Hello John from: DynamicWF\nDynamicACT: Hello Mary from: DynamicWF\n", result);
}
}