From 86134e0712932c4107195528afa69fff42a2e845 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sat, 7 Jun 2025 11:04:59 -0700 Subject: [PATCH 1/2] Add a Nexus early return sample --- build.gradle | 2 +- .../samples/nexusEarlyReturn/README.MD | 38 ++++ .../caller/CallerStarter.java | 51 ++++++ .../nexusEarlyReturn/caller/CallerWorker.java | 50 +++++ .../caller/TransferWorkflow.java | 30 +++ .../caller/TransferWorkflowImpl.java | 80 ++++++++ .../handler/HandlerWorker.java | 44 +++++ .../handler/TransactionServiceImpl.java | 171 ++++++++++++++++++ .../options/ClientOptions.java | 150 +++++++++++++++ .../service/TransactionService.java | 123 +++++++++++++ .../nexusEarlyReturn/service/description.md | 8 + 11 files changed, 746 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/README.MD create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/CallerStarter.java create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/CallerWorker.java create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/TransferWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/TransferWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/HandlerWorker.java create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/TransactionServiceImpl.java create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/options/ClientOptions.java create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/TransactionService.java create mode 100644 core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/description.md diff --git a/build.gradle b/build.gradle index f1731c4fd..38dd33581 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ subprojects { ext { otelVersion = '1.30.1' otelVersionAlpha = "${otelVersion}-alpha" - javaSDKVersion = '1.28.1' + javaSDKVersion = '1.28.4' camelVersion = '3.22.1' jarVersion = '1.0.0' } diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/README.MD b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/README.MD new file mode 100644 index 000000000..a28b28319 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/README.MD @@ -0,0 +1,38 @@ +# Nexus Early Return + +This sample demonstrates the early return pattern with Nexus. + +For more details on the early return pattern see the [Early Return Sample](../earlyreturn/README.md) + +To run this sample, set up your environment following the instructions in the main [Nexus Sample](../nexus/README.md). + +In separate terminal windows: + +### Nexus handler worker + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexusEarlyReturn.handler.HandlerWorker \ + --args="-target-host localhost:7233 -namespace my-target-namespace" +``` + +### Nexus caller worker + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexusEarlyReturn.caller.CallerWorker \ + --args="-target-host localhost:7233 -namespace my-caller-namespace" +``` + +### Start caller workflow + +``` +./gradlew -q execute -PmainClass=io.temporal.samples.nexusEarlyReturn.caller.CallerStarter \ + --args="-target-host localhost:7233 -namespace my-caller-namespace" +``` + +### Output + +which should result in: +``` +10:52:08.542 { } [main] INFO i.t.s.n.caller.CallerStarter - Started TransferWorkflow workflowId: 1526a62a-cc76-49a9-bc10-b91d67b4f0e8 runId: 01974b85-7b59-76e5-80aa-3ab0dfb983ef +10:52:12.084 { } [main] INFO i.t.s.n.caller.CallerStarter - Workflow result: Transaction completed successfully. +``` diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/CallerStarter.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/CallerStarter.java new file mode 100644 index 000000000..2685f1fcd --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/CallerStarter.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexusEarlyReturn.caller; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.samples.earlyreturn.TransactionRequest; +import io.temporal.samples.nexusEarlyReturn.options.ClientOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CallerStarter { + private static final Logger logger = LoggerFactory.getLogger(CallerStarter.class); + + public static void main(String[] args) { + WorkflowClient client = ClientOptions.getWorkflowClient(args); + + WorkflowOptions workflowOptions = + WorkflowOptions.newBuilder().setTaskQueue(CallerWorker.DEFAULT_TASK_QUEUE_NAME).build(); + TransferWorkflow transferWorkflow = + client.newWorkflowStub(TransferWorkflow.class, workflowOptions); + WorkflowExecution execution = + WorkflowClient.start( + transferWorkflow::transfer, new TransactionRequest("source", "target", 100)); + logger.info( + "Started TransferWorkflow workflowId: {} runId: {}", + execution.getWorkflowId(), + execution.getRunId()); + logger.info( + "Workflow result: {}", + transferWorkflow.transfer(new TransactionRequest("source", "target", 100))); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/CallerWorker.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/CallerWorker.java new file mode 100644 index 000000000..69f654804 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/CallerWorker.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexusEarlyReturn.caller; + +import io.temporal.client.WorkflowClient; +import io.temporal.samples.nexusEarlyReturn.options.ClientOptions; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkflowImplementationOptions; +import io.temporal.workflow.NexusServiceOptions; +import java.util.Collections; + +public class CallerWorker { + public static final String DEFAULT_TASK_QUEUE_NAME = "my-caller-workflow-task-queue"; + + public static void main(String[] args) { + WorkflowClient client = ClientOptions.getWorkflowClient(args); + + WorkerFactory factory = WorkerFactory.newInstance(client); + + Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME); + worker.registerWorkflowImplementationTypes( + WorkflowImplementationOptions.newBuilder() + .setNexusServiceOptions( + Collections.singletonMap( + "TransactionService", + NexusServiceOptions.newBuilder().setEndpoint("my-nexus-endpoint-name").build())) + .build(), + TransferWorkflowImpl.class); + + factory.start(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/TransferWorkflow.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/TransferWorkflow.java new file mode 100644 index 000000000..2c1ea59e5 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/TransferWorkflow.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexusEarlyReturn.caller; + +import io.temporal.samples.earlyreturn.TransactionRequest; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface TransferWorkflow { + @WorkflowMethod + String transfer(TransactionRequest message); +} diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/TransferWorkflowImpl.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/TransferWorkflowImpl.java new file mode 100644 index 000000000..562cb4b8c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/caller/TransferWorkflowImpl.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexusEarlyReturn.caller; + +import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.NexusOperationFailure; +import io.temporal.samples.earlyreturn.TransactionRequest; +import io.temporal.samples.earlyreturn.TxResult; +import io.temporal.samples.nexusEarlyReturn.service.TransactionService; +import io.temporal.workflow.NexusOperationOptions; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +/** + * TransferWorkflowImpl starts a transfer and waits for it to complete (if the transaction is + * successful or not). + */ +public class TransferWorkflowImpl implements TransferWorkflow { + TransactionService transactionService = + Workflow.newNexusServiceStub( + TransactionService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public String transfer(TransactionRequest request) { + try { + // Start a transaction using the TransactionService Nexus service. Once the transaction is + // started, it will run asynchronously, and we can check the result later. + TransactionService.StartTransactionResponse r = + transactionService.startTransaction( + new TransactionService.StartTransactionRequest(request)); + // Note: this random sleep is to simulate some processing time before checking the result. + // Depending on how long the sleep is, the transaction may complete before we check the + // result. + Workflow.sleep(Duration.ofMillis(Workflow.newRandom().nextInt(100))); + TxResult result = + transactionService.getTransactionResult( + new TransactionService.GetTransactionResultRequest(r.getTransactionToken())); + return result.getStatus(); + } catch (NexusOperationFailure of) { + // If the operation failed, we check if it was due to a transaction failure. + if (of.getCause() instanceof ApplicationFailure) { + ApplicationFailure af = (ApplicationFailure) of.getCause(); + if (af.getType().equals("TransactionFailed")) { + // If the transaction failed, we can retrieve the transaction token from the details + // and use it to wait for the transaction to cancel. + String transactionToken = af.getDetails().get(String.class); + TxResult result = + transactionService.getTransactionResult( + new TransactionService.GetTransactionResultRequest(transactionToken)); + return result.getStatus(); + } + } + throw of; + } + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/HandlerWorker.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/HandlerWorker.java new file mode 100644 index 000000000..2a337be1d --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/HandlerWorker.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexusEarlyReturn.handler; + +import io.temporal.client.WorkflowClient; +import io.temporal.samples.earlyreturn.TransactionActivitiesImpl; +import io.temporal.samples.earlyreturn.TransactionWorkflowImpl; +import io.temporal.samples.nexusEarlyReturn.options.ClientOptions; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; + +public class HandlerWorker { + public static final String DEFAULT_TASK_QUEUE_NAME = "my-handler-task-queue"; + + public static void main(String[] args) { + WorkflowClient client = ClientOptions.getWorkflowClient(args); + + WorkerFactory factory = WorkerFactory.newInstance(client); + + Worker worker = factory.newWorker(DEFAULT_TASK_QUEUE_NAME); + worker.registerWorkflowImplementationTypes(TransactionWorkflowImpl.class); + worker.registerActivitiesImplementations(new TransactionActivitiesImpl()); + worker.registerNexusServiceImplementation(new TransactionServiceImpl()); + + factory.start(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/TransactionServiceImpl.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/TransactionServiceImpl.java new file mode 100644 index 000000000..57a09bd39 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/TransactionServiceImpl.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexusEarlyReturn.handler; + +import io.nexusrpc.OperationException; +import io.nexusrpc.OperationInfo; +import io.nexusrpc.handler.*; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; +import io.temporal.api.enums.v1.WorkflowIdReusePolicy; +import io.temporal.client.*; +import io.temporal.failure.ApplicationFailure; +import io.temporal.nexus.Nexus; +import io.temporal.nexus.WorkflowRunOperation; +import io.temporal.samples.earlyreturn.TransactionRequest; +import io.temporal.samples.earlyreturn.TransactionWorkflow; +import io.temporal.samples.earlyreturn.TxResult; +import io.temporal.samples.nexusEarlyReturn.service.TransactionService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Base64; + +import static java.nio.charset.StandardCharsets.UTF_8; + +@ServiceImpl(service = TransactionService.class) +public class TransactionServiceImpl { + private static final Logger logger = LoggerFactory.getLogger(TransactionServiceImpl.class); + private final Base64.Encoder encoder = Base64.getEncoder().withoutPadding(); + private final Base64.Decoder decoder = Base64.getDecoder(); + + @OperationImpl + public OperationHandler< + TransactionService.StartTransactionRequest, TransactionService.StartTransactionResponse> + startTransaction() { + return OperationHandler.sync( + (ctx, details, request) -> { + // Note: It is important to use a unique workflow ID for each transaction since + // the workflow ID is used to identify the transaction in the system, and we can only get + // the result by the workflow ID since Temporal only supports attaching to workflow by + // workflowID. + String workflowId = "transaction-" + details.getRequestId(); + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setTaskQueue(HandlerWorker.DEFAULT_TASK_QUEUE_NAME) + .setWorkflowIdReusePolicy( + WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL) + .setWorkflowId(workflowId) + .build(); + TransactionWorkflow workflow = + Nexus.getOperationContext() + .getWorkflowClient() + .newWorkflowStub(TransactionWorkflow.class, options); + String transactionToken = encoder.encodeToString(workflowId.getBytes(UTF_8)); + try { + TxResult result = + WorkflowClient.executeUpdateWithStart( + workflow::returnInitResult, + UpdateOptions.newBuilder().build(), + new WithStartWorkflowOperation<>( + workflow::processTransaction, request.getTransactionRequest())); + return new TransactionService.StartTransactionResponse(transactionToken, result); + } catch (WorkflowUpdateException e) { + throw OperationException.failure( + ApplicationFailure.newNonRetryableFailureWithCause( + "Transaction invalid, cancellation.", + "TransactionFailed", + e.getCause(), + transactionToken)); + } + }); + } + + @OperationImpl + public OperationHandler + getTransactionResult() { + class AlreadyStartedWorkflowOperationHandler + implements OperationHandler { + @Override + public OperationStartResult start( + OperationContext context, + OperationStartDetails osd, + TransactionService.GetTransactionResultRequest request) + throws HandlerException, OperationException { + String workflowId = + new String(decoder.decode(request.getTransactionToken().getBytes(UTF_8)), UTF_8); + WorkflowClient client = Nexus.getOperationContext().getWorkflowClient(); + // Describe the workflow to ensure it exists before attempting to attach so we + // don't accidentally start a new workflow when trying to attach. + WorkflowStub stub = client.newUntypedWorkflowStub(workflowId); + try { + WorkflowExecutionDescription desc = stub.describe(); + // If the workflow is not running, we can fetch the result directly. + if (!desc.getStatus().equals(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING)) { + return OperationStartResult.sync(stub.getResult(TxResult.class)); + } + } catch (WorkflowNotFoundException e) { + throw OperationException.failure( + new IllegalStateException( + "Workflow with ID " + workflowId + " not found. Ensure it was started first.", + e)); + } + // Create a workflow stub with the same ID and task queue as the original workflow. + // With this, when we attempt to start the workflow attach to the existing workflow if it is + // still running or fail if it has already completed. + TransactionWorkflow workflowStub = + client.newWorkflowStub( + TransactionWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING) + .setWorkflowIdReusePolicy( + WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) + .setTaskQueue(HandlerWorker.DEFAULT_TASK_QUEUE_NAME) + .setWorkflowId(workflowId) + .build()); + OperationHandler workflowOperation = + WorkflowRunOperation.fromWorkflowMethod( + (ctx, details, input) -> workflowStub::processTransaction); + try { + // This should always attach to an existing workflow since we know the workflow was + // already started. + // So the input does not matter here. + return workflowOperation.start(context, osd, new TransactionRequest("", "", 0)); + } catch (WorkflowExecutionAlreadyStarted e) { + // If we get this exception, it means the workflow completed between when we called + // describe and start, now we can + // fetch the result directly. + logger.info("Workflow already started for ID: {}", workflowId); + return OperationStartResult.sync(stub.getResult(TxResult.class)); + } + } + + @Override + public TxResult fetchResult(OperationContext context, OperationFetchResultDetails details) + throws HandlerException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public OperationInfo fetchInfo(OperationContext context, OperationFetchInfoDetails details) + throws HandlerException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void cancel(OperationContext context, OperationCancelDetails details) + throws HandlerException {} + } + return new AlreadyStartedWorkflowOperationHandler(); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/options/ClientOptions.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/options/ClientOptions.java new file mode 100644 index 000000000..6b379c39e --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/options/ClientOptions.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexusEarlyReturn.options; + +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import javax.net.ssl.SSLException; +import org.apache.commons.cli.*; + +public class ClientOptions { + + public static WorkflowClient getWorkflowClient(String[] args) { + return getWorkflowClient(args, WorkflowClientOptions.newBuilder()); + } + + public static WorkflowClient getWorkflowClient( + String[] args, WorkflowClientOptions.Builder clientOptions) { + Options options = new Options(); + Option targetHostOption = new Option("target-host", true, "Host:port for the Temporal service"); + targetHostOption.setRequired(false); + options.addOption(targetHostOption); + + Option namespaceOption = new Option("namespace", true, "Namespace to connect to"); + namespaceOption.setRequired(false); + options.addOption(namespaceOption); + + Option serverRootCaOption = + new Option("server-root-ca-cert", true, "Optional path to root server CA cert"); + serverRootCaOption.setRequired(false); + options.addOption(serverRootCaOption); + + Option clientCertOption = + new Option( + "client-cert", true, "Optional path to client cert, mutually exclusive with API key"); + clientCertOption.setRequired(false); + options.addOption(clientCertOption); + + Option clientKeyOption = + new Option( + "client-key", true, "Optional path to client key, mutually exclusive with API key"); + clientKeyOption.setRequired(false); + options.addOption(clientKeyOption); + + Option apiKeyOption = + new Option("api-key", true, "Optional API key, mutually exclusive with cert/key"); + apiKeyOption.setRequired(false); + options.addOption(apiKeyOption); + + Option serverNameOption = + new Option( + "server-name", true, "Server name to use for verifying the server's certificate"); + serverNameOption.setRequired(false); + options.addOption(serverNameOption); + + Option insercureSkipVerifyOption = + new Option( + "insecure-skip-verify", + false, + "Skip verification of the server's certificate and host name"); + insercureSkipVerifyOption.setRequired(false); + options.addOption(insercureSkipVerifyOption); + + CommandLineParser parser = new DefaultParser(); + HelpFormatter formatter = new HelpFormatter(); + CommandLine cmd = null; + + try { + cmd = parser.parse(options, args); + } catch (ParseException e) { + System.out.println(e.getMessage()); + formatter.printHelp("utility-name", options); + + System.exit(1); + } + + String targetHost = cmd.getOptionValue("target-host", "localhost:7233"); + String namespace = cmd.getOptionValue("namespace", "default"); + String serverRootCaCert = cmd.getOptionValue("server-root-ca-cert", ""); + String clientCert = cmd.getOptionValue("client-cert", ""); + String clientKey = cmd.getOptionValue("client-key", ""); + String serverName = cmd.getOptionValue("server-name", ""); + boolean insecureSkipVerify = cmd.hasOption("insecure-skip-verify"); + String apiKey = cmd.getOptionValue("api-key", ""); + + // API key and client cert/key are mutually exclusive + if (!apiKey.isEmpty() && (!clientCert.isEmpty() || !clientKey.isEmpty())) { + throw new IllegalArgumentException("API key and client cert/key are mutually exclusive"); + } + WorkflowServiceStubsOptions.Builder serviceStubOptionsBuilder = + WorkflowServiceStubsOptions.newBuilder().setTarget(targetHost); + // Configure TLS if client cert and key are provided + if (!clientCert.isEmpty() || !clientKey.isEmpty()) { + if (clientCert.isEmpty() || clientKey.isEmpty()) { + throw new IllegalArgumentException("Both client-cert and client-key must be provided"); + } + try { + SslContextBuilder sslContext = + SslContextBuilder.forClient() + .keyManager(new FileInputStream(clientCert), new FileInputStream(clientKey)); + if (serverRootCaCert != null && !serverRootCaCert.isEmpty()) { + sslContext.trustManager(new FileInputStream(serverRootCaCert)); + } + if (insecureSkipVerify) { + sslContext.trustManager(InsecureTrustManagerFactory.INSTANCE); + } + serviceStubOptionsBuilder.setSslContext(GrpcSslContexts.configure(sslContext).build()); + } catch (SSLException e) { + throw new RuntimeException(e); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + if (serverName != null && !serverName.isEmpty()) { + serviceStubOptionsBuilder.setChannelInitializer(c -> c.overrideAuthority(serverName)); + } + } + // Configure API key if provided + if (!apiKey.isEmpty()) { + serviceStubOptionsBuilder.setEnableHttps(true); + serviceStubOptionsBuilder.addApiKey(() -> apiKey); + } + + WorkflowServiceStubs service = + WorkflowServiceStubs.newServiceStubs(serviceStubOptionsBuilder.build()); + return WorkflowClient.newInstance(service, clientOptions.setNamespace(namespace).build()); + } +} diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/TransactionService.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/TransactionService.java new file mode 100644 index 000000000..bf45f96e5 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/TransactionService.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.nexusEarlyReturn.service; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.temporal.samples.earlyreturn.TransactionRequest; +import io.temporal.samples.earlyreturn.TxResult; + +/** + * TransactionService is a Nexus service that allows starting a transfer between accounts and + * retrieving the result of that transfer asynchronously. Each transaction is identified by a + * transaction token, which is an opaque string that can be used to query the status of the + * transaction. + */ +@Service +public interface TransactionService { + class StartTransactionRequest { + private final TransactionRequest transactionRequest; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public StartTransactionRequest( + @JsonProperty("transactionRequest") TransactionRequest transactionRequest) { + this.transactionRequest = transactionRequest; + } + + @JsonProperty("transactionRequest") + public TransactionRequest getTransactionRequest() { + return transactionRequest; + } + } + + class StartTransactionResponse { + private final String transactionToken; + private final TxResult txResult; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public StartTransactionResponse( + @JsonProperty("transactionToken") String transactionToken, + @JsonProperty("txResult") TxResult txResult) { + this.txResult = txResult; + this.transactionToken = transactionToken; + } + + /** + * Returns the transaction token that can be used to retrieve the result of the transaction. + * This is an opaque string that is unique for each transaction in the {@link + * TransactionService}. + * + * @return The transaction token. + */ + @JsonProperty("transactionToken") + public String getTransactionToken() { + return transactionToken; + } + + /** + * Returns the initial result of the transaction, which may include details such as the + * transaction ID and status. + * + * @return The initial transaction result. + */ + @JsonProperty("txResult") + public TxResult getTxResult() { + return txResult; + } + } + + class GetTransactionResultRequest { + private final String transactionToken; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public GetTransactionResultRequest(@JsonProperty("transactionToken") String transactionToken) { + this.transactionToken = transactionToken; + } + + @JsonProperty("transactionToken") + public String getTransactionToken() { + return transactionToken; + } + } + + /** + * Starts a transaction to transfer funds between accounts. If the transaction is initialized + * successfully, it returns a transaction token that can be used to wait for the transaction to + * complete. If the transaction fails during initialization, it throws an {@link + * io.temporal.failure.ApplicationFailure} with the transaction token as a detail, which can be + * used to wait for the transaction to be cancelled. + * + * @param request The request containing the transaction details. + * @return A response containing the transaction token and initial result. + */ + @Operation + StartTransactionResponse startTransaction(StartTransactionRequest request); + + /** + * Wait for a transaction to complete and return the result using the transaction token. + * + * @param request The request containing the transaction token. + * @return The result of the transaction. + */ + @Operation + TxResult getTransactionResult(GetTransactionResultRequest request); +} diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/description.md b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/description.md new file mode 100644 index 000000000..75fa5c5b7 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/description.md @@ -0,0 +1,8 @@ +Service Name: +TransactionService +Operation Names: +startTransaction +getTransactionResult + +Input / Output arguments are in the following repository: +https://github.com/temporalio/samples-java/core/src/main/java/io/temporal/samples/nexusEarlyReturn/service/NexusService.java From fcd44cb66bc4e61d7dbe690eaacf7f5fceb5c382 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Sat, 7 Jun 2025 11:07:30 -0700 Subject: [PATCH 2/2] run spotless --- .../nexusEarlyReturn/handler/TransactionServiceImpl.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/TransactionServiceImpl.java b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/TransactionServiceImpl.java index 57a09bd39..6a88d8e90 100644 --- a/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/TransactionServiceImpl.java +++ b/core/src/main/java/io/temporal/samples/nexusEarlyReturn/handler/TransactionServiceImpl.java @@ -19,6 +19,8 @@ package io.temporal.samples.nexusEarlyReturn.handler; +import static java.nio.charset.StandardCharsets.UTF_8; + import io.nexusrpc.OperationException; import io.nexusrpc.OperationInfo; import io.nexusrpc.handler.*; @@ -33,13 +35,10 @@ import io.temporal.samples.earlyreturn.TransactionWorkflow; import io.temporal.samples.earlyreturn.TxResult; import io.temporal.samples.nexusEarlyReturn.service.TransactionService; +import java.util.Base64; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Base64; - -import static java.nio.charset.StandardCharsets.UTF_8; - @ServiceImpl(service = TransactionService.class) public class TransactionServiceImpl { private static final Logger logger = LoggerFactory.getLogger(TransactionServiceImpl.class);