diff --git a/build.gradle b/build.gradle index 7c5924efc..4f596dde2 100644 --- a/build.gradle +++ b/build.gradle @@ -34,6 +34,8 @@ dependencies { implementation group: 'commons-configuration', name: 'commons-configuration', version: '1.10' implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.1' + testImplementation group: 'junit', name: 'junit', version: '4.13.1' testImplementation group: 'org.mockito', name: 'mockito-all', version: '1.10.19' testImplementation group: 'org.powermock', name: 'powermock-api-mockito', version: '1.7.4' diff --git a/docker/buildkite/Dockerfile b/docker/buildkite/Dockerfile index 2c62dbb9d..44d9b0af5 100644 --- a/docker/buildkite/Dockerfile +++ b/docker/buildkite/Dockerfile @@ -13,4 +13,5 @@ RUN apk update && apk add --virtual wget ca-certificates wget && apk add protobu RUN apk add --virtual git RUN mkdir /temporal-java-samples +COPY . /temporal-java-samples WORKDIR /temporal-java-samples diff --git a/docker/buildkite/docker-compose.yaml b/docker/buildkite/docker-compose.yaml index f19fbb834..cf6d6811d 100644 --- a/docker/buildkite/docker-compose.yaml +++ b/docker/buildkite/docker-compose.yaml @@ -1,12 +1,9 @@ version: '3.5' services: - unit-test: + worker: build: context: ../../ dockerfile: ./docker/buildkite/Dockerfile - command: "./gradlew --no-daemon test" - environment: - - "USER=unittest" - volumes: - - "../../:/temporal-java-samples" + command: "./gradlew -q execute -PmainClass=io.temporal.samples.hello.HelloActivity" + diff --git a/src/main/java/io/temporal/samples/complex/ComplexWorkflow.java b/src/main/java/io/temporal/samples/complex/ComplexWorkflow.java new file mode 100644 index 000000000..9c965d408 --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/ComplexWorkflow.java @@ -0,0 +1,12 @@ +package io.temporal.samples.complex; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface ComplexWorkflow { + String TASK_QUEUE = "complex"; + + @WorkflowMethod + void handleLambda(Input input); +} diff --git a/src/main/java/io/temporal/samples/complex/ComplexWorkflowImpl.java b/src/main/java/io/temporal/samples/complex/ComplexWorkflowImpl.java new file mode 100644 index 000000000..80463f3e6 --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/ComplexWorkflowImpl.java @@ -0,0 +1,8 @@ +package io.temporal.samples.complex; + +public class ComplexWorkflowImpl implements ComplexWorkflow { + @Override + public void handleLambda(Input input) { + System.out.println(input.apply()); + } +} diff --git a/src/main/java/io/temporal/samples/complex/Input.java b/src/main/java/io/temporal/samples/complex/Input.java new file mode 100644 index 000000000..7d0c7e8c9 --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/Input.java @@ -0,0 +1,17 @@ +package io.temporal.samples.complex; + +public class Input { + private int i; + + public Input() { + this.i = 0; + } + + public Input(int i) { + this.i = i; + } + + public String apply() { + return "name: " + i; + } +} diff --git a/src/main/java/io/temporal/samples/complex/Starter.java b/src/main/java/io/temporal/samples/complex/Starter.java new file mode 100644 index 000000000..271394acf --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/Starter.java @@ -0,0 +1,53 @@ +package io.temporal.samples.complex; + +import static io.temporal.samples.complex.ComplexWorkflow.TASK_QUEUE; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.converter.ByteArrayPayloadConverter; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.converter.JacksonJsonPayloadConverter; +import io.temporal.common.converter.NullPayloadConverter; +import io.temporal.common.converter.ProtobufJsonPayloadConverter; +import io.temporal.serviceclient.WorkflowServiceStubs; + +public class Starter { + public static void main(String[] args) { + // gRPC stubs wrapper that talks to the local docker instance of temporal service. + WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + DefaultDataConverter dataConverter = + new DefaultDataConverter( + new NullPayloadConverter(), + new ByteArrayPayloadConverter(), + new ProtobufJsonPayloadConverter(), + new JacksonJsonPayloadConverter(mapper)); + + WorkflowClientOptions options = + WorkflowClientOptions.newBuilder().setDataConverter(dataConverter).build(); + // client that can be used to start and signal workflows + WorkflowClient client = WorkflowClient.newInstance(service, options); + + for (int i = 0; i < 1000; i++) { + ComplexWorkflow workflow = + client.newWorkflowStub( + ComplexWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); + // Execute a workflow waiting for it to complete. See {@link + // io.temporal.samples.hello.HelloSignal} + // for an example of starting workflow without waiting synchronously for its result. + + WorkflowClient.start(workflow::handleLambda, new Input(i)); + System.out.println("done " + i); + } + System.exit(0); + } +} diff --git a/src/main/java/io/temporal/samples/complex/Worker.java b/src/main/java/io/temporal/samples/complex/Worker.java new file mode 100644 index 000000000..637294677 --- /dev/null +++ b/src/main/java/io/temporal/samples/complex/Worker.java @@ -0,0 +1,29 @@ +package io.temporal.samples.complex; + +import static io.temporal.samples.complex.ComplexWorkflow.TASK_QUEUE; + +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.WorkerFactory; + +public class Worker { + public static void main(String[] args) { + // gRPC stubs wrapper that talks to the local docker instance of temporal service. + WorkflowServiceStubs service = + WorkflowServiceStubs.newInstance( + WorkflowServiceStubsOptions.newBuilder().setTarget("192.168.0.3:7233").build()); + // client that can be used to start and signal workflows + WorkflowClient client = WorkflowClient.newInstance(service); + + // worker factory that can be used to create workers for specific task queues + WorkerFactory factory = WorkerFactory.newInstance(client); + // Worker that listens on a task queue and hosts both workflow and activity implementations. + io.temporal.worker.Worker worker = factory.newWorker(TASK_QUEUE); + // Workflows are stateful. So you need a type to create instances. + worker.registerWorkflowImplementationTypes(ComplexWorkflowImpl.class); + // Activities are stateless and thread safe. So a shared instance is used. + // Start listening to the workflow and activity task queues. + factory.start(); + } +} diff --git a/src/main/java/io/temporal/samples/hello/HelloActivity.java b/src/main/java/io/temporal/samples/hello/HelloActivity.java index 837c90754..15833c2cc 100644 --- a/src/main/java/io/temporal/samples/hello/HelloActivity.java +++ b/src/main/java/io/temporal/samples/hello/HelloActivity.java @@ -23,14 +23,15 @@ import io.temporal.activity.ActivityMethod; import io.temporal.activity.ActivityOptions; import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowOptions; import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import java.time.Duration; +import java.util.concurrent.TimeUnit; /** * Hello World Temporal workflow that executes a single activity. Requires a local instance the @@ -65,7 +66,7 @@ public static class GreetingWorkflowImpl implements GreetingWorkflow { private final GreetingActivities activities = Workflow.newActivityStub( GreetingActivities.class, - ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(2)).build()); + ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofMinutes(5)).build()); @Override public String getGreeting(String name) { @@ -77,13 +78,23 @@ public String getGreeting(String name) { static class GreetingActivitiesImpl implements GreetingActivities { @Override public String composeGreeting(String greeting, String name) { + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(Thread.currentThread().getId()); return greeting + " " + name + "!"; } } public static void main(String[] args) { // gRPC stubs wrapper that talks to the local docker instance of temporal service. - WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(); + WorkflowServiceStubs service = + WorkflowServiceStubs.newInstance( + WorkflowServiceStubsOptions.newBuilder() + .setTarget("temporaltest-frontend:7233") + .build()); // client that can be used to start and signal workflows WorkflowClient client = WorkflowClient.newInstance(service); @@ -97,17 +108,5 @@ public static void main(String[] args) { worker.registerActivitiesImplementations(new GreetingActivitiesImpl()); // Start listening to the workflow and activity task queues. factory.start(); - - // Start a workflow execution. Usually this is done from another program. - // Uses task queue from the GreetingWorkflow @WorkflowMethod annotation. - GreetingWorkflow workflow = - client.newWorkflowStub( - GreetingWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build()); - // Execute a workflow waiting for it to complete. See {@link - // io.temporal.samples.hello.HelloSignal} - // for an example of starting workflow without waiting synchronously for its result. - String greeting = workflow.getGreeting("World"); - System.out.println(greeting); - System.exit(0); } } diff --git a/src/main/java/io/temporal/samples/hello/HelloStarter.java b/src/main/java/io/temporal/samples/hello/HelloStarter.java new file mode 100644 index 000000000..51a5172ad --- /dev/null +++ b/src/main/java/io/temporal/samples/hello/HelloStarter.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.hello; + +import static io.temporal.samples.hello.HelloActivity.TASK_QUEUE; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Hello World Temporal workflow that executes a single activity. Requires a local instance the + * Temporal service to be running. + */ +public class HelloStarter { + + public static void main(String[] args) throws InterruptedException { + + WorkflowServiceStubs service = + WorkflowServiceStubs.newInstance( + WorkflowServiceStubsOptions.newBuilder() + .setRpcTimeout(Duration.ofMinutes(5)) + .setQueryRpcTimeout(Duration.ofMinutes(5)) + .setRpcLongPollTimeout(Duration.ofMinutes(5)) + .setTarget("temporaltest-frontend:7233") + .build()); + WorkflowClient client = WorkflowClient.newInstance(service); + + long start = System.currentTimeMillis(); + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1000); + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder() + .setTaskQueue(TASK_QUEUE) + .setWorkflowTaskTimeout(Duration.ofMinutes(5)) + .setWorkflowRunTimeout(Duration.ofMinutes(5)) + .setWorkflowExecutionTimeout(Duration.ofMinutes(5)) + .build(); + System.out.println(workflowOptions); + for (int i = 0; i < 1000; i++) { + executor.submit( + () -> { + HelloActivity.GreetingWorkflow workflow = + client.newWorkflowStub(HelloActivity.GreetingWorkflow.class, workflowOptions); + // Execute a workflow waiting for it to complete. See {@link + // io.temporal.samples.hello.HelloSignal} + // for an example of starting workflow without waiting synchronously for its result. + String greeting = workflow.getGreeting("World"); + System.out.println(greeting); + }); + } + + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.MINUTES); + System.out.println("time = " + (System.currentTimeMillis() - start)); + System.exit(0); + } +}