From c4791f63c9ff04421855a27ed8723cff4832407e Mon Sep 17 00:00:00 2001 From: Tuan Nguyen Date: Wed, 20 Jan 2021 16:13:42 +0100 Subject: [PATCH 1/5] test complex input --- build.gradle | 2 + docker/buildkite/Dockerfile | 1 + docker/buildkite/docker-compose.yaml | 9 ++-- .../samples/complex/ComplexWorkflow.java | 12 +++++ .../samples/complex/ComplexWorkflowImpl.java | 8 +++ .../io/temporal/samples/complex/Input.java | 17 ++++++ .../io/temporal/samples/complex/Starter.java | 53 +++++++++++++++++++ .../io/temporal/samples/complex/Worker.java | 29 ++++++++++ 8 files changed, 125 insertions(+), 6 deletions(-) create mode 100644 src/main/java/io/temporal/samples/complex/ComplexWorkflow.java create mode 100644 src/main/java/io/temporal/samples/complex/ComplexWorkflowImpl.java create mode 100644 src/main/java/io/temporal/samples/complex/Input.java create mode 100644 src/main/java/io/temporal/samples/complex/Starter.java create mode 100644 src/main/java/io/temporal/samples/complex/Worker.java diff --git a/build.gradle b/build.gradle index 7c5924efc..4f596dde2 100644 --- a/build.gradle +++ b/build.gradle @@ -34,6 +34,8 @@ dependencies { implementation group: 'commons-configuration', name: 'commons-configuration', version: '1.10' implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.1' + testImplementation group: 'junit', name: 'junit', version: '4.13.1' testImplementation group: 'org.mockito', name: 'mockito-all', version: '1.10.19' testImplementation group: 'org.powermock', name: 'powermock-api-mockito', version: '1.7.4' diff --git a/docker/buildkite/Dockerfile b/docker/buildkite/Dockerfile index 2c62dbb9d..44d9b0af5 100644 --- a/docker/buildkite/Dockerfile +++ b/docker/buildkite/Dockerfile @@ -13,4 +13,5 @@ RUN apk update && apk add --virtual wget ca-certificates wget && apk add protobu RUN apk add --virtual git RUN mkdir /temporal-java-samples +COPY . /temporal-java-samples WORKDIR /temporal-java-samples diff --git a/docker/buildkite/docker-compose.yaml b/docker/buildkite/docker-compose.yaml index f19fbb834..19cf5cc2d 100644 --- a/docker/buildkite/docker-compose.yaml +++ b/docker/buildkite/docker-compose.yaml @@ -1,12 +1,9 @@ version: '3.5' services: - unit-test: + worker: build: context: ../../ dockerfile: ./docker/buildkite/Dockerfile - command: "./gradlew --no-daemon test" - environment: - - "USER=unittest" - volumes: - - "../../:/temporal-java-samples" + command: "./gradlew -q execute -PmainClass=io.temporal.samples.complex.Worker" + diff --git a/src/main/java/io/temporal/samples/complex/ComplexWorkflow.java b/src/main/java/io/temporal/samples/complex/ComplexWorkflow.java new file mode 100644 index 000000000..9c965d408 --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/ComplexWorkflow.java @@ -0,0 +1,12 @@ +package io.temporal.samples.complex; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface ComplexWorkflow { + String TASK_QUEUE = "complex"; + + @WorkflowMethod + void handleLambda(Input input); +} diff --git a/src/main/java/io/temporal/samples/complex/ComplexWorkflowImpl.java b/src/main/java/io/temporal/samples/complex/ComplexWorkflowImpl.java new file mode 100644 index 000000000..80463f3e6 --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/ComplexWorkflowImpl.java @@ -0,0 +1,8 @@ +package io.temporal.samples.complex; + +public class ComplexWorkflowImpl implements ComplexWorkflow { + @Override + public void handleLambda(Input input) { + System.out.println(input.apply()); + } +} diff --git a/src/main/java/io/temporal/samples/complex/Input.java b/src/main/java/io/temporal/samples/complex/Input.java new file mode 100644 index 000000000..7d0c7e8c9 --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/Input.java @@ -0,0 +1,17 @@ +package io.temporal.samples.complex; + +public class Input { + private int i; + + public Input() { + this.i = 0; + } + + public Input(int i) { + this.i = i; + } + + public String apply() { + return "name: " + i; + } +} diff --git a/src/main/java/io/temporal/samples/complex/Starter.java b/src/main/java/io/temporal/samples/complex/Starter.java new file mode 100644 index 000000000..271394acf --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/Starter.java @@ -0,0 +1,53 @@ +package io.temporal.samples.complex; + +import static io.temporal.samples.complex.ComplexWorkflow.TASK_QUEUE; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.converter.ByteArrayPayloadConverter; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.converter.JacksonJsonPayloadConverter; +import io.temporal.common.converter.NullPayloadConverter; +import io.temporal.common.converter.ProtobufJsonPayloadConverter; +import io.temporal.serviceclient.WorkflowServiceStubs; + +public class Starter { + public static void main(String[] args) { + // gRPC stubs wrapper that talks to the local docker instance of temporal service. + WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + DefaultDataConverter dataConverter = + new DefaultDataConverter( + new NullPayloadConverter(), + new ByteArrayPayloadConverter(), + new ProtobufJsonPayloadConverter(), + new JacksonJsonPayloadConverter(mapper)); + + WorkflowClientOptions options = + WorkflowClientOptions.newBuilder().setDataConverter(dataConverter).build(); + // client that can be used to start and signal workflows + WorkflowClient client = WorkflowClient.newInstance(service, options); + + for (int i = 0; i < 1000; i++) { + ComplexWorkflow workflow = + client.newWorkflowStub( + ComplexWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); + // Execute a workflow waiting for it to complete. See {@link + // io.temporal.samples.hello.HelloSignal} + // for an example of starting workflow without waiting synchronously for its result. + + WorkflowClient.start(workflow::handleLambda, new Input(i)); + System.out.println("done " + i); + } + System.exit(0); + } +} diff --git a/src/main/java/io/temporal/samples/complex/Worker.java b/src/main/java/io/temporal/samples/complex/Worker.java new file mode 100644 index 000000000..637294677 --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/Worker.java @@ -0,0 +1,29 @@ +package io.temporal.samples.complex; + +import static io.temporal.samples.complex.ComplexWorkflow.TASK_QUEUE; + +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.WorkerFactory; + +public class Worker { + public static void main(String[] args) { + // gRPC stubs wrapper that talks to the local docker instance of temporal service. + WorkflowServiceStubs service = + WorkflowServiceStubs.newInstance( + WorkflowServiceStubsOptions.newBuilder().setTarget("192.168.0.3:7233").build()); + // client that can be used to start and signal workflows + WorkflowClient client = WorkflowClient.newInstance(service); + + // worker factory that can be used to create workers for specific task queues + WorkerFactory factory = WorkerFactory.newInstance(client); + // Worker that listens on a task queue and hosts both workflow and activity implementations. + io.temporal.worker.Worker worker = factory.newWorker(TASK_QUEUE); + // Workflows are stateful. So you need a type to create instances. + worker.registerWorkflowImplementationTypes(ComplexWorkflowImpl.class); + // Activities are stateless and thread safe. So a shared instance is used. + // Start listening to the workflow and activity task queues. + factory.start(); + } +} From 0f250550dccd9a39e107a3627b8654521cba0398 Mon Sep 17 00:00:00 2001 From: Tuan Nguyen Date: Thu, 21 Jan 2021 16:01:24 +0100 Subject: [PATCH 2/5] add benchmark --- docker/buildkite/docker-compose.yaml | 2 +- .../temporal/samples/hello/HelloActivity.java | 18 ++--- .../temporal/samples/hello/HelloStarter.java | 65 +++++++++++++++++++ 3 files changed, 70 insertions(+), 15 deletions(-) create mode 100644 src/main/java/io/temporal/samples/hello/HelloStarter.java diff --git a/docker/buildkite/docker-compose.yaml b/docker/buildkite/docker-compose.yaml index 19cf5cc2d..cf6d6811d 100644 --- a/docker/buildkite/docker-compose.yaml +++ b/docker/buildkite/docker-compose.yaml @@ -5,5 +5,5 @@ services: build: context: ../../ dockerfile: ./docker/buildkite/Dockerfile - command: "./gradlew -q execute -PmainClass=io.temporal.samples.complex.Worker" + command: "./gradlew -q execute -PmainClass=io.temporal.samples.hello.HelloActivity" diff --git a/src/main/java/io/temporal/samples/hello/HelloActivity.java b/src/main/java/io/temporal/samples/hello/HelloActivity.java index 837c90754..010b03b03 100644 --- a/src/main/java/io/temporal/samples/hello/HelloActivity.java +++ b/src/main/java/io/temporal/samples/hello/HelloActivity.java @@ -23,8 +23,8 @@ import io.temporal.activity.ActivityMethod; import io.temporal.activity.ActivityOptions; import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowOptions; import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; import io.temporal.workflow.Workflow; @@ -83,7 +83,9 @@ public String composeGreeting(String greeting, String name) { public static void main(String[] args) { // gRPC stubs wrapper that talks to the local docker instance of temporal service. - WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); + WorkflowServiceStubs service = + WorkflowServiceStubs.newInstance( + WorkflowServiceStubsOptions.newBuilder().setTarget("192.168.0.3:7233").build()); // client that can be used to start and signal workflows WorkflowClient client = WorkflowClient.newInstance(service); @@ -97,17 +99,5 @@ public static void main(String[] args) { worker.registerActivitiesImplementations(new GreetingActivitiesImpl()); // Start listening to the workflow and activity task queues. factory.start(); - - // Start a workflow execution. Usually this is done from another program. - // Uses task queue from the GreetingWorkflow @WorkflowMethod annotation. - GreetingWorkflow workflow = - client.newWorkflowStub( - GreetingWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); - // Execute a workflow waiting for it to complete. See {@link - // io.temporal.samples.hello.HelloSignal} - // for an example of starting workflow without waiting synchronously for its result. - String greeting = workflow.getGreeting("World"); - System.out.println(greeting); - System.exit(0); } } diff --git a/src/main/java/io/temporal/samples/hello/HelloStarter.java b/src/main/java/io/temporal/samples/hello/HelloStarter.java new file mode 100644 index 000000000..b09da902f --- /dev/null +++ b/src/main/java/io/temporal/samples/hello/HelloStarter.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.hello; + +import static io.temporal.samples.hello.HelloActivity.TASK_QUEUE; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * Hello World Temporal workflow that executes a single activity. Requires a local instance the + * Temporal service to be running. + */ +public class HelloStarter { + + public static void main(String[] args) throws InterruptedException { + + WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); + WorkflowClient client = WorkflowClient.newInstance(service); + + long start = System.currentTimeMillis(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(30); + CountDownLatch latch = new CountDownLatch(1000); + for (int i = 0; i < 1000; i++) { + executor.submit( + () -> { + HelloActivity.GreetingWorkflow workflow = + client.newWorkflowStub( + HelloActivity.GreetingWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); + // Execute a workflow waiting for it to complete. See {@link + // io.temporal.samples.hello.HelloSignal} + // for an example of starting workflow without waiting synchronously for its result. + String greeting = workflow.getGreeting("World"); + System.out.println(greeting); + latch.countDown(); + }); + } + + latch.await(); + System.out.println("time = " + (System.currentTimeMillis() - start)); + System.exit(0); + } +} From dbe68fcf80501cc0d91b27de8b1af3b039651d29 Mon Sep 17 00:00:00 2001 From: Tuan Nguyen Date: Fri, 22 Jan 2021 09:49:40 +0100 Subject: [PATCH 3/5] simulate long activities --- .../java/io/temporal/samples/hello/HelloActivity.java | 8 +++++++- src/main/java/io/temporal/samples/hello/HelloStarter.java | 6 +++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/temporal/samples/hello/HelloActivity.java b/src/main/java/io/temporal/samples/hello/HelloActivity.java index 010b03b03..98a7aa67c 100644 --- a/src/main/java/io/temporal/samples/hello/HelloActivity.java +++ b/src/main/java/io/temporal/samples/hello/HelloActivity.java @@ -31,6 +31,7 @@ import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import java.time.Duration; +import java.util.concurrent.TimeUnit; /** * Hello World Temporal workflow that executes a single activity. Requires a local instance the @@ -65,7 +66,7 @@ public static class GreetingWorkflowImpl implements GreetingWorkflow { private final GreetingActivities activities = Workflow.newActivityStub( GreetingActivities.class, - ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(2)).build()); + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(60)).build()); @Override public String getGreeting(String name) { @@ -77,6 +78,11 @@ public String getGreeting(String name) { static class GreetingActivitiesImpl implements GreetingActivities { @Override public String composeGreeting(String greeting, String name) { + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } return greeting + " " + name + "!"; } } diff --git a/src/main/java/io/temporal/samples/hello/HelloStarter.java b/src/main/java/io/temporal/samples/hello/HelloStarter.java index b09da902f..bfe00ee88 100644 --- a/src/main/java/io/temporal/samples/hello/HelloStarter.java +++ b/src/main/java/io/temporal/samples/hello/HelloStarter.java @@ -42,13 +42,13 @@ public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(30); CountDownLatch latch = new CountDownLatch(1000); + WorkflowOptions workflowOptions = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build(); + System.out.println(workflowOptions); for (int i = 0; i < 1000; i++) { executor.submit( () -> { HelloActivity.GreetingWorkflow workflow = - client.newWorkflowStub( - HelloActivity.GreetingWorkflow.class, - WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); + client.newWorkflowStub(HelloActivity.GreetingWorkflow.class, workflowOptions); // Execute a workflow waiting for it to complete. See {@link // io.temporal.samples.hello.HelloSignal} // for an example of starting workflow without waiting synchronously for its result. From d7b7c5fe48937f48c98c086f54cd821dbeb9948f Mon Sep 17 00:00:00 2001 From: Tuan Nguyen Date: Fri, 22 Jan 2021 15:48:03 +0100 Subject: [PATCH 4/5] fix starter --- src/main/java/io/temporal/samples/hello/HelloActivity.java | 5 ++++- src/main/java/io/temporal/samples/hello/HelloStarter.java | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/temporal/samples/hello/HelloActivity.java b/src/main/java/io/temporal/samples/hello/HelloActivity.java index 98a7aa67c..e715e005c 100644 --- a/src/main/java/io/temporal/samples/hello/HelloActivity.java +++ b/src/main/java/io/temporal/samples/hello/HelloActivity.java @@ -83,6 +83,7 @@ public String composeGreeting(String greeting, String name) { } catch (InterruptedException e) { e.printStackTrace(); } + System.out.println(Thread.currentThread().getId()); return greeting + " " + name + "!"; } } @@ -91,7 +92,9 @@ public static void main(String[] args) { // gRPC stubs wrapper that talks to the local docker instance of temporal service. WorkflowServiceStubs service = WorkflowServiceStubs.newInstance( - WorkflowServiceStubsOptions.newBuilder().setTarget("192.168.0.3:7233").build()); + WorkflowServiceStubsOptions.newBuilder() + .setTarget("temporaltest-frontend:7233") + .build()); // client that can be used to start and signal workflows WorkflowClient client = WorkflowClient.newInstance(service); diff --git a/src/main/java/io/temporal/samples/hello/HelloStarter.java b/src/main/java/io/temporal/samples/hello/HelloStarter.java index bfe00ee88..a2c00ff7c 100644 --- a/src/main/java/io/temporal/samples/hello/HelloStarter.java +++ b/src/main/java/io/temporal/samples/hello/HelloStarter.java @@ -40,7 +40,7 @@ public static void main(String[] args) throws InterruptedException { WorkflowClient client = WorkflowClient.newInstance(service); long start = System.currentTimeMillis(); - ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(30); + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1000); CountDownLatch latch = new CountDownLatch(1000); WorkflowOptions workflowOptions = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build(); System.out.println(workflowOptions); From 1680b401a9cdc285f44b7702bb6e743f93f4b06c Mon Sep 17 00:00:00 2001 From: Tuan Nguyen Date: Mon, 25 Jan 2021 17:34:10 +0100 Subject: [PATCH 5/5] use k8s service address --- .../temporal/samples/hello/HelloActivity.java | 2 +- .../temporal/samples/hello/HelloStarter.java | 26 ++++++++++++++----- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/temporal/samples/hello/HelloActivity.java b/src/main/java/io/temporal/samples/hello/HelloActivity.java index e715e005c..15833c2cc 100644 --- a/src/main/java/io/temporal/samples/hello/HelloActivity.java +++ b/src/main/java/io/temporal/samples/hello/HelloActivity.java @@ -66,7 +66,7 @@ public static class GreetingWorkflowImpl implements GreetingWorkflow { private final GreetingActivities activities = Workflow.newActivityStub( GreetingActivities.class, - ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(60)).build()); + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofMinutes(5)).build()); @Override public String getGreeting(String name) { diff --git a/src/main/java/io/temporal/samples/hello/HelloStarter.java b/src/main/java/io/temporal/samples/hello/HelloStarter.java index a2c00ff7c..51a5172ad 100644 --- a/src/main/java/io/temporal/samples/hello/HelloStarter.java +++ b/src/main/java/io/temporal/samples/hello/HelloStarter.java @@ -24,9 +24,11 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.serviceclient.WorkflowServiceStubs; -import java.util.concurrent.CountDownLatch; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import java.time.Duration; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Hello World Temporal workflow that executes a single activity. Requires a local instance the @@ -36,13 +38,25 @@ public class HelloStarter { public static void main(String[] args) throws InterruptedException { - WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); + WorkflowServiceStubs service = + WorkflowServiceStubs.newInstance( + WorkflowServiceStubsOptions.newBuilder() + .setRpcTimeout(Duration.ofMinutes(5)) + .setQueryRpcTimeout(Duration.ofMinutes(5)) + .setRpcLongPollTimeout(Duration.ofMinutes(5)) + .setTarget("temporaltest-frontend:7233") + .build()); WorkflowClient client = WorkflowClient.newInstance(service); long start = System.currentTimeMillis(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1000); - CountDownLatch latch = new CountDownLatch(1000); - WorkflowOptions workflowOptions = WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build(); + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder() + .setTaskQueue(TASK_QUEUE) + .setWorkflowTaskTimeout(Duration.ofMinutes(5)) + .setWorkflowRunTimeout(Duration.ofMinutes(5)) + .setWorkflowExecutionTimeout(Duration.ofMinutes(5)) + .build(); System.out.println(workflowOptions); for (int i = 0; i < 1000; i++) { executor.submit( @@ -54,11 +68,11 @@ public static void main(String[] args) throws InterruptedException { // for an example of starting workflow without waiting synchronously for its result. String greeting = workflow.getGreeting("World"); System.out.println(greeting); - latch.countDown(); }); } - latch.await(); + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.MINUTES); System.out.println("time = " + (System.currentTimeMillis() - start)); System.exit(0); }