From 6bcdab56e7add8a038e6d017970bd165386128fd Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Thu, 2 May 2024 16:10:10 -0400 Subject: [PATCH 1/5] adapt code to actually DO cancellation and show the thread does not go "rogue". also removed a timeout you should not prefer --- .../samples/custom_activity/Main.java | 139 ++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/custom_activity/Main.java diff --git a/core/src/main/java/io/temporal/samples/custom_activity/Main.java b/core/src/main/java/io/temporal/samples/custom_activity/Main.java new file mode 100644 index 000000000..fea92891a --- /dev/null +++ b/core/src/main/java/io/temporal/samples/custom_activity/Main.java @@ -0,0 +1,139 @@ +package io.temporal.samples.custom_activity; + +import io.temporal.activity.*; +import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.workflow.*; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class Main { + static final String TASK_QUEUE = "CustomActivityTaskQueue"; + static final String WORKFLOW_ID = "workflow_id_" + UUID.randomUUID(); + + public static void main(String[] args) { + WorkflowServiceStubs service = + WorkflowServiceStubs.newServiceStubs( + WorkflowServiceStubsOptions.newBuilder().setTarget("127.0.0.1:7233").build()); + WorkflowClient client = WorkflowClient.newInstance(service); + WorkerFactory factory = WorkerFactory.newInstance(client); + Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(CustomWorkflowImpl.class); + worker.registerActivitiesImplementations(new CustomActivityImpl()); + factory.start(); + CustomWorkflow workflow = + client.newWorkflowStub( + CustomWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID) + .setTaskQueue(TASK_QUEUE) + .build()); + + String result = workflow.doSomeWork(); + System.out.println(result); + try { + // pause and make sure we dont have a "rogue thread" that is still executing... + Thread.sleep(3000); + } catch (InterruptedException e) { + System.out.println(e.getMessage()); + } + System.exit(0); + } + + @ActivityInterface + public interface CustomActivity { + @ActivityMethod + String doSomeWork(); + } + + @WorkflowInterface + public interface CustomWorkflow { + @WorkflowMethod + String doSomeWork(); + } + + public static class CustomActivityImpl implements CustomActivity { + + @Override + public String doSomeWork() { + Instant exitAt = Instant.now().plus(Duration.ofHours(1)); + var executionContext = Activity.getExecutionContext(); + final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(); + try { + var unused = + scheduledExecutor.scheduleAtFixedRate( + () -> { + try { + System.out.println("sending heartbeat"); + executionContext.heartbeat(1); + } catch (ActivityCompletionException e) { + System.out.println("activitycompletionexception " + e.getMessage()); + throw e; + } + }, + 0, + 1, + TimeUnit.SECONDS); + while (Instant.now().isBefore(exitAt)) { + sleep(Duration.ofSeconds(5)); + } + return "Done"; + } finally { + System.out.println("shutting down heartbeat thread"); + scheduledExecutor.shutdown(); + } + } + + private void sleep(Duration duration) { + try { + System.out.println("Sleeping for " + duration); + Thread.sleep(duration.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + + public static class CustomWorkflowImpl implements CustomWorkflow { + private final CustomActivity customActivity = + Workflow.newActivityStub( + CustomActivity.class, + ActivityOptions.newBuilder() + .setTaskQueue(TASK_QUEUE) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .setHeartbeatTimeout(Duration.ofSeconds(2)) + .setStartToCloseTimeout(Duration.ofHours(1)) + .build()); + + @Override + public String doSomeWork() { + List> results = new ArrayList<>(1); + + CancellationScope scope = + Workflow.newCancellationScope( + () -> { + results.add(Async.function(customActivity::doSomeWork)); + }); + scope.run(); + Workflow.sleep(3000); + scope.cancel(); + System.out.println("failure " + results.get(0).getFailure().getMessage()); + // String result = Promise.anyOf(results).get(); + + return "Cancellation worked"; + } + } +} From 44ae49d5954c02b6e22930d34ffb27ba4fb2f370 Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Tue, 16 Jul 2024 18:05:39 -0400 Subject: [PATCH 2/5] working out backgrounding activity AND heartbeat --- .../samples/custom_activity/Main.java | 1 + .../samples/hello/HeartbeatUtils.java | 62 +++ ...CancellationScopeMultithreadHeartbeat.java | 444 ++++++++++++++++++ 3 files changed, 507 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java create mode 100644 core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java diff --git a/core/src/main/java/io/temporal/samples/custom_activity/Main.java b/core/src/main/java/io/temporal/samples/custom_activity/Main.java index fea92891a..c0b147969 100644 --- a/core/src/main/java/io/temporal/samples/custom_activity/Main.java +++ b/core/src/main/java/io/temporal/samples/custom_activity/Main.java @@ -63,6 +63,7 @@ public interface CustomWorkflow { String doSomeWork(); } + // https://aozturk.medium.com/how-to-handle-uncaught-exceptions-in-java-abf819347906 public static class CustomActivityImpl implements CustomActivity { @Override diff --git a/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java b/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java new file mode 100644 index 000000000..4b9b87f85 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java @@ -0,0 +1,62 @@ +package io.temporal.samples.hello; + +import io.temporal.activity.ActivityExecutionContext; +import io.temporal.client.ActivityCompletionException; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HeartbeatUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatUtils.class); + + public static T withBackgroundHeartbeatAndActivity( + final AtomicReference cancellationCallbackRef, + final Callable callable, + final Supplier activityContext, + final int heartbeatIntervalSeconds) throws ExecutionException { + + var context = activityContext.get(); + final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutorService activityExecutor = Executors.newSingleThreadScheduledExecutor(); + var activityInvocation = activityExecutor.schedule(callable, 0, TimeUnit.SECONDS); + final AtomicReference canceller = + new AtomicReference<>( + () -> { + LOGGER.warn("canceller is running..."); + activityInvocation.cancel(true); + if (cancellationCallbackRef != null) { + cancellationCallbackRef.get().run(); + } + }); + + var unused = + heartbeatExecutor.scheduleAtFixedRate( + () -> { + try { + LOGGER.info("heartbeating..."); + context.heartbeat(null); + } catch (ActivityCompletionException e) { + LOGGER.warn("received cancellation", e); + canceller.get().run(); + throw e; + } + }, + 0, + heartbeatIntervalSeconds, + TimeUnit.SECONDS); + + try { + return activityInvocation.get(); + } catch (ExecutionException e) { + LOGGER.warn("Background heartbeated invocation interrupt {}", e.getMessage(), e); + throw e; + } catch (InterruptedException e) { + throw new ExecutionException(e); + } finally { + activityExecutor.shutdown(); + heartbeatExecutor.shutdown(); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java b/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java new file mode 100644 index 000000000..e7ac5f83c --- /dev/null +++ b/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java @@ -0,0 +1,444 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.hello; + +import io.temporal.activity.*; +import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.failure.ActivityFailure; +import io.temporal.failure.CanceledFailure; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.*; +import java.text.MessageFormat; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sample Temporal Workflow Definition that demonstrates parallel Activity Executions with a + * Cancellation Scope. When one of the Activity Executions finish, we cancel the execution of the + * other Activities and wait for their cancellation to complete. + */ +public class HelloCancellationScopeMultithreadHeartbeat { + + // Define the task queue name + static final String TASK_QUEUE = "HelloCancellationScopeTaskQueue"; + + // Define our workflow unique id + static final String WORKFLOW_ID = "HelloCancellationScopeWorkflow"; + + /** + * The Workflow Definition's Interface must contain one method annotated with @WorkflowMethod. + * + *

Workflow Definitions should not contain any heavyweight computations, non-deterministic + * code, network calls, database operations, etc. Those things should be handled by the + * Activities. + * + * @see WorkflowInterface + * @see WorkflowMethod + */ + @WorkflowInterface + public interface GreetingWorkflow { + + /** + * This is the method that is executed when the Workflow Execution is started. The Workflow + * Execution completes when this method finishes execution. + */ + @WorkflowMethod + String getGreeting(String name); + } + + /** + * This is the Activity Definition's Interface. Activities are building blocks of any Temporal + * Workflow and contain any business logic that could perform long running computation, network + * calls, etc. + * + *

Annotating Activity Definition methods with @ActivityMethod is optional. + * + * @see ActivityInterface + * @see io.temporal.activity.ActivityMethod + */ + @ActivityInterface + public interface GreetingActivities { + String composeGreeting(String greeting, String name); + } + + // Define the workflow implementation which implements our getGreeting workflow method. + public static class GreetingWorkflowImpl implements GreetingWorkflow { + + private static final int ACTIVITY_MAX_SLEEP_SECONDS = 30; + private static final int ACTIVITY_MAX_CLEANUP_SECONDS = 5; + private static final int ACTIVITY_START_TO_CLOSE_TIMEOUT = + ACTIVITY_MAX_SLEEP_SECONDS + ACTIVITY_MAX_CLEANUP_SECONDS + 10; + + // private static final String[] greetings = + // new String[] {"Hello", "Bye", "Hola", "Привет", "Oi", "Hallo"}; + private static final String[] greetings = new String[] {"Hello"}; + + /** + * Define the GreetingActivities stub. Activity stubs are proxies for activity invocations that + * are executed outside of the workflow thread on the activity worker, that can be on a + * different host. Temporal is going to dispatch the activity results back to the workflow and + * unblock the stub as soon as activity is completed on the activity worker. + * + *

In the {@link ActivityOptions} definition the "setStartToCloseTimeout" option sets the + * maximum time of a single Activity execution attempt. For this example it is set to 10 + * seconds. + * + *

The "setCancellationType" option means that in case of activity cancellation the activity + * should fail with {@link CanceledFailure}. We set + * ActivityCancellationType.WAIT_CANCELLATION_COMPLETED which denotes that activity should be + * first notified of the cancellation, and cancelled after it can perform some cleanup tasks for + * example. Note that an activity must heartbeat to receive cancellation notifications. + */ + private final GreetingActivities activities = + Workflow.newActivityStub( + GreetingActivities.class, + ActivityOptions.newBuilder() + // if heartbeat timeout is not set, activity heartbeats will be throttled to one + // every 30 seconds + // which is too rare for the cancellations to be delivered in this example. + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setStartToCloseTimeout(Duration.ofSeconds(ACTIVITY_START_TO_CLOSE_TIMEOUT)) + .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) + .build()); + + @Override + public String getGreeting(String name) { + var logger = Workflow.getLogger(GreetingWorkflowImpl.class); + + List> results = new ArrayList<>(greetings.length); + + /* + * Create our CancellationScope. Within this scope we call the workflow activity + * composeGreeting method asynchronously for each of our defined greetings in different + * languages. + */ + CancellationScope scope = + Workflow.newCancellationScope( + () -> { + for (String greeting : greetings) { + logger.info("starting greeting {}", greeting); + results.add( + Async.function(activities::composeGreeting, greeting, name)); + } + }); + /* + * Execute all activities within the CancellationScope. Note that this execution is + * non-blocking as the code inside our cancellation scope is also non-blocking. + */ + scope.run(); + logger.info("started all the things"); + + // We use "anyOf" here to wait for one of the activity invocations to return + Workflow.newTimer(Duration.ofSeconds(10)).get(); + // String result = Promise.anyOf(results).get(); + // logger.info("received all the things {}", result); + + // Trigger cancellation of all uncompleted activity invocations within the cancellation scope + scope.cancel(); + + logger.info("canceled scoped...moving on"); + /* + * Wait for all activities to perform cleanup if needed. + * For the sake of the example we ignore cancellations and + * get all the results so that we can print them in the end. + * + * Note that we cannot use "allOf" here as that fails on any Promise failures + */ + for (Promise activityResult : results) { + try { + activityResult.get(); + } catch (ActivityFailure e) { + if (!(e.getCause() instanceof CanceledFailure)) { + throw e; + } + } + } + return "don"; + } + } + + static class Greeter implements Callable { + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(Greeter.class); + private int mockActivityTimeSecs = 10; + private String greeting; + private String name; + + public Greeter(int mockActivityTimeSecs, String greeting, String name) { + this.mockActivityTimeSecs = mockActivityTimeSecs; + this.greeting = greeting; + this.name = name; + } + + @Override + public String call() throws Exception { + try { + LOGGER.info( + "GREETER sleeping " + + mockActivityTimeSecs + + " seconds on thread " + + Thread.currentThread().getName()); + Thread.sleep(TimeUnit.SECONDS.toMillis(mockActivityTimeSecs)); + LOGGER.info("GREETER awakened after " + mockActivityTimeSecs + " seconds"); + return greeting + " " + name + "! from thread: " + Thread.currentThread().getName(); + } catch (InterruptedException ee) { + LOGGER.info("GREETER interrupted. aborted"); + throw ee; + } + } + } + /** + * Implementation of our workflow activity interface. It overwrites our defined composeGreeting + * method. + */ + static class GreetingActivitiesImpl implements GreetingActivities { + + private static final Logger LOGGER = LoggerFactory.getLogger(GreetingActivitiesImpl.class); + + @Override + public String composeGreetingMulti(String greeting, String name) { + + var context = Activity.getExecutionContext(); + + var greeter = new Greeter(30, greeting, name); + + final ScheduledExecutorService heartbeatExecutor = + Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutorService workExecutor = Executors.newSingleThreadScheduledExecutor(); + var greetInvocation = workExecutor.schedule(greeter, 0, TimeUnit.SECONDS); + AtomicReference canceller = + new AtomicReference<>( + () -> { + LOGGER.info("Canceller invoked on thread {}", Thread.currentThread().getName()); + greetInvocation.cancel(true); + // workExecutor.shutdown(); + // heartbeatExecutor.shutdown(); + // cancel the main thing here + }); + var heartbeatInvocation = + heartbeatExecutor.scheduleAtFixedRate( + () -> { + try { + LOGGER.info("heartbeating..."); + context.heartbeat(null); + } catch (ActivityCompletionException e) { + LOGGER.info("activity completed in thread {}", Thread.currentThread().getName()); + canceller.get().run(); + throw e; + } + }, + 0, + 4, + TimeUnit.SECONDS); + + try { + // block here + var result = greetInvocation.get(); + LOGGER.info("unblocked"); + // this is the special sauce...blocking on the heartbeat + var unused = heartbeatInvocation.get(); + LOGGER.info( + "heartbeat invocation got on thread {} / {}", Thread.currentThread().getName(), unused); + LOGGER.info("got result {}", result); + } catch (ExecutionException e) { + LOGGER.info("received execution exception", e); + return MessageFormat.format("Execution exception {0}", e.getMessage()); + } catch (InterruptedException e) { + LOGGER.info("received interrupt exception", e); + } catch (Throwable e) { + LOGGER.info("received unexpected exception", e); + } finally { + LOGGER.info("shutting down the activity executors"); + canceller.get().run(); + // these are redundant + workExecutor.shutdown(); + heartbeatExecutor.shutdown(); + } + return "NEVER GOT IT"; + } + + @Override + public String composeGreetingWithHelper(String greeting, String name) { + // simulate a random time this activity should execute for + Random random = new Random(); + int activityDurationSecs = + random.nextInt(GreetingWorkflowImpl.ACTIVITY_MAX_SLEEP_SECONDS - 5) + 5; + // Get the activity execution context + LOGGER.info( + "composeGreetingWithHelper started with activityDurationSecs {}", activityDurationSecs); + + var greeter = new Greeter(30, greeting, name); + try { + var result = + HeartbeatUtils.withBackgroundHeartbeatAndActivity( + null, greeter, Activity::getExecutionContext, 4); + return result; + } catch (ExecutionException e) { + LOGGER.error("Caught ExecutionException", e); + return "NO SOUP FOR YOU"; + } + } + + @Override + public String composeGreeting(String greeting, String name) { + + // Get the activity execution context + ActivityExecutionContext context = Activity.getExecutionContext(); + + // simulate a random time this activity should execute for + Random random = new Random(); + int seconds = random.nextInt(GreetingWorkflowImpl.ACTIVITY_MAX_SLEEP_SECONDS - 5) + 5; + System.out.println("Activity for " + greeting + " going to take " + seconds + " seconds"); + + for (int i = 0; i < seconds; i++) { + sleep(1); + try { + // Perform the heartbeat. Used to notify the workflow that activity execution is alive + context.heartbeat(i); + } catch (ActivityCompletionException e) { + /* + * Activity heartbeat can throw an exception for multiple reasons, including: + * 1) activity cancellation + * 2) activity not existing (due to a timeout for example) from the service point of view + * 3) activity worker shutdown request + * + * In our case our activity fails because one of the other performed activities + * has completed execution and our workflow method has issued the "cancel" request + * to cancel all other activities in the cancellation scope. + * + * The following code simulates our activity after cancellation "cleanup" + */ + seconds = random.nextInt(GreetingWorkflowImpl.ACTIVITY_MAX_CLEANUP_SECONDS); + System.out.println( + "Activity for " + + greeting + + " was cancelled. Cleanup is expected to take " + + seconds + + " seconds."); + sleep(seconds); + System.out.println("Activity for " + greeting + " finished cancellation"); + throw e; + } + } + + // return results of activity invocation + System.out.println("Activity for " + greeting + " completed"); + return greeting + " " + name + "!"; + } + + private void sleep(int seconds) { + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); + } catch (InterruptedException ee) { + // Empty + } + } + } + + /** + * With our Workflow and Activities defined, we can now start execution. The main method starts + * the worker and then the workflow. + */ + public static void main(String[] args) { + + // Get a Workflow service stub. + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + + /* + * Get a Workflow service client which can be used to start, Signal, and Query Workflow Executions. + */ + WorkflowClient client = WorkflowClient.newInstance(service); + + /* + * Define the workflow factory. It is used to create workflow workers for a specific task queue. + */ + WorkerFactory factory = WorkerFactory.newInstance(client); + + /* + * Define the workflow worker. Workflow workers listen to a defined task queue and process + * workflows and activities. + * + * In the {@link ActivityOptions} definition the + * "setMaxConcurrentActivityExecutionSize" option sets the max number of parallel activity executions allowed + * The "setMaxConcurrentActivityTaskPollers" option sets the number of simultaneous poll requests on activity task queue + */ + Worker worker = + factory.newWorker( + TASK_QUEUE, + WorkerOptions.newBuilder() + .setMaxConcurrentActivityExecutionSize(100) + .setMaxConcurrentActivityTaskPollers(1) + .build()); + + /* + * Register our workflow implementation with the worker. + * Workflow implementations must be known to the worker at runtime in + * order to dispatch workflow tasks. + */ + worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class); + + /* + * Register our Activity Types with the Worker. Since Activities are stateless and thread-safe, + * the Activity Type is a shared instance. + */ + worker.registerActivitiesImplementations(new GreetingActivitiesImpl()); + + /* + * Start all the workers registered for a specific task queue. + * The started workers then start polling for workflows and activities. + */ + factory.start(); + + // Create the workflow client stub. It is used to start our workflow execution. + GreetingWorkflow workflow = + client.newWorkflowStub( + GreetingWorkflow.class, + WorkflowOptions.newBuilder() + .setWorkflowId(WORKFLOW_ID + UUID.randomUUID().toString()) + .setTaskQueue(TASK_QUEUE) + .build()); + + /* + * Execute our workflow and wait for it to complete. The call to our getGreeting method is + * synchronous. + */ + String greeting = workflow.getGreeting("World"); + + // Display workflow execution results + System.out.println(greeting); + // try { + // Thread.sleep(5000); + // } catch (InterruptedException e) { + // System.out.println("failed to sleep after the things"); + // } + System.exit(0); + } +} From b2e61e747521426adf6550c383cb551828ee5725 Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Tue, 16 Jul 2024 18:27:45 -0400 Subject: [PATCH 3/5] added cancellationexception handling to allow graceful exit --- .../samples/hello/HeartbeatUtils.java | 8 +- ...CancellationScopeMultithreadHeartbeat.java | 155 +++--------------- 2 files changed, 26 insertions(+), 137 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java b/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java index 4b9b87f85..34407d904 100644 --- a/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java +++ b/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java @@ -15,7 +15,8 @@ public static T withBackgroundHeartbeatAndActivity( final AtomicReference cancellationCallbackRef, final Callable callable, final Supplier activityContext, - final int heartbeatIntervalSeconds) throws ExecutionException { + final int heartbeatIntervalSeconds) + throws ExecutionException { var context = activityContext.get(); final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(); @@ -53,7 +54,10 @@ public static T withBackgroundHeartbeatAndActivity( LOGGER.warn("Background heartbeated invocation interrupt {}", e.getMessage(), e); throw e; } catch (InterruptedException e) { - throw new ExecutionException(e); + throw new ExecutionException(e); + } catch (CancellationException e) { + LOGGER.warn("Cancellation exception", e); + throw new ExecutionException(e); } finally { activityExecutor.shutdown(); heartbeatExecutor.shutdown(); diff --git a/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java b/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java index e7ac5f83c..535910908 100644 --- a/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java +++ b/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java @@ -20,7 +20,6 @@ package io.temporal.samples.hello; import io.temporal.activity.*; -import io.temporal.client.ActivityCompletionException; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.failure.ActivityFailure; @@ -30,14 +29,12 @@ import io.temporal.worker.WorkerFactory; import io.temporal.worker.WorkerOptions; import io.temporal.workflow.*; -import java.text.MessageFormat; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +97,8 @@ public static class GreetingWorkflowImpl implements GreetingWorkflow { // private static final String[] greetings = // new String[] {"Hello", "Bye", "Hola", "Привет", "Oi", "Hallo"}; - private static final String[] greetings = new String[] {"Hello"}; + private static final String[] greetings = + new String[] {"Hello", "Bye", "Hola", "Привет", "Oi", "Hallo"}; /** * Define the GreetingActivities stub. Activity stubs are proxies for activity invocations that @@ -146,8 +144,7 @@ public String getGreeting(String name) { () -> { for (String greeting : greetings) { logger.info("starting greeting {}", greeting); - results.add( - Async.function(activities::composeGreeting, greeting, name)); + results.add(Async.function(activities::composeGreeting, greeting, name)); } }); /* @@ -182,7 +179,7 @@ public String getGreeting(String name) { } } } - return "don"; + return "done"; } } @@ -202,15 +199,23 @@ public Greeter(int mockActivityTimeSecs, String greeting, String name) { public String call() throws Exception { try { LOGGER.info( - "GREETER sleeping " + "GREETER (" + + greeting + + ") sleeping " + mockActivityTimeSecs + " seconds on thread " + Thread.currentThread().getName()); Thread.sleep(TimeUnit.SECONDS.toMillis(mockActivityTimeSecs)); - LOGGER.info("GREETER awakened after " + mockActivityTimeSecs + " seconds"); + LOGGER.info( + "GREETER (" + greeting + ") awakened after " + mockActivityTimeSecs + " seconds"); return greeting + " " + name + "! from thread: " + Thread.currentThread().getName(); } catch (InterruptedException ee) { - LOGGER.info("GREETER interrupted. aborted"); + LOGGER.info( + "GREETER (" + + greeting + + ") interrupted from timeout of " + + mockActivityTimeSecs + + " - ABORTED!"); throw ee; } } @@ -224,143 +229,23 @@ static class GreetingActivitiesImpl implements GreetingActivities { private static final Logger LOGGER = LoggerFactory.getLogger(GreetingActivitiesImpl.class); @Override - public String composeGreetingMulti(String greeting, String name) { - - var context = Activity.getExecutionContext(); - - var greeter = new Greeter(30, greeting, name); - - final ScheduledExecutorService heartbeatExecutor = - Executors.newSingleThreadScheduledExecutor(); - final ScheduledExecutorService workExecutor = Executors.newSingleThreadScheduledExecutor(); - var greetInvocation = workExecutor.schedule(greeter, 0, TimeUnit.SECONDS); - AtomicReference canceller = - new AtomicReference<>( - () -> { - LOGGER.info("Canceller invoked on thread {}", Thread.currentThread().getName()); - greetInvocation.cancel(true); - // workExecutor.shutdown(); - // heartbeatExecutor.shutdown(); - // cancel the main thing here - }); - var heartbeatInvocation = - heartbeatExecutor.scheduleAtFixedRate( - () -> { - try { - LOGGER.info("heartbeating..."); - context.heartbeat(null); - } catch (ActivityCompletionException e) { - LOGGER.info("activity completed in thread {}", Thread.currentThread().getName()); - canceller.get().run(); - throw e; - } - }, - 0, - 4, - TimeUnit.SECONDS); - - try { - // block here - var result = greetInvocation.get(); - LOGGER.info("unblocked"); - // this is the special sauce...blocking on the heartbeat - var unused = heartbeatInvocation.get(); - LOGGER.info( - "heartbeat invocation got on thread {} / {}", Thread.currentThread().getName(), unused); - LOGGER.info("got result {}", result); - } catch (ExecutionException e) { - LOGGER.info("received execution exception", e); - return MessageFormat.format("Execution exception {0}", e.getMessage()); - } catch (InterruptedException e) { - LOGGER.info("received interrupt exception", e); - } catch (Throwable e) { - LOGGER.info("received unexpected exception", e); - } finally { - LOGGER.info("shutting down the activity executors"); - canceller.get().run(); - // these are redundant - workExecutor.shutdown(); - heartbeatExecutor.shutdown(); - } - return "NEVER GOT IT"; - } - - @Override - public String composeGreetingWithHelper(String greeting, String name) { + public String composeGreeting(String greeting, String name) { // simulate a random time this activity should execute for Random random = new Random(); int activityDurationSecs = random.nextInt(GreetingWorkflowImpl.ACTIVITY_MAX_SLEEP_SECONDS - 5) + 5; // Get the activity execution context - LOGGER.info( - "composeGreetingWithHelper started with activityDurationSecs {}", activityDurationSecs); + LOGGER.info("composeGreeting started with activityDurationSecs {}", activityDurationSecs); - var greeter = new Greeter(30, greeting, name); + var greeter = new Greeter(activityDurationSecs, greeting, name); try { - var result = - HeartbeatUtils.withBackgroundHeartbeatAndActivity( - null, greeter, Activity::getExecutionContext, 4); - return result; + return HeartbeatUtils.withBackgroundHeartbeatAndActivity( + null, greeter, Activity::getExecutionContext, 4); } catch (ExecutionException e) { LOGGER.error("Caught ExecutionException", e); return "NO SOUP FOR YOU"; } } - - @Override - public String composeGreeting(String greeting, String name) { - - // Get the activity execution context - ActivityExecutionContext context = Activity.getExecutionContext(); - - // simulate a random time this activity should execute for - Random random = new Random(); - int seconds = random.nextInt(GreetingWorkflowImpl.ACTIVITY_MAX_SLEEP_SECONDS - 5) + 5; - System.out.println("Activity for " + greeting + " going to take " + seconds + " seconds"); - - for (int i = 0; i < seconds; i++) { - sleep(1); - try { - // Perform the heartbeat. Used to notify the workflow that activity execution is alive - context.heartbeat(i); - } catch (ActivityCompletionException e) { - /* - * Activity heartbeat can throw an exception for multiple reasons, including: - * 1) activity cancellation - * 2) activity not existing (due to a timeout for example) from the service point of view - * 3) activity worker shutdown request - * - * In our case our activity fails because one of the other performed activities - * has completed execution and our workflow method has issued the "cancel" request - * to cancel all other activities in the cancellation scope. - * - * The following code simulates our activity after cancellation "cleanup" - */ - seconds = random.nextInt(GreetingWorkflowImpl.ACTIVITY_MAX_CLEANUP_SECONDS); - System.out.println( - "Activity for " - + greeting - + " was cancelled. Cleanup is expected to take " - + seconds - + " seconds."); - sleep(seconds); - System.out.println("Activity for " + greeting + " finished cancellation"); - throw e; - } - } - - // return results of activity invocation - System.out.println("Activity for " + greeting + " completed"); - return greeting + " " + name + "!"; - } - - private void sleep(int seconds) { - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(seconds)); - } catch (InterruptedException ee) { - // Empty - } - } } /** From 4989cb89eca91d2be99a1d3c750a2bb733ee1216 Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Wed, 17 Jul 2024 17:19:00 -0400 Subject: [PATCH 4/5] multiple activity invocations failing as I would expect now due to cancellation --- .../samples/hello/HeartbeatUtils.java | 70 +++++++++++++------ ...CancellationScopeMultithreadHeartbeat.java | 11 +-- 2 files changed, 56 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java b/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java index 34407d904..f2a15d561 100644 --- a/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java +++ b/core/src/main/java/io/temporal/samples/hello/HeartbeatUtils.java @@ -2,46 +2,66 @@ import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.ActivityCompletionException; +import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.CanceledFailure; +import java.text.MessageFormat; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.function.Supplier; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HeartbeatUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatUtils.class); + // private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatUtils.class); + // withBackgroundHeartbeatAndActivity runs the underlying activity Callable in a thread and + // heartbeats in another thread + // Cancellation can be ignored by returning `true` from the `shouldIgnoreCancel` predicate, + // otherwise the + // activity Callable is cancelled and a Cancellation failure is thrown. + // Callers should handle `ApplicationFailure` if you are allowing cancellation and determine + // if you want to exit the Activity with or without the failure bubbling up to the Workflow. public static T withBackgroundHeartbeatAndActivity( - final AtomicReference cancellationCallbackRef, - final Callable callable, final Supplier activityContext, - final int heartbeatIntervalSeconds) - throws ExecutionException { + final Callable callable, + final int heartbeatIntervalSeconds, + final Predicate> shouldIgnoreCancel) + throws CanceledFailure { var context = activityContext.get(); + var logger = + LoggerFactory.getLogger( + MessageFormat.format( + "{0}/{1}", HeartbeatUtils.class.getName(), context.getInfo().getActivityId())); final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(); final ScheduledExecutorService activityExecutor = Executors.newSingleThreadScheduledExecutor(); var activityInvocation = activityExecutor.schedule(callable, 0, TimeUnit.SECONDS); final AtomicReference canceller = new AtomicReference<>( () -> { - LOGGER.warn("canceller is running..."); + logger.warn("canceller is running..."); activityInvocation.cancel(true); - if (cancellationCallbackRef != null) { - cancellationCallbackRef.get().run(); - } }); var unused = heartbeatExecutor.scheduleAtFixedRate( () -> { try { - LOGGER.info("heartbeating..."); + logger.info("heartbeating..."); context.heartbeat(null); } catch (ActivityCompletionException e) { - LOGGER.warn("received cancellation", e); - canceller.get().run(); - throw e; + logger.warn("received cancellation", e); + try { + if (shouldIgnoreCancel == null || !shouldIgnoreCancel.test(callable)) { + // cancellation should be accepted so cancel the invocation and rethrow the e + canceller.get().run(); + throw e; + } else { + logger.warn("Activity Cancellation ignored so keep heartbeating..."); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } } }, 0, @@ -50,15 +70,23 @@ public static T withBackgroundHeartbeatAndActivity( try { return activityInvocation.get(); - } catch (ExecutionException e) { - LOGGER.warn("Background heartbeated invocation interrupt {}", e.getMessage(), e); - throw e; - } catch (InterruptedException e) { - throw new ExecutionException(e); } catch (CancellationException e) { - LOGGER.warn("Cancellation exception", e); - throw new ExecutionException(e); + logger.warn("Canceled activity invocation", e); + // Opinionated way to keep Workflow from retrying this activity that is no longer going to + // heartbeat. + // if we don't returning a "non-retryable" failure, you will see Heartbeat timeout failures + // but really + // we want to communicate that the activity has been canceled and allow the caller to handle + // the exception. + // We could just rethrow the CancellationException here but then every user of this utility + // would have to convert to a nonretryable error. + throw ApplicationFailure.newNonRetryableFailureWithCause( + e.getMessage(), e.getClass().getTypeName(), e); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); } finally { + // regardless of whether the activity ignores cancellation using `onCancel` or continued, + // shutdown at last activityExecutor.shutdown(); heartbeatExecutor.shutdown(); } diff --git a/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java b/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java index 535910908..fe8c3f28f 100644 --- a/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java +++ b/core/src/main/java/io/temporal/samples/hello/HelloCancellationScopeMultithreadHeartbeat.java @@ -23,6 +23,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.failure.ActivityFailure; +import io.temporal.failure.ApplicationFailure; import io.temporal.failure.CanceledFailure; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.Worker; @@ -205,6 +206,7 @@ public String call() throws Exception { + mockActivityTimeSecs + " seconds on thread " + Thread.currentThread().getName()); + // spoof some long-running work that never iterates to allow us to heartbeat Thread.sleep(TimeUnit.SECONDS.toMillis(mockActivityTimeSecs)); LOGGER.info( "GREETER (" + greeting + ") awakened after " + mockActivityTimeSecs + " seconds"); @@ -240,10 +242,11 @@ public String composeGreeting(String greeting, String name) { var greeter = new Greeter(activityDurationSecs, greeting, name); try { return HeartbeatUtils.withBackgroundHeartbeatAndActivity( - null, greeter, Activity::getExecutionContext, 4); - } catch (ExecutionException e) { - LOGGER.error("Caught ExecutionException", e); - return "NO SOUP FOR YOU"; + Activity::getExecutionContext, greeter, 4, (unused) -> false); + } catch (ApplicationFailure e) { + LOGGER.error( + "Caught Exception but rethrowing to show activities as failed due to cancellation", e); + throw e; } } } From 5e34bb8603e4b3537e3c48bde56951e19814d638 Mon Sep 17 00:00:00 2001 From: Mike Nichols Date: Wed, 17 Jul 2024 17:25:44 -0400 Subject: [PATCH 5/5] rm unused file --- .../samples/custom_activity/Main.java | 140 ------------------ 1 file changed, 140 deletions(-) delete mode 100644 core/src/main/java/io/temporal/samples/custom_activity/Main.java diff --git a/core/src/main/java/io/temporal/samples/custom_activity/Main.java b/core/src/main/java/io/temporal/samples/custom_activity/Main.java deleted file mode 100644 index c0b147969..000000000 --- a/core/src/main/java/io/temporal/samples/custom_activity/Main.java +++ /dev/null @@ -1,140 +0,0 @@ -package io.temporal.samples.custom_activity; - -import io.temporal.activity.*; -import io.temporal.client.ActivityCompletionException; -import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowOptions; -import io.temporal.serviceclient.WorkflowServiceStubs; -import io.temporal.serviceclient.WorkflowServiceStubsOptions; -import io.temporal.worker.Worker; -import io.temporal.worker.WorkerFactory; -import io.temporal.workflow.*; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class Main { - static final String TASK_QUEUE = "CustomActivityTaskQueue"; - static final String WORKFLOW_ID = "workflow_id_" + UUID.randomUUID(); - - public static void main(String[] args) { - WorkflowServiceStubs service = - WorkflowServiceStubs.newServiceStubs( - WorkflowServiceStubsOptions.newBuilder().setTarget("127.0.0.1:7233").build()); - WorkflowClient client = WorkflowClient.newInstance(service); - WorkerFactory factory = WorkerFactory.newInstance(client); - Worker worker = factory.newWorker(TASK_QUEUE); - worker.registerWorkflowImplementationTypes(CustomWorkflowImpl.class); - worker.registerActivitiesImplementations(new CustomActivityImpl()); - factory.start(); - CustomWorkflow workflow = - client.newWorkflowStub( - CustomWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(WORKFLOW_ID) - .setTaskQueue(TASK_QUEUE) - .build()); - - String result = workflow.doSomeWork(); - System.out.println(result); - try { - // pause and make sure we dont have a "rogue thread" that is still executing... - Thread.sleep(3000); - } catch (InterruptedException e) { - System.out.println(e.getMessage()); - } - System.exit(0); - } - - @ActivityInterface - public interface CustomActivity { - @ActivityMethod - String doSomeWork(); - } - - @WorkflowInterface - public interface CustomWorkflow { - @WorkflowMethod - String doSomeWork(); - } - - // https://aozturk.medium.com/how-to-handle-uncaught-exceptions-in-java-abf819347906 - public static class CustomActivityImpl implements CustomActivity { - - @Override - public String doSomeWork() { - Instant exitAt = Instant.now().plus(Duration.ofHours(1)); - var executionContext = Activity.getExecutionContext(); - final ScheduledExecutorService scheduledExecutor = - Executors.newSingleThreadScheduledExecutor(); - try { - var unused = - scheduledExecutor.scheduleAtFixedRate( - () -> { - try { - System.out.println("sending heartbeat"); - executionContext.heartbeat(1); - } catch (ActivityCompletionException e) { - System.out.println("activitycompletionexception " + e.getMessage()); - throw e; - } - }, - 0, - 1, - TimeUnit.SECONDS); - while (Instant.now().isBefore(exitAt)) { - sleep(Duration.ofSeconds(5)); - } - return "Done"; - } finally { - System.out.println("shutting down heartbeat thread"); - scheduledExecutor.shutdown(); - } - } - - private void sleep(Duration duration) { - try { - System.out.println("Sleeping for " + duration); - Thread.sleep(duration.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - } - - public static class CustomWorkflowImpl implements CustomWorkflow { - private final CustomActivity customActivity = - Workflow.newActivityStub( - CustomActivity.class, - ActivityOptions.newBuilder() - .setTaskQueue(TASK_QUEUE) - .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setHeartbeatTimeout(Duration.ofSeconds(2)) - .setStartToCloseTimeout(Duration.ofHours(1)) - .build()); - - @Override - public String doSomeWork() { - List> results = new ArrayList<>(1); - - CancellationScope scope = - Workflow.newCancellationScope( - () -> { - results.add(Async.function(customActivity::doSomeWork)); - }); - scope.run(); - Workflow.sleep(3000); - scope.cancel(); - System.out.println("failure " + results.get(0).getFailure().getMessage()); - // String result = Promise.anyOf(results).get(); - - return "Cancellation worked"; - } - } -}