Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ dependencies {
implementation group: 'commons-configuration', name: 'commons-configuration', version: '1.10'
implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.1'

testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'org.mockito', name: 'mockito-all', version: '1.10.19'
testImplementation group: 'org.powermock', name: 'powermock-api-mockito', version: '1.7.4'
Expand Down
1 change: 1 addition & 0 deletions docker/buildkite/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ RUN apk update && apk add --virtual wget ca-certificates wget && apk add protobu
RUN apk add --virtual git

RUN mkdir /temporal-java-samples
COPY . /temporal-java-samples
WORKDIR /temporal-java-samples
9 changes: 3 additions & 6 deletions docker/buildkite/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
version: '3.5'

services:
unit-test:
worker:
build:
context: ../../
dockerfile: ./docker/buildkite/Dockerfile
command: "./gradlew --no-daemon test"
environment:
- "USER=unittest"
volumes:
- "../../:/temporal-java-samples"
command: "./gradlew -q execute -PmainClass=io.temporal.samples.hello.HelloActivity"

12 changes: 12 additions & 0 deletions src/main/java/io/temporal/samples/complex/ComplexWorkflow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.temporal.samples.complex;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

@WorkflowInterface
public interface ComplexWorkflow {
String TASK_QUEUE = "complex";

@WorkflowMethod
void handleLambda(Input input);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.temporal.samples.complex;

public class ComplexWorkflowImpl implements ComplexWorkflow {
@Override
public void handleLambda(Input input) {
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

real logic is here. Arbitrary workload should be done in Activity. WorkflowMethod only contains deterministic code.

System.out.println(input.apply());
}
}
17 changes: 17 additions & 0 deletions src/main/java/io/temporal/samples/complex/Input.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.temporal.samples.complex;

public class Input {
private int i;

public Input() {
this.i = 0;
}

public Input(int i) {
this.i = i;
}

public String apply() {
return "name: " + i;
}
}
53 changes: 53 additions & 0 deletions src/main/java/io/temporal/samples/complex/Starter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.temporal.samples.complex;

import static io.temporal.samples.complex.ComplexWorkflow.TASK_QUEUE;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.common.converter.ByteArrayPayloadConverter;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.converter.JacksonJsonPayloadConverter;
import io.temporal.common.converter.NullPayloadConverter;
import io.temporal.common.converter.ProtobufJsonPayloadConverter;
import io.temporal.serviceclient.WorkflowServiceStubs;

public class Starter {
public static void main(String[] args) {
// gRPC stubs wrapper that talks to the local docker instance of temporal service.
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();

ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
DefaultDataConverter dataConverter =
new DefaultDataConverter(
new NullPayloadConverter(),
new ByteArrayPayloadConverter(),
new ProtobufJsonPayloadConverter(),
new JacksonJsonPayloadConverter(mapper));

WorkflowClientOptions options =
WorkflowClientOptions.newBuilder().setDataConverter(dataConverter).build();
// client that can be used to start and signal workflows
WorkflowClient client = WorkflowClient.newInstance(service, options);

for (int i = 0; i < 1000; i++) {
ComplexWorkflow workflow =
client.newWorkflowStub(
ComplexWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
// Execute a workflow waiting for it to complete. See {@link
// io.temporal.samples.hello.HelloSignal}
// for an example of starting workflow without waiting synchronously for its result.

WorkflowClient.start(workflow::handleLambda, new Input(i));
System.out.println("done " + i);
}
System.exit(0);
}
}
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class publishes jobs to Temporal server.

29 changes: 29 additions & 0 deletions src/main/java/io/temporal/samples/complex/Worker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.temporal.samples.complex;

import static io.temporal.samples.complex.ComplexWorkflow.TASK_QUEUE;

import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.worker.WorkerFactory;

public class Worker {
public static void main(String[] args) {
// gRPC stubs wrapper that talks to the local docker instance of temporal service.
WorkflowServiceStubs service =
WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder().setTarget("192.168.0.3:7233").build());
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to register the worker with Temporal service.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by default, it connects to the local Temporal service

// client that can be used to start and signal workflows
WorkflowClient client = WorkflowClient.newInstance(service);

// worker factory that can be used to create workers for specific task queues
WorkerFactory factory = WorkerFactory.newInstance(client);
// Worker that listens on a task queue and hosts both workflow and activity implementations.
io.temporal.worker.Worker worker = factory.newWorker(TASK_QUEUE);
// Workflows are stateful. So you need a type to create instances.
worker.registerWorkflowImplementationTypes(ComplexWorkflowImpl.class);
// Activities are stateless and thread safe. So a shared instance is used.
// Start listening to the workflow and activity task queues.
factory.start();
}
}
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the worker which handles the task

29 changes: 14 additions & 15 deletions src/main/java/io/temporal/samples/hello/HelloActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
import io.temporal.activity.ActivityMethod;
import io.temporal.activity.ActivityOptions;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
* Hello World Temporal workflow that executes a single activity. Requires a local instance the
Expand Down Expand Up @@ -65,7 +66,7 @@ public static class GreetingWorkflowImpl implements GreetingWorkflow {
private final GreetingActivities activities =
Workflow.newActivityStub(
GreetingActivities.class,
ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(2)).build());
ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofMinutes(5)).build());

@Override
public String getGreeting(String name) {
Expand All @@ -77,13 +78,23 @@ public String getGreeting(String name) {
static class GreetingActivitiesImpl implements GreetingActivities {
@Override
public String composeGreeting(String greeting, String name) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId());
return greeting + " " + name + "!";
}
}

public static void main(String[] args) {
// gRPC stubs wrapper that talks to the local docker instance of temporal service.
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
WorkflowServiceStubs service =
WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder()
.setTarget("temporaltest-frontend:7233")
.build());
// client that can be used to start and signal workflows
WorkflowClient client = WorkflowClient.newInstance(service);

Expand All @@ -97,17 +108,5 @@ public static void main(String[] args) {
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
// Start listening to the workflow and activity task queues.
factory.start();

// Start a workflow execution. Usually this is done from another program.
// Uses task queue from the GreetingWorkflow @WorkflowMethod annotation.
GreetingWorkflow workflow =
client.newWorkflowStub(
GreetingWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
// Execute a workflow waiting for it to complete. See {@link
// io.temporal.samples.hello.HelloSignal}
// for an example of starting workflow without waiting synchronously for its result.
String greeting = workflow.getGreeting("World");
System.out.println(greeting);
System.exit(0);
}
}
79 changes: 79 additions & 0 deletions src/main/java/io/temporal/samples/hello/HelloStarter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.hello;

import static io.temporal.samples.hello.HelloActivity.TASK_QUEUE;

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Hello World Temporal workflow that executes a single activity. Requires a local instance the
* Temporal service to be running.
*/
public class HelloStarter {

public static void main(String[] args) throws InterruptedException {

WorkflowServiceStubs service =
WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder()
.setRpcTimeout(Duration.ofMinutes(5))
.setQueryRpcTimeout(Duration.ofMinutes(5))
.setRpcLongPollTimeout(Duration.ofMinutes(5))
.setTarget("temporaltest-frontend:7233")
.build());
WorkflowClient client = WorkflowClient.newInstance(service);

long start = System.currentTimeMillis();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1000);
WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder()
.setTaskQueue(TASK_QUEUE)
.setWorkflowTaskTimeout(Duration.ofMinutes(5))
.setWorkflowRunTimeout(Duration.ofMinutes(5))
.setWorkflowExecutionTimeout(Duration.ofMinutes(5))
.build();
System.out.println(workflowOptions);
for (int i = 0; i < 1000; i++) {
executor.submit(
() -> {
HelloActivity.GreetingWorkflow workflow =
client.newWorkflowStub(HelloActivity.GreetingWorkflow.class, workflowOptions);
// Execute a workflow waiting for it to complete. See {@link
// io.temporal.samples.hello.HelloSignal}
// for an example of starting workflow without waiting synchronously for its result.
String greeting = workflow.getGreeting("World");
System.out.println(greeting);
});
}

executor.shutdown();
executor.awaitTermination(5, TimeUnit.MINUTES);
System.out.println("time = " + (System.currentTimeMillis() - start));
System.exit(0);
}
}