From fcd56c1d028af7703663eb8efe8881fc4653c826 Mon Sep 17 00:00:00 2001 From: Jose Gomez Date: Thu, 23 Oct 2025 18:05:22 -0500 Subject: [PATCH] feat: ensure we can async-cancel all async types supported --- .cljfmt.edn | 8 + .github/workflows/clojure.yml | 7 +- .mise.toml | 1 + CHANGELOG.md | 7 + Makefile | 6 + README.md | 424 ++++++++++++++++++++++++++-- deps.edn | 15 +- pom.xml | 11 +- src/futurama/core.clj | 503 +++++++++++++++++++--------------- src/futurama/impl.clj | 58 ++-- src/futurama/state.clj | 36 +-- test/futurama/core_test.clj | 195 ++++++++----- test/futurama/test_setup.clj | 2 +- 13 files changed, 891 insertions(+), 382 deletions(-) create mode 100644 .cljfmt.edn diff --git a/.cljfmt.edn b/.cljfmt.edn new file mode 100644 index 0000000..6421434 --- /dev/null +++ b/.cljfmt.edn @@ -0,0 +1,8 @@ +{:legacy/merge-indents? true + :indents {cond [[:inner 0]] + fn* [[:inner 0]] + futurama.core/async [[:inner 0]] + futurama.core/thread [[:inner 0]] + futurama.core/async! [[:inner 0]] + futurama.core/thread! [[:inner 0]] + futurama.impl/async-dispatch-task-handler [[:inner 0]]}} diff --git a/.github/workflows/clojure.yml b/.github/workflows/clojure.yml index 4abfbf4..b88a26d 100644 --- a/.github/workflows/clojure.yml +++ b/.github/workflows/clojure.yml @@ -12,9 +12,10 @@ jobs: test: strategy: matrix: - java_version: [corretto-11,corretto-21] - test_clojure_alias: [clojure-1.10, clojure-1.11, clojure-1.12] - test_core_async_alias: [core.async-1.6, core.async-1.7, core.async-1.8] + java_version: [corretto-24] + test_clojure_alias: [clojure-1.12] + test_core_async_alias: [core.async-1.8, core.async-1.9] + fail-fast: false runs-on: ubuntu-latest steps: - name: Checkout code diff --git a/.mise.toml b/.mise.toml index d323733..df03a97 100644 --- a/.mise.toml +++ b/.mise.toml @@ -3,3 +3,4 @@ clojure = "https://github.com/asdf-community/asdf-clojure.git" [tools] clojure = "1.12" +java = "graalvm-22.3.1+java11" diff --git a/CHANGELOG.md b/CHANGELOG.md index 7be160a..55ff060 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ This is a history of changes to k13labs/futurama +# 1.4.0 +* **Enhanced Cancellation**: Improved cancellation support now works across all async types including core.async channels +* **State Management**: Replaced WeakHashMap with Caffeine cache for lock-free, thread-safe cancellation state tracking +* **Dependencies**: Added Caffeine 3.2.2, updated Clojure to 1.12.3, core.async to 1.8.741 (with 1.9.829-alpha2 support) +* **CI Updates**: Test matrix now covers Clojure 1.12 with core.async 1.8/1.9 on Java 11 & 24 (library remains backwards compatible) +* **Improvements**: Channel factories use identity exception handler for robust error handling, consistent protocol return values + # 1.3.1 * update core async version to latest `1.8.735` diff --git a/Makefile b/Makefile index 0258637..d32be1b 100644 --- a/Makefile +++ b/Makefile @@ -17,10 +17,16 @@ env: repl: clojure -M:$(REPL_CLOJURE_ALIAS):$(REPL_CORE_ASYNC_ALIAS):dev:test:app:repl +repl-next: REPL_CORE_ASYNC_ALIAS := core.async-1.9 +repl-next: repl + test: clojure -M:$(TEST_CLOJURE_ALIAS):$(TEST_CORE_ASYNC_ALIAS):dev:test:app:runner \ --focus :unit --reporter kaocha.report/documentation --no-capture-output +test-next: TEST_CORE_ASYNC_ALIAS := core.async-1.9 +test-next: test + clean: rm -rf target build diff --git a/README.md b/README.md index e6a0d03..3e700b1 100644 --- a/README.md +++ b/README.md @@ -1,38 +1,100 @@ [![Build Status](https://github.com/k13labs/futurama/actions/workflows/clojure.yml/badge.svg)](https://github.com/k13labs/futurama/actions/workflows/clojure.yml) -# _About_ +# Futurama -Futurama is a Clojure library for more deeply integrating async abstractions in the Clojure and JVM ecosystem with Clojure [core.async](https://github.com/clojure/core.async). +Futurama is a Clojure library that deeply integrates diverse async abstractions across the Clojure and JVM ecosystem with [core.async](https://github.com/clojure/core.async). -It adds support for [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) and [IDeferred](https://github.com/clj-commons/manifold/blob/master/src/manifold/deferred.clj) to be used in approximately the same fashion as Clojure [core.async](https://github.com/clojure/core.async) channels, and for [Future](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html) and [IDeref](https://github.com/clojure/clojure/blob/master/src/jvm/clojure/lang/IDeref.java) to be read in a similar manner as well. +It provides a unified interface for working with multiple async types: +- **core.async** channels +- **CompletableFuture** (Java) +- **Future** (Java) +- **Manifold Deferred** +- **IDeref** types (promises, delays, atoms, refs) -# _Usage_ +All these types can be consumed and composed using a consistent API, making it easy to work with heterogeneous async code. -Here's a simple example. +## Features + +- **Unified Async Operations**: Use `! {:status "ok"} + +;; Use inside async blocks (async - (println (! prints out: "laundry: done" + (let [result (! returns async result with value 55 +``` + +## Usage Examples + +### Basic Async Operations + +```clj +(require '[futurama.core :refer [async thread async-for ! "completed" + +;; Create a thread operation (runs on thread pool) +(def heavy-work + (thread + (Thread/sleep 100) + (* 42 42))) + +(! 1764 +``` + +### Async For Comprehensions +```clj +;; Sequential evaluation (items processed one by one) (! returns `[[1 1 2 4] [1 3 4 8] [3 1 4 8] [3 3 6 12]]` and takes slightly over 200ms total time. +;;=> [[1 1 2 4] [1 3 4 8] [3 1 4 8] [3 3 6 12]] +;; Takes ~200ms (4 items × 50ms each) +;; Concurrent evaluation (items processed in parallel) (! returns `[[1 1 2 4] [1 3 4 8] [3 1 4 8] [3 3 6 12]]` evaluated fully async and takes slightly over 50ms total time. +;;=> [[1 1 2 4] [1 3 4 8] [3 1 4 8] [3 3 6 12]] +;; Takes ~50ms (all items processed concurrently) +``` + +### Cancellation + +Futurama supports cancellation of async operations across all supported types. When an operation is cancelled, it receives a `CancellationException`. + +```clj +(require '[futurama.core :refer [async async-cancel! async-cancelled? + async-cancellable? thread]]) + +;; Cancel a long-running operation +(def work + (async + (loop [i 0] + (when-not (async-cancelled?) + (! true + +;; Cancel it +(async-cancel! work) + +;; Check if it was cancelled +(async-cancelled? work) +;;=> true +``` + +#### Cooperative Cancellation + +For best results, long-running operations should cooperatively check for cancellation: + +```clj +(defn cancellable-work [items] + (async + (loop [remaining items + results []] + (if (or (empty? remaining) (async-cancelled?)) + results + (let [item (first remaining) + result (! returns async result with value 63 +``` + +### Thread Pool Management + +Route work to appropriate thread pools for optimal performance: + +```clj +;; Use :io pool for I/O-bound work (default for async) +(async :io + (! [0 2 4 6 8] + +;; async-reduce: Reduce with async operations +(! 45 + +;; async-some: Find first truthy result +(! x 5) x))) + (range 10))) +;;=> 6 + +;; async-every?: Check if all satisfy predicate +(! true ``` -See the existing tests for more examples. +### Configuring Async Factories + +Control what type of async result is returned: + +```clj +(require '[futurama.core :refer [set-async-factory! set-thread-factory! + async-future-factory async-channel-factory + async-promise-factory async-deferred-factory]]) + +;; Use CompletableFuture for async operations +(set-async-factory! async-future-factory) + +(type (async "hello")) +;;=> java.util.concurrent.CompletableFuture -# _Building_ +;; Use Manifold Deferred +(set-async-factory! async-deferred-factory) -Futurama is built, tested, and deployed using [Clojure Tools Deps](https://clojure.org/guides/deps_and_cli). +(type (async "hello")) +;;=> manifold.deferred.Deferred -GNU Make is used to simplify invocation of some commands. +;; Reset to default (core.async promise-chan) +(set-async-factory! nil) +``` + +See the [tests](test/futurama/core_test.clj) for more examples. -# _Availability_ +## Installation -Futurama releases for this project are on [Clojars](https://clojars.org/). Simply add the following to your project: +Add Futurama to your project dependencies: [![Clojars Project](https://clojars.org/com.github.k13labs/futurama/latest-version.svg)](http://clojars.org/com.github.k13labs/futurama) -# _Communication_ +**deps.edn** +```clojure +{:deps {com.github.k13labs/futurama {:mvn/version "1.4.0"}}} +``` + +**Leiningen project.clj** +```clojure +[com.github.k13labs/futurama "1.4.0"] +``` + +## Requirements + +- **Clojure**: 1.11+ (backwards compatible with 1.10+) +- **Java**: 11+ +- **core.async**: 1.8+ + +## API Reference + +### Core Operations + +| Function | Description | +|----------|-------------| +| `!` | Thread-first with async operations | +| `async->>` | Thread-last with async operations | + +### Thread Pool Management + +| Function/Macro | Description | +|----------------|-------------| +| `with-pool` | Execute body with specified thread pool | +| `get-pool` | Get thread pool for workload type (`:io`, `:compute`, `:mixed`) | + +### Factory Configuration + +| Function | Description | +|----------|-------------| +| `set-async-factory!` | Set global factory for `async` macro results | +| `set-thread-factory!` | Set global factory for `thread` macro results | +| `with-async-factory` | Temporarily bind async factory | +| `with-thread-factory` | Temporarily bind thread factory | +| `async-channel-factory` | Creates core.async channel (buffer size 1) | +| `async-promise-factory` | Creates core.async promise-chan (default) | +| `async-future-factory` | Creates CompletableFuture | +| `async-deferred-factory` | Creates Manifold Deferred | + +## Advanced Configuration + +### Custom Thread Pools + +Configure application-wide thread pools using the Java system property `futurama.executor-factory`: + +```clojure +;; Use core.async's thread pool +(System/setProperty "futurama.executor-factory" + "clojure.core.async.impl.dispatch/executor-for") +``` + +The factory function receives a keyword (`:io`, `:compute`, `:mixed`) and should return an `ExecutorService` or `nil` to use defaults. + +### Exception Handling + +Futurama treats exceptions as values. Uncaught exceptions are: +1. Captured and returned over the async channel/future +2. Re-thrown when read via `!com.github.k13labs futurama futurama - 1.3.1 + 1.4.0 - 1.3.1 + 1.4.0 https://github.com/k13labs/futurama scm:git:git://github.com/k13labs/futurama.git scm:git:ssh://git@github.com/k13labs/futurama.git @@ -22,13 +22,18 @@ org.clojure clojure - 1.12.0 + 1.12.3 manifold manifold 0.4.3 + + com.github.ben-manes.caffeine + caffeine + 3.2.2 + org.clojure core.async diff --git a/src/futurama/core.clj b/src/futurama/core.clj index 2f3581e..57fb7e4 100644 --- a/src/futurama/core.clj +++ b/src/futurama/core.clj @@ -49,6 +49,7 @@ go block threads - use Thread.setDefaultUncaughtExceptionHandler() to catch and handle." (:require [clojure.core.async :as async] + [clojure.core.async.impl.dispatch :as run-impl] [clojure.core.async.impl.protocols :as core-impl] [clojure.core.async.impl.channels :refer [box]] [clojure.core.async.impl.ioc-macros :as rt] @@ -58,10 +59,12 @@ [manifold.deferred :as d]) (:import [clojure.lang Var IDeref IPending IFn IAtom IRef] [clojure.core.async.impl.channels ManyToManyChannel] + [clojure.core.async.impl.buffers PromiseBuffer] [java.util.concurrent CompletableFuture CompletionException ExecutionException + CancellationException ExecutorService ForkJoinPool Future] @@ -76,12 +79,12 @@ (defn async-channel-factory "Creates a core async channel of size 1" ^ManyToManyChannel [] - (async/chan 1)) + (async/chan 1 nil identity)) (defn async-promise-factory "Creates a core async promise channel" ^ManyToManyChannel [] - (async/promise-chan)) + (async/promise-chan nil identity)) (defn async-future-factory "Creates a new CompletableFuture" @@ -187,15 +190,24 @@ (def ^:no-doc rte rethrow-exception) +(defn- set-cancel-state! + [x] + (state/set-cancel-state! x ASYNC_CANCELLED)) + +(defn- get-cancel-state + [x] + (= (state/get-cancel-state x) ASYNC_CANCELLED)) + (defn async-cancellable? - "Determines if v satisfies? `AsyncCancellable`" + "Determines if v can be cancelled" [v] (satisfies? impl/AsyncCancellable v)) (defn async-cancel! "Cancels the async item." [item] - (impl/cancel! item)) + (when (async-cancellable? item) + (impl/cancel! item))) (defn async-cancelled? "Checks if the current executing async item or one of its parents or provided item has been cancelled. @@ -208,13 +220,20 @@ (some async-cancelled? state/*items*) false)) ([item] - (impl/cancelled? item))) + (when (satisfies? impl/AsyncCancellable item) + (impl/cancelled? item)))) (defn async-completed? "Checks if the provided `AsyncCompletableReader` instance is completed?" [x] - (when (satisfies? impl/AsyncCompletableReader x) - (impl/completed? x))) + (cond + (satisfies? impl/AsyncCompletableReader x) + (impl/completed? x) + + (satisfies? core-impl/Channel x) + (core-impl/closed? x) + + :else false)) (defn async? "returns true if v instance satisfies? core.async's `ReadPort`" @@ -225,29 +244,23 @@ "Asynchronously invokes the body inside a pooled thread and return over a write port, preserves the current thread binding frame, the pool used can be specified via `*thread-pool*`." [pool port & body] - `(let [pool# ~pool + `(let [pool# (or ~pool *thread-pool* (get-pool :mixed)) port# ~port] (state/push-item port# (let [binding-frame# (Var/cloneThreadBindingFrame)] - (impl/async-dispatch-task-handler - (or pool# - *thread-pool* - (get-pool :mixed)) - port# - (^:once - fn* - [] - (let [thread-frame# (Var/getThreadBindingFrame)] - (Var/resetThreadBindingFrame binding-frame#) - (try - (let [res# (do ~@body)] - (when (some? res#) - (async/>!! port# res#))) - (catch Throwable ~'e - (async/>!! port# (unwrap-exception ~'e))) - (finally - (async/close! port#) - (Var/resetThreadBindingFrame thread-frame#)))))))))) + (impl/async-dispatch-task-handler pool# port# + (^:once fn* [] + (let [thread-frame# (Var/getThreadBindingFrame)] + (Var/resetThreadBindingFrame binding-frame#) + (try + (let [~'res (do ~@body)] + (when (some? ~'res) + (async/>!! port# ~'res))) + (catch Throwable ~'e + (async/>!! port# (unwrap-exception ~'e))) + (finally + (async/close! port#) + (Var/resetThreadBindingFrame thread-frame#)))))))))) (defmacro thread "Asynchronously invokes the body in a pooled thread, preserves the current thread binding frame, @@ -266,171 +279,6 @@ (thread-factory) ~@body))) -(extend-protocol impl/AsyncCancellable - Object - (cancel! [this] - (state/set-global-state! this ASYNC_CANCELLED) - true) - (cancelled? [this] - (= (state/get-global-state this) ASYNC_CANCELLED)) - - Future - (cancel! [this] - (state/set-global-state! this ASYNC_CANCELLED) - (future-cancel this)) - (cancelled? [this] - (or (future-cancelled? this) - (= (state/get-global-state this) ASYNC_CANCELLED) - false))) - -(extend-type Future - core-impl/ReadPort - (take! [x handler] - (impl/async-read-port-take! x handler)) - - impl/AsyncCompletableReader - (get! [fut] - (try - (.get ^Future fut) - (catch Throwable e - e))) - (completed? [fut] - (.isDone ^Future fut)) - (on-complete [fut f] - (async/thread - (let [r (impl/get! fut)] - (f r)))) - - core-impl/Channel - (close! [x] - (when-not (async-completed? x) - (async-cancel! x))) - (closed? [x] - (async-completed? x))) - -(extend-protocol impl/AsyncCompletableWriter - IFn - (complete! [f v] - (boolean (f v))) - - IAtom - (complete! [a v] - (reset! a v) - true) - - IRef - (complete! [a v] - (dosync - (ref-set a v)) - true)) - -(extend-protocol core-impl/WritePort - IFn - (put! [x val handler] - (impl/async-write-port-put! x val handler)) - - IAtom - (put! [x val handler] - (impl/async-write-port-put! x val handler)) - - IRef - (put! [x val handler] - (impl/async-write-port-put! x val handler))) - -(extend-type IDeref - core-impl/ReadPort - (take! [x handler] - (impl/async-read-port-take! x handler)) - - impl/AsyncCompletableReader - (get! [ref] - (try - (deref ref) - (catch Throwable e - e))) - (completed? [ref] - (if (instance? IPending ref) - (realized? ref) - true)) - (on-complete [ref f] - (async/thread - (let [r (impl/get! ref)] - (f r)))) - - core-impl/Channel - (close! [ref] - (when (instance? IFn ref) - (ref nil))) - (closed? [ref] - (impl/completed? ref))) - -(extend-type CompletableFuture - core-impl/ReadPort - (take! [x handler] - (impl/async-read-port-take! x handler)) - - core-impl/WritePort - (put! [x val handler] - (impl/async-write-port-put! x val handler)) - - impl/AsyncCompletableReader - (get! [fut] - (try - (.get ^CompletableFuture fut) - (catch Throwable e - e))) - (completed? [fut] - (.isDone ^CompletableFuture fut)) - (on-complete [fut f] - (let [^BiConsumer invoke-cb (impl/->JavaBiConsumer - (fn [val ex] - (f (or ex val))))] - (.whenComplete ^CompletableFuture fut ^BiConsumer invoke-cb))) - - impl/AsyncCompletableWriter - (complete! [fut v] - (if (instance? Throwable v) - (.completeExceptionally ^CompletableFuture fut ^Throwable v) - (.complete ^CompletableFuture fut v))) - - core-impl/Channel - (close! [fut] - (.complete ^CompletableFuture fut nil)) - (closed? [fut] - (.isDone ^CompletableFuture fut))) - -(extend-type Deferred - core-impl/ReadPort - (take! [x handler] - (impl/async-read-port-take! x handler)) - - core-impl/WritePort - (put! [x val handler] - (impl/async-write-port-put! x val handler)) - - impl/AsyncCompletableReader - (get! [dfd] - (try - (deref dfd) - (catch Throwable e - e))) - (completed? [dfd] - (d/realized? dfd)) - (on-complete [dfd f] - (d/on-realized dfd f f)) - - impl/AsyncCompletableWriter - (complete! [dfd v] - (if (instance? Throwable v) - (d/error! dfd v) - (d/success! dfd v))) - - core-impl/Channel - (close! [dfd] - (d/success! dfd nil)) - (closed? [dfd] - (d/realized? dfd))) - (defmacro async! "Asynchronously executes the body, returning `port` immediately to te calling thread. Additionally, any visible calls to !! and alt!/alts! @@ -449,31 +297,27 @@ completed; the pool used can be specified via `*thread-pool*` binding." [pool port & body] (let [crossing-env (zipmap (keys &env) (repeatedly gensym))] - `(let [pool# ~pool + `(let [pool# (or ~pool *thread-pool* (get-pool :io)) port# ~port] (state/push-item port# (let [captured-bindings# (Var/getThreadBindingFrame)] - (impl/async-dispatch-task-handler - (or pool# - *thread-pool* - (get-pool :io)) - port# - (^:once - fn* - [] - (let [~@(mapcat - (fn [[l sym]] - [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) - crossing-env) - f# ~(impl/async-state-machine - `(try - ~@body - (catch Throwable ~'e - (unwrap-exception ~'e))) 1 [crossing-env &env] rt/async-custom-terminators) - state# (-> (f#) - (rt/aset-all! rt/USER-START-IDX port# - rt/BINDINGS-IDX captured-bindings#))] - (rt/run-state-machine-wrapped state#))))))))) + (impl/async-dispatch-task-handler pool# port# + (^:once fn* [] + (run-impl/with-dispatch-thread-marking + (let [~@(mapcat + (fn [[l sym]] + [sym `(^:once fn* [] + ~(vary-meta l dissoc :tag))]) + crossing-env) + f# ~(impl/async-state-machine + `(try + ~@body + (catch Throwable ~'e + (unwrap-exception ~'e))) 1 [crossing-env &env] rt/async-custom-terminators) + state# (-> (f#) + (rt/aset-all! rt/USER-START-IDX port# + rt/BINDINGS-IDX captured-bindings#))] + (rt/run-state-machine-wrapped state#)))))))))) (defmacro async "Asynchronously executes the body, returning immediately to the @@ -815,3 +659,234 @@ "Like postwalk, but does pre-order traversal." [f form] (async-walk (partial async-prewalk f) identity (f form))) + +;;; Begin protocol implementations + +(extend-type Future + core-impl/ReadPort + (take! [x handler] + (impl/async-read-port-take! x handler)) + + impl/AsyncCompletableReader + (get! [fut] + (try + (.get ^Future fut) + (catch Throwable e + e))) + (completed? [fut] + (.isDone ^Future fut)) + (on-complete [fut f] + (async/thread + (let [r (impl/get! fut)] + (f r))) + fut) + + impl/AsyncCancellable + (cancel! [fut] + (set-cancel-state! fut) + (future-cancel fut)) + (on-cancel-interrupt [fut _] + fut) + (cancelled? [fut] + (or (future-cancelled? fut) + (get-cancel-state fut))) + + core-impl/Channel + (close! [fut] + (when-not (impl/completed? fut) + (impl/cancel! fut)) + nil) + (closed? [fut] + (impl/completed? fut))) + +(extend-protocol impl/AsyncCompletableWriter + IFn + (complete! [f v] + (boolean (f v))) + + IAtom + (complete! [a v] + (reset! a v) + true) + + IRef + (complete! [a v] + (dosync + (ref-set a v)) + true)) + +(extend-protocol core-impl/WritePort + IFn + (put! [x val handler] + (impl/async-write-port-put! x val handler)) + + IAtom + (put! [x val handler] + (impl/async-write-port-put! x val handler)) + + IRef + (put! [x val handler] + (impl/async-write-port-put! x val handler))) + +(extend-type IDeref + core-impl/ReadPort + (take! [x handler] + (impl/async-read-port-take! x handler)) + + impl/AsyncCompletableReader + (get! [ref] + (try + (deref ref) + (catch Throwable e + e))) + (completed? [ref] + (if (instance? IPending ref) + (realized? ref) + true)) + (on-complete [ref f] + (async/thread + (let [r (impl/get! ref)] + (f r))) + ref) + + impl/AsyncCancellable + (cancel! [ref] + (set-cancel-state! ref) + (if (and (instance? IFn ref) + (not (impl/completed? ref))) + (do + (ref (CancellationException. "Task cancelled")) + true) + false)) + (on-cancel-interrupt [ref _] + ref) + (cancelled? [ref] + (get-cancel-state ref)) + + core-impl/Channel + (close! [ref] + (when (and (instance? IFn ref) + (not (impl/completed? ref))) + (ref nil)) + nil) + (closed? [ref] + (impl/completed? ref))) + +(extend-type CompletableFuture + core-impl/ReadPort + (take! [x handler] + (impl/async-read-port-take! x handler)) + + core-impl/WritePort + (put! [x val handler] + (impl/async-write-port-put! x val handler)) + + impl/AsyncCompletableReader + (get! [fut] + (try + (.get ^CompletableFuture fut) + (catch Throwable e + e))) + (completed? [fut] + (.isDone ^CompletableFuture fut)) + (on-complete [fut f] + (let [^BiConsumer invoke-cb (impl/->JavaBiConsumer + (fn [val ex] + (f (or ex val))))] + (.whenComplete ^CompletableFuture fut ^BiConsumer invoke-cb)) + fut) + + impl/AsyncCompletableWriter + (complete! [fut v] + (if (instance? Throwable v) + (.completeExceptionally ^CompletableFuture fut ^Throwable v) + (.complete ^CompletableFuture fut v))) + + impl/AsyncCancellable + (on-cancel-interrupt [fut fut'] + (let [^BiConsumer invoke-cb (impl/->JavaBiConsumer + (fn [_ _] + (future-cancel fut')))] ;;; cancel any linked future + (.whenComplete ^CompletableFuture fut ^BiConsumer invoke-cb) + fut)) + (cancel! [fut] + (set-cancel-state! fut) + (future-cancel fut)) + (cancelled? [fut] + (or (future-cancelled? fut) + (get-cancel-state fut))) + + core-impl/Channel + (close! [fut] + (when-not (impl/completed? fut) + (.complete ^CompletableFuture fut nil)) + nil) + (closed? [fut] + (.isDone ^CompletableFuture fut))) + +(extend-type Deferred + core-impl/ReadPort + (take! [x handler] + (impl/async-read-port-take! x handler)) + + core-impl/WritePort + (put! [x val handler] + (impl/async-write-port-put! x val handler)) + + impl/AsyncCompletableReader + (get! [dfd] + (try + (deref dfd) + (catch Throwable e + e))) + (completed? [dfd] + (d/realized? dfd)) + (on-complete [dfd f] + (d/on-realized dfd f f) + dfd) + + impl/AsyncCompletableWriter + (complete! [dfd v] + (if (instance? Throwable v) + (d/error! dfd v) + (d/success! dfd v))) + + impl/AsyncCancellable + (cancel! [dfd] + (set-cancel-state! dfd) + (if-not (d/realized? dfd) + (d/error! dfd (CancellationException. "Task cancelled")) + false)) + (on-cancel-interrupt [dfd fut] + (let [interrupt-handler (fn [_] + (future-cancel fut))] + (d/on-realized dfd interrupt-handler interrupt-handler) + dfd)) + (cancelled? [dfd] + (get-cancel-state dfd)) + + core-impl/Channel + (close! [dfd] + (d/success! dfd nil) + nil) + (closed? [dfd] + (d/realized? dfd))) + +(extend-type ManyToManyChannel + impl/AsyncCancellable + (cancel! [c] + (set-cancel-state! c) + (if-not (core-impl/closed? c) + (do + (async/put! c (CancellationException. "Task cancelled")) + (async/close! c) + true) + false)) + (on-cancel-interrupt [c fut] + (when (->> (.buf ^ManyToManyChannel c) + (instance? PromiseBuffer)) + (async/take! c (fn [_] + (future-cancel fut)))) + c) + (cancelled? [c] + (get-cancel-state c))) diff --git a/src/futurama/impl.clj b/src/futurama/impl.clj index 6c50113..3eb2feb 100644 --- a/src/futurama/impl.clj +++ b/src/futurama/impl.clj @@ -2,57 +2,49 @@ (:refer-clojure :exclude [realized?]) (:require [clojure.core.async :refer [take!]] + [clojure.core.async.impl.go :as go-impl] [clojure.core.async.impl.channels :refer [box]] [clojure.core.async.impl.protocols :as core-impl]) (:import [java.util.concurrent - CompletableFuture ExecutorService Future] [java.util.concurrent.locks Lock] - [java.util.function BiConsumer Function])) + [java.util.function BiConsumer])) (def async-state-machine - "Use a requiring resolve here to dynamically use the correct implementation and maintain backwards compatibility" - (or (requiring-resolve 'clojure.core.async.impl.ioc-macros/state-machine) - (requiring-resolve 'clojure.core.async.impl.go/state-machine))) - -(deftype JavaFunction [f] - Function - (apply [_ a] - (f a))) - -(deftype JavaBiConsumer [f] - BiConsumer - (accept [_ a b] - (f a b))) + go-impl/state-machine) (defprotocol AsyncCompletableReader - (get! [x]) - (completed? [x]) - (on-complete [x f])) + (get! [x] + "Returns the completed value of this async operation, blocking if the async operation is not yet complete.") + (completed? [x] + "Returns true if this async operation has completed.") + (on-complete [x f] + "Registers a callback f to be called with the completed value when this async operation completes.")) (defprotocol AsyncCompletableWriter - (complete! [x v])) + (complete! [x v] + "Attempts to complete this async operation with value v, returning true if successful, false otherwise.")) (defprotocol AsyncCancellable - (cancel! [this]) - (cancelled? [this])) + (on-cancel-interrupt [this fut] + "Attempts to register a cancellation handler that will be called with the given future when this async operation is cancelled.") + (cancelled? [this] + "Returns true if this async operation has been cancelled.") + (cancel! [this] + "Attempts to cancel this async operation.")) -(defn dispatch - ^Future [^Runnable task ^ExecutorService pool] - (.submit ^ExecutorService pool ^Runnable task)) +(deftype JavaBiConsumer [f] + BiConsumer + (accept [_ a b] + (f a b))) (defn async-dispatch-task-handler - [pool port task] - (let [^Future fut (dispatch task pool)] - (when (instance? CompletableFuture port) - (.exceptionally ^CompletableFuture port - ^Function (->JavaFunction - (fn [t] - (cancel! port) - (future-cancel fut) - (throw t))))) + "Dispatches a task to the given executor service pool, and registers a cancellation handler on the port." + ^Future [^ExecutorService pool port ^Runnable task] + (let [^Future fut (.submit pool task)] + (on-cancel-interrupt port fut) port)) (defn- async-reader-handler* diff --git a/src/futurama/state.clj b/src/futurama/state.clj index d4820e4..8d0e6f9 100644 --- a/src/futurama/state.clj +++ b/src/futurama/state.clj @@ -1,12 +1,16 @@ (ns ^:no-doc futurama.state - (:import [java.util Map WeakHashMap] - [java.util.concurrent.locks Lock ReadWriteLock ReentrantReadWriteLock])) + (:import [com.github.benmanes.caffeine.cache Caffeine Cache])) -(defonce ^:private ^ReadWriteLock GLOBAL_STATE_LOCK - (ReentrantReadWriteLock.)) - -(defonce ^:private GLOBAL_STATE - (WeakHashMap.)) +(defonce ^:private GLOBAL_CANCEL_STATE + (delay + (.. (Caffeine/newBuilder) + ;;; Use weak keys to allow removal of keys and values when no longer referenced elsewhere + (weakKeys) + ;;; Cancellation interruption is best effort, arbitrary size based on CPU count, should be enough for most use cases + (maximumSize (-> (Runtime/getRuntime) + (.availableProcessors) + (* 1000))) + (build)))) (def ^:dynamic *items* []) @@ -16,22 +20,12 @@ `(binding [*items* (conj *items* ~item)] ~@body)) -(defn ^:no-doc set-global-state! +(defn set-cancel-state! "Sets the value of the item's global state" [key val] - (let [^Lock lock (.writeLock GLOBAL_STATE_LOCK)] - (.lock lock) - (try - (.put ^Map GLOBAL_STATE key val) - (finally - (.unlock lock))))) + (.put ^Cache @GLOBAL_CANCEL_STATE key val)) -(defn ^:no-doc get-global-state +(defn get-cancel-state "Gets the value of the item's global state" [key] - (let [^Lock lock (.readLock GLOBAL_STATE_LOCK)] - (.lock lock) - (try - (.get ^Map GLOBAL_STATE key) - (finally - (.unlock lock))))) + (.getIfPresent ^Cache @GLOBAL_CANCEL_STATE key)) diff --git a/test/futurama/core_test.clj b/test/futurama/core_test.clj index 043a9f5..f0652ca 100644 --- a/test/futurama/core_test.clj +++ b/test/futurama/core_test.clj @@ -4,8 +4,8 @@ [clojure.test :refer [deftest is testing use-fixtures]] [criterium.core :refer [quick-benchmark report-result with-progress-reporting]] - [futurama.core :refer [! - async->> async-cancel! async-cancellable? + [futurama.core :refer [! async->> + async-cancel! async-cancellable? async-completed? async-cancelled? async-every? async-for async-map async-postwalk async-prewalk async-reduce async-some async? get-pool thread with-pool] :as f]) @@ -14,21 +14,21 @@ (defn async-fixture [f] + (f/set-async-factory! f/async-future-factory) + (f/set-thread-factory! f/async-future-factory) + (f) + (f/set-async-factory! f/async-channel-factory) + (f/set-thread-factory! f/async-channel-factory) + (f) (f/set-async-factory! f/async-promise-factory) (f/set-thread-factory! f/async-promise-factory) (f) - (f/with-async-factory f/async-future-factory - (f/with-thread-factory f/async-future-factory - (f))) - (f/with-async-factory f/async-channel-factory - (f/with-thread-factory f/async-channel-factory - (f))) - (f/with-async-factory f/async-promise-factory - (f/with-thread-factory f/async-promise-factory - (f))) - (f/with-async-factory f/async-deferred-factory - (f/with-thread-factory f/async-deferred-factory - (f)))) + (f/set-async-factory! f/async-deferred-factory) + (f/set-thread-factory! f/async-deferred-factory) + (f) + (f/set-async-factory! nil) + (f/set-thread-factory! nil) + (f)) (use-fixtures :once async-fixture) @@ -48,69 +48,112 @@ (def test-pool (delay - (Executors/newFixedThreadPool 2))) + (Executors/newFixedThreadPool 10))) (deftest cancel-async-test + (testing "cancellable future is interrupted test" + (with-pool @test-pool + (let [interrupted (atom false) + a (promise) + s (atom 0) + f (future + (try + (while (not (async-cancelled?)) ;;; this loop goes on infinitely until the thread is interrupted + (Thread/sleep 90) + (println "thread looping..." (swap! s inc))) + (println "ended thread looping.") + (deliver a true) + (catch InterruptedException e + (println "interrupted looping by:" (type e)) + (reset! interrupted true) + (deliver a true))))] + (is (true? (async-cancellable? f))) + (go + (> get-pool bond/calls (map :args))))))) (testing "with-pool uses specified workload pool - mixed" (let [mixed-pool (get-pool :mixed)] @@ -139,7 +182,7 @@ (async (is (= 100 (!> get-pool bond/calls (map :args))))))) (testing "with-pool uses specified workload pool - compute" (let [compute-pool (get-pool :compute)] @@ -149,7 +192,7 @@ (async (is (= 100 (!> get-pool bond/calls (map :args)))))))) (deftest thread-macro-workload-test diff --git a/test/futurama/test_setup.clj b/test/futurama/test_setup.clj index 4cc131a..dde7e4d 100644 --- a/test/futurama/test_setup.clj +++ b/test/futurama/test_setup.clj @@ -7,7 +7,7 @@ (hto/activate!) -#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} +#_{:clojure-lsp/ignore [:clojure-lsp/unused-public-var]} (defn defuse-zero-assertions "Don't fail the test suite if we hide an `is` within a `doseq`.