Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be a static inner class if only used in the one place. Every other class in this package is a standalone sample.

Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.temporal.samples.hello;

import io.temporal.activity.ActivityExecutionContext;
import io.temporal.client.ActivityCompletionException;
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.CanceledFailure;
import java.text.MessageFormat;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.slf4j.LoggerFactory;

public class HeartbeatUtils {
// private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatUtils.class);

// withBackgroundHeartbeatAndActivity runs the underlying activity Callable in a thread and
// heartbeats in another thread
// Cancellation can be ignored by returning `true` from the `shouldIgnoreCancel` predicate,
// otherwise the
// activity Callable is cancelled and a Cancellation failure is thrown.
// Callers should handle `ApplicationFailure` if you are allowing cancellation and determine
// if you want to exit the Activity with or without the failure bubbling up to the Workflow.
public static <T> T withBackgroundHeartbeatAndActivity(
Copy link
Member

@cretz cretz Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there may be a few issues here, like:

  • Not reusing the executor service
  • Starting a second thread unnecessarily
  • Not cancelling heartbeater when activity done and maybe sending extra RPC unnecessarily
  • Not sure there is value in heartbeating after server sent a cancellation, would have to check
  • Could use the heartbeat timeout by default to derive interval
  • Force wrapping user exception in a runtime exception
  • Maybe a few more, didn't check

Here's an untested (so probably doesn't even compile) alternative approach:

public class AutoHeartbeater implements AutoCloseable {
  private final ScheduledExecutorService scheduler;

  public AutoHeartbeater(ScheduledExecutorService scheduler) {
    this.scheduler = scheduler;
  }

  public void close() {
    scheduler.shutdown();
  }

  public class Run implements AutoCloseable {
    private final ScheduledFuture<?> ticker;
    private final Object cancelLock = new Object();
    private ActivityCompletionException cancel;
    private Consumer<ActivityCompletionException> cancelConsumer;

    public Run() {
      this((Consumer<ActivityCompletionException>)null);
    }

    public Run(Consumer<ActivityCompletionException> onCancel) {
      this(Activity.getExecutionContext().getInfo().getHeartbeatTimeout().dividedBy(2), onCancel);
    }

    public Run(Duration interval) {
      this(interval, null);
    }

    public Run(Duration interval, Consumer<ActivityCompletionException> onCancel) {
      if (onCancel == null) {
        var thread = Thread.currentThread();
        onCancel = ignore -> thread.interrupt();
      }
      this.onCancel = onCancel;
      var context = Activity.getExecutionContext();
      ticker = scheduler.scheduleAtFixedRate(() -> {
        try {
          context.heartbeat(null);
        } catch (ActivityCompletionException e) {
          Consumer<ActivityCompletionException> localCancelConsumer;
          synchronized (cancelLock) {
            localCancelConsumer = cancelConsumer;
            cancel = e;
          }
          // There's technically a race here I'd have to think about where close's setOnCancel(null) ran
          // but this has the old one so it could interrupt a thread too late. This is bad and can be
          // avoided by synchronizing the cancel call itself, just didn't have time to change the sample.
          if (localCancelConsumer != null) {
            localCancelConsumer.accept(e);
          }
          throw e;
        }
      }, 0, interval.toMillis(), TimeUnit.MILLISECONDS);
    }

    public ActivityCompletionException getCancel() {
      synchronized (cancelLock) {
        return cancel;
      }
    }

    // May be invoked immediately if a cancel has occurred
    public void setOnCancel(Consumer<ActivityCompletionException> onCancel) {
      ActivityCompletionException localCancel;
      synchronized (cancelLock) {
        localCancel = cancel;
        this.onCancel = onCancel;
      }
      if (localCancel != null && onCancel != null) {
        onCancel.accept(localCancel);
      }
    }

    public void close() {
      setOnCancel(null);
      ticker.cancel(false);
      scheduler.shutdown();
    }
  }
}

I haven't tested it so it may not work, but here's how you'd use it:

public static class GreetingActivitiesImpl implements GreetingActivities {
  private final AutoHeartbeater heartbeater;

  public GreetingActivitiesImpl(AutoHeartbeater heartbeater) {
    this.heartbeater = heartbeater;
  }

  @Override
  public String composeGreeting(String greeting, String name) {
    try (var run = heartbeater.new Run()) {
      doStuff();
    }
  }
}

Of course it can be adapted, this is just an idea.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oooo...I was looking for something like try-with-resources and didn't know Java had that. Your solution looks like it is headed in a much more elegant direction.
If a cancellation is rejected by an activity I'd need to keep heartbeating lest the server actually will fail the activity task due to heartbeat timeout. Not sure this implementation allows that..I need to digest it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this implementation allows that..I need to digest it.

It doesn't, but a simple option to run (or a heartbeater.runWithoutCancel() or something) can be added.

final Supplier<ActivityExecutionContext> activityContext,
final Callable<T> callable,
final int heartbeatIntervalSeconds,
final Predicate<Callable<T>> shouldIgnoreCancel)
throws CanceledFailure {

var context = activityContext.get();
var logger =
LoggerFactory.getLogger(
MessageFormat.format(
"{0}/{1}", HeartbeatUtils.class.getName(), context.getInfo().getActivityId()));
final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService activityExecutor = Executors.newSingleThreadScheduledExecutor();
var activityInvocation = activityExecutor.schedule(callable, 0, TimeUnit.SECONDS);
final AtomicReference<Runnable> canceller =
new AtomicReference<>(
() -> {
logger.warn("canceller is running...");
activityInvocation.cancel(true);
});

var unused =
heartbeatExecutor.scheduleAtFixedRate(
() -> {
try {
logger.info("heartbeating...");
context.heartbeat(null);
} catch (ActivityCompletionException e) {
logger.warn("received cancellation", e);
try {
if (shouldIgnoreCancel == null || !shouldIgnoreCancel.test(callable)) {
// cancellation should be accepted so cancel the invocation and rethrow the e
canceller.get().run();
throw e;
} else {
logger.warn("Activity Cancellation ignored so keep heartbeating...");
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
},
0,
heartbeatIntervalSeconds,
TimeUnit.SECONDS);

try {
return activityInvocation.get();
} catch (CancellationException e) {
logger.warn("Canceled activity invocation", e);
// Opinionated way to keep Workflow from retrying this activity that is no longer going to
// heartbeat.
// if we don't returning a "non-retryable" failure, you will see Heartbeat timeout failures
// but really
// we want to communicate that the activity has been canceled and allow the caller to handle
// the exception.
// We could just rethrow the CancellationException here but then every user of this utility
// would have to convert to a nonretryable error.
throw ApplicationFailure.newNonRetryableFailureWithCause(
e.getMessage(), e.getClass().getTypeName(), e);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
// regardless of whether the activity ignores cancellation using `onCancel` or continued,
// shutdown at last
activityExecutor.shutdown();
heartbeatExecutor.shutdown();
}
}
}
Loading