diff --git a/.cljfmt.edn b/.cljfmt.edn
new file mode 100644
index 0000000..6421434
--- /dev/null
+++ b/.cljfmt.edn
@@ -0,0 +1,8 @@
+{:legacy/merge-indents? true
+ :indents {cond [[:inner 0]]
+ fn* [[:inner 0]]
+ futurama.core/async [[:inner 0]]
+ futurama.core/thread [[:inner 0]]
+ futurama.core/async! [[:inner 0]]
+ futurama.core/thread! [[:inner 0]]
+ futurama.impl/async-dispatch-task-handler [[:inner 0]]}}
diff --git a/.github/workflows/clojure.yml b/.github/workflows/clojure.yml
index 4abfbf4..b88a26d 100644
--- a/.github/workflows/clojure.yml
+++ b/.github/workflows/clojure.yml
@@ -12,9 +12,10 @@ jobs:
test:
strategy:
matrix:
- java_version: [corretto-11,corretto-21]
- test_clojure_alias: [clojure-1.10, clojure-1.11, clojure-1.12]
- test_core_async_alias: [core.async-1.6, core.async-1.7, core.async-1.8]
+ java_version: [corretto-24]
+ test_clojure_alias: [clojure-1.12]
+ test_core_async_alias: [core.async-1.8, core.async-1.9]
+ fail-fast: false
runs-on: ubuntu-latest
steps:
- name: Checkout code
diff --git a/.mise.toml b/.mise.toml
index d323733..df03a97 100644
--- a/.mise.toml
+++ b/.mise.toml
@@ -3,3 +3,4 @@ clojure = "https://github.com/asdf-community/asdf-clojure.git"
[tools]
clojure = "1.12"
+java = "graalvm-22.3.1+java11"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7be160a..55ff060 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
This is a history of changes to k13labs/futurama
+# 1.4.0
+* **Enhanced Cancellation**: Improved cancellation support now works across all async types including core.async channels
+* **State Management**: Replaced WeakHashMap with Caffeine cache for lock-free, thread-safe cancellation state tracking
+* **Dependencies**: Added Caffeine 3.2.2, updated Clojure to 1.12.3, core.async to 1.8.741 (with 1.9.829-alpha2 support)
+* **CI Updates**: Test matrix now covers Clojure 1.12 with core.async 1.8/1.9 on Java 11 & 24 (library remains backwards compatible)
+* **Improvements**: Channel factories use identity exception handler for robust error handling, consistent protocol return values
+
# 1.3.1
* update core async version to latest `1.8.735`
diff --git a/Makefile b/Makefile
index 0258637..d32be1b 100644
--- a/Makefile
+++ b/Makefile
@@ -17,10 +17,16 @@ env:
repl:
clojure -M:$(REPL_CLOJURE_ALIAS):$(REPL_CORE_ASYNC_ALIAS):dev:test:app:repl
+repl-next: REPL_CORE_ASYNC_ALIAS := core.async-1.9
+repl-next: repl
+
test:
clojure -M:$(TEST_CLOJURE_ALIAS):$(TEST_CORE_ASYNC_ALIAS):dev:test:app:runner \
--focus :unit --reporter kaocha.report/documentation --no-capture-output
+test-next: TEST_CORE_ASYNC_ALIAS := core.async-1.9
+test-next: test
+
clean:
rm -rf target build
diff --git a/README.md b/README.md
index e6a0d03..3e700b1 100644
--- a/README.md
+++ b/README.md
@@ -1,38 +1,100 @@
[](https://github.com/k13labs/futurama/actions/workflows/clojure.yml)
-# _About_
+# Futurama
-Futurama is a Clojure library for more deeply integrating async abstractions in the Clojure and JVM ecosystem with Clojure [core.async](https://github.com/clojure/core.async).
+Futurama is a Clojure library that deeply integrates diverse async abstractions across the Clojure and JVM ecosystem with [core.async](https://github.com/clojure/core.async).
-It adds support for [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) and [IDeferred](https://github.com/clj-commons/manifold/blob/master/src/manifold/deferred.clj) to be used in approximately the same fashion as Clojure [core.async](https://github.com/clojure/core.async) channels, and for [Future](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html) and [IDeref](https://github.com/clojure/clojure/blob/master/src/jvm/clojure/lang/IDeref.java) to be read in a similar manner as well.
+It provides a unified interface for working with multiple async types:
+- **core.async** channels
+- **CompletableFuture** (Java)
+- **Future** (Java)
+- **Manifold Deferred**
+- **IDeref** types (promises, delays, atoms, refs)
-# _Usage_
+All these types can be consumed and composed using a consistent API, making it easy to work with heterogeneous async code.
-Here's a simple example.
+## Features
+
+- **Unified Async Operations**: Use `! {:status "ok"}
+
+;; Use inside async blocks
(async
- (println (! prints out: "laundry: done"
+ (let [result (! returns async result with value 55
+```
+
+## Usage Examples
+
+### Basic Async Operations
+
+```clj
+(require '[futurama.core :refer [async thread async-for ! "completed"
+
+;; Create a thread operation (runs on thread pool)
+(def heavy-work
+ (thread
+ (Thread/sleep 100)
+ (* 42 42)))
+
+(! 1764
+```
+
+### Async For Comprehensions
+```clj
+;; Sequential evaluation (items processed one by one)
(! returns `[[1 1 2 4] [1 3 4 8] [3 1 4 8] [3 3 6 12]]` and takes slightly over 200ms total time.
+;;=> [[1 1 2 4] [1 3 4 8] [3 1 4 8] [3 3 6 12]]
+;; Takes ~200ms (4 items × 50ms each)
+;; Concurrent evaluation (items processed in parallel)
(! returns `[[1 1 2 4] [1 3 4 8] [3 1 4 8] [3 3 6 12]]` evaluated fully async and takes slightly over 50ms total time.
+;;=> [[1 1 2 4] [1 3 4 8] [3 1 4 8] [3 3 6 12]]
+;; Takes ~50ms (all items processed concurrently)
+```
+
+### Cancellation
+
+Futurama supports cancellation of async operations across all supported types. When an operation is cancelled, it receives a `CancellationException`.
+
+```clj
+(require '[futurama.core :refer [async async-cancel! async-cancelled?
+ async-cancellable? thread]])
+
+;; Cancel a long-running operation
+(def work
+ (async
+ (loop [i 0]
+ (when-not (async-cancelled?)
+ (! true
+
+;; Cancel it
+(async-cancel! work)
+
+;; Check if it was cancelled
+(async-cancelled? work)
+;;=> true
+```
+
+#### Cooperative Cancellation
+
+For best results, long-running operations should cooperatively check for cancellation:
+
+```clj
+(defn cancellable-work [items]
+ (async
+ (loop [remaining items
+ results []]
+ (if (or (empty? remaining) (async-cancelled?))
+ results
+ (let [item (first remaining)
+ result (! returns async result with value 63
+```
+
+### Thread Pool Management
+
+Route work to appropriate thread pools for optimal performance:
+
+```clj
+;; Use :io pool for I/O-bound work (default for async)
+(async :io
+ (! [0 2 4 6 8]
+
+;; async-reduce: Reduce with async operations
+(! 45
+
+;; async-some: Find first truthy result
+(! x 5) x)))
+ (range 10)))
+;;=> 6
+
+;; async-every?: Check if all satisfy predicate
+(! true
```
-See the existing tests for more examples.
+### Configuring Async Factories
+
+Control what type of async result is returned:
+
+```clj
+(require '[futurama.core :refer [set-async-factory! set-thread-factory!
+ async-future-factory async-channel-factory
+ async-promise-factory async-deferred-factory]])
+
+;; Use CompletableFuture for async operations
+(set-async-factory! async-future-factory)
+
+(type (async "hello"))
+;;=> java.util.concurrent.CompletableFuture
-# _Building_
+;; Use Manifold Deferred
+(set-async-factory! async-deferred-factory)
-Futurama is built, tested, and deployed using [Clojure Tools Deps](https://clojure.org/guides/deps_and_cli).
+(type (async "hello"))
+;;=> manifold.deferred.Deferred
-GNU Make is used to simplify invocation of some commands.
+;; Reset to default (core.async promise-chan)
+(set-async-factory! nil)
+```
+
+See the [tests](test/futurama/core_test.clj) for more examples.
-# _Availability_
+## Installation
-Futurama releases for this project are on [Clojars](https://clojars.org/). Simply add the following to your project:
+Add Futurama to your project dependencies:
[](http://clojars.org/com.github.k13labs/futurama)
-# _Communication_
+**deps.edn**
+```clojure
+{:deps {com.github.k13labs/futurama {:mvn/version "1.4.0"}}}
+```
+
+**Leiningen project.clj**
+```clojure
+[com.github.k13labs/futurama "1.4.0"]
+```
+
+## Requirements
+
+- **Clojure**: 1.11+ (backwards compatible with 1.10+)
+- **Java**: 11+
+- **core.async**: 1.8+
+
+## API Reference
+
+### Core Operations
+
+| Function | Description |
+|----------|-------------|
+| `!` | Thread-first with async operations |
+| `async->>` | Thread-last with async operations |
+
+### Thread Pool Management
+
+| Function/Macro | Description |
+|----------------|-------------|
+| `with-pool` | Execute body with specified thread pool |
+| `get-pool` | Get thread pool for workload type (`:io`, `:compute`, `:mixed`) |
+
+### Factory Configuration
+
+| Function | Description |
+|----------|-------------|
+| `set-async-factory!` | Set global factory for `async` macro results |
+| `set-thread-factory!` | Set global factory for `thread` macro results |
+| `with-async-factory` | Temporarily bind async factory |
+| `with-thread-factory` | Temporarily bind thread factory |
+| `async-channel-factory` | Creates core.async channel (buffer size 1) |
+| `async-promise-factory` | Creates core.async promise-chan (default) |
+| `async-future-factory` | Creates CompletableFuture |
+| `async-deferred-factory` | Creates Manifold Deferred |
+
+## Advanced Configuration
+
+### Custom Thread Pools
+
+Configure application-wide thread pools using the Java system property `futurama.executor-factory`:
+
+```clojure
+;; Use core.async's thread pool
+(System/setProperty "futurama.executor-factory"
+ "clojure.core.async.impl.dispatch/executor-for")
+```
+
+The factory function receives a keyword (`:io`, `:compute`, `:mixed`) and should return an `ExecutorService` or `nil` to use defaults.
+
+### Exception Handling
+
+Futurama treats exceptions as values. Uncaught exceptions are:
+1. Captured and returned over the async channel/future
+2. Re-thrown when read via `!com.github.k13labs
futurama
futurama
- 1.3.1
+ 1.4.0
- 1.3.1
+ 1.4.0
https://github.com/k13labs/futurama
scm:git:git://github.com/k13labs/futurama.git
scm:git:ssh://git@github.com/k13labs/futurama.git
@@ -22,13 +22,18 @@
org.clojure
clojure
- 1.12.0
+ 1.12.3
manifold
manifold
0.4.3
+
+ com.github.ben-manes.caffeine
+ caffeine
+ 3.2.2
+
org.clojure
core.async
diff --git a/src/futurama/core.clj b/src/futurama/core.clj
index 2f3581e..57fb7e4 100644
--- a/src/futurama/core.clj
+++ b/src/futurama/core.clj
@@ -49,6 +49,7 @@
go block threads - use Thread.setDefaultUncaughtExceptionHandler()
to catch and handle."
(:require [clojure.core.async :as async]
+ [clojure.core.async.impl.dispatch :as run-impl]
[clojure.core.async.impl.protocols :as core-impl]
[clojure.core.async.impl.channels :refer [box]]
[clojure.core.async.impl.ioc-macros :as rt]
@@ -58,10 +59,12 @@
[manifold.deferred :as d])
(:import [clojure.lang Var IDeref IPending IFn IAtom IRef]
[clojure.core.async.impl.channels ManyToManyChannel]
+ [clojure.core.async.impl.buffers PromiseBuffer]
[java.util.concurrent
CompletableFuture
CompletionException
ExecutionException
+ CancellationException
ExecutorService
ForkJoinPool
Future]
@@ -76,12 +79,12 @@
(defn async-channel-factory
"Creates a core async channel of size 1"
^ManyToManyChannel []
- (async/chan 1))
+ (async/chan 1 nil identity))
(defn async-promise-factory
"Creates a core async promise channel"
^ManyToManyChannel []
- (async/promise-chan))
+ (async/promise-chan nil identity))
(defn async-future-factory
"Creates a new CompletableFuture"
@@ -187,15 +190,24 @@
(def ^:no-doc rte rethrow-exception)
+(defn- set-cancel-state!
+ [x]
+ (state/set-cancel-state! x ASYNC_CANCELLED))
+
+(defn- get-cancel-state
+ [x]
+ (= (state/get-cancel-state x) ASYNC_CANCELLED))
+
(defn async-cancellable?
- "Determines if v satisfies? `AsyncCancellable`"
+ "Determines if v can be cancelled"
[v]
(satisfies? impl/AsyncCancellable v))
(defn async-cancel!
"Cancels the async item."
[item]
- (impl/cancel! item))
+ (when (async-cancellable? item)
+ (impl/cancel! item)))
(defn async-cancelled?
"Checks if the current executing async item or one of its parents or provided item has been cancelled.
@@ -208,13 +220,20 @@
(some async-cancelled? state/*items*)
false))
([item]
- (impl/cancelled? item)))
+ (when (satisfies? impl/AsyncCancellable item)
+ (impl/cancelled? item))))
(defn async-completed?
"Checks if the provided `AsyncCompletableReader` instance is completed?"
[x]
- (when (satisfies? impl/AsyncCompletableReader x)
- (impl/completed? x)))
+ (cond
+ (satisfies? impl/AsyncCompletableReader x)
+ (impl/completed? x)
+
+ (satisfies? core-impl/Channel x)
+ (core-impl/closed? x)
+
+ :else false))
(defn async?
"returns true if v instance satisfies? core.async's `ReadPort`"
@@ -225,29 +244,23 @@
"Asynchronously invokes the body inside a pooled thread and return over a write port,
preserves the current thread binding frame, the pool used can be specified via `*thread-pool*`."
[pool port & body]
- `(let [pool# ~pool
+ `(let [pool# (or ~pool *thread-pool* (get-pool :mixed))
port# ~port]
(state/push-item port#
(let [binding-frame# (Var/cloneThreadBindingFrame)]
- (impl/async-dispatch-task-handler
- (or pool#
- *thread-pool*
- (get-pool :mixed))
- port#
- (^:once
- fn*
- []
- (let [thread-frame# (Var/getThreadBindingFrame)]
- (Var/resetThreadBindingFrame binding-frame#)
- (try
- (let [res# (do ~@body)]
- (when (some? res#)
- (async/>!! port# res#)))
- (catch Throwable ~'e
- (async/>!! port# (unwrap-exception ~'e)))
- (finally
- (async/close! port#)
- (Var/resetThreadBindingFrame thread-frame#))))))))))
+ (impl/async-dispatch-task-handler pool# port#
+ (^:once fn* []
+ (let [thread-frame# (Var/getThreadBindingFrame)]
+ (Var/resetThreadBindingFrame binding-frame#)
+ (try
+ (let [~'res (do ~@body)]
+ (when (some? ~'res)
+ (async/>!! port# ~'res)))
+ (catch Throwable ~'e
+ (async/>!! port# (unwrap-exception ~'e)))
+ (finally
+ (async/close! port#)
+ (Var/resetThreadBindingFrame thread-frame#))))))))))
(defmacro thread
"Asynchronously invokes the body in a pooled thread, preserves the current thread binding frame,
@@ -266,171 +279,6 @@
(thread-factory)
~@body)))
-(extend-protocol impl/AsyncCancellable
- Object
- (cancel! [this]
- (state/set-global-state! this ASYNC_CANCELLED)
- true)
- (cancelled? [this]
- (= (state/get-global-state this) ASYNC_CANCELLED))
-
- Future
- (cancel! [this]
- (state/set-global-state! this ASYNC_CANCELLED)
- (future-cancel this))
- (cancelled? [this]
- (or (future-cancelled? this)
- (= (state/get-global-state this) ASYNC_CANCELLED)
- false)))
-
-(extend-type Future
- core-impl/ReadPort
- (take! [x handler]
- (impl/async-read-port-take! x handler))
-
- impl/AsyncCompletableReader
- (get! [fut]
- (try
- (.get ^Future fut)
- (catch Throwable e
- e)))
- (completed? [fut]
- (.isDone ^Future fut))
- (on-complete [fut f]
- (async/thread
- (let [r (impl/get! fut)]
- (f r))))
-
- core-impl/Channel
- (close! [x]
- (when-not (async-completed? x)
- (async-cancel! x)))
- (closed? [x]
- (async-completed? x)))
-
-(extend-protocol impl/AsyncCompletableWriter
- IFn
- (complete! [f v]
- (boolean (f v)))
-
- IAtom
- (complete! [a v]
- (reset! a v)
- true)
-
- IRef
- (complete! [a v]
- (dosync
- (ref-set a v))
- true))
-
-(extend-protocol core-impl/WritePort
- IFn
- (put! [x val handler]
- (impl/async-write-port-put! x val handler))
-
- IAtom
- (put! [x val handler]
- (impl/async-write-port-put! x val handler))
-
- IRef
- (put! [x val handler]
- (impl/async-write-port-put! x val handler)))
-
-(extend-type IDeref
- core-impl/ReadPort
- (take! [x handler]
- (impl/async-read-port-take! x handler))
-
- impl/AsyncCompletableReader
- (get! [ref]
- (try
- (deref ref)
- (catch Throwable e
- e)))
- (completed? [ref]
- (if (instance? IPending ref)
- (realized? ref)
- true))
- (on-complete [ref f]
- (async/thread
- (let [r (impl/get! ref)]
- (f r))))
-
- core-impl/Channel
- (close! [ref]
- (when (instance? IFn ref)
- (ref nil)))
- (closed? [ref]
- (impl/completed? ref)))
-
-(extend-type CompletableFuture
- core-impl/ReadPort
- (take! [x handler]
- (impl/async-read-port-take! x handler))
-
- core-impl/WritePort
- (put! [x val handler]
- (impl/async-write-port-put! x val handler))
-
- impl/AsyncCompletableReader
- (get! [fut]
- (try
- (.get ^CompletableFuture fut)
- (catch Throwable e
- e)))
- (completed? [fut]
- (.isDone ^CompletableFuture fut))
- (on-complete [fut f]
- (let [^BiConsumer invoke-cb (impl/->JavaBiConsumer
- (fn [val ex]
- (f (or ex val))))]
- (.whenComplete ^CompletableFuture fut ^BiConsumer invoke-cb)))
-
- impl/AsyncCompletableWriter
- (complete! [fut v]
- (if (instance? Throwable v)
- (.completeExceptionally ^CompletableFuture fut ^Throwable v)
- (.complete ^CompletableFuture fut v)))
-
- core-impl/Channel
- (close! [fut]
- (.complete ^CompletableFuture fut nil))
- (closed? [fut]
- (.isDone ^CompletableFuture fut)))
-
-(extend-type Deferred
- core-impl/ReadPort
- (take! [x handler]
- (impl/async-read-port-take! x handler))
-
- core-impl/WritePort
- (put! [x val handler]
- (impl/async-write-port-put! x val handler))
-
- impl/AsyncCompletableReader
- (get! [dfd]
- (try
- (deref dfd)
- (catch Throwable e
- e)))
- (completed? [dfd]
- (d/realized? dfd))
- (on-complete [dfd f]
- (d/on-realized dfd f f))
-
- impl/AsyncCompletableWriter
- (complete! [dfd v]
- (if (instance? Throwable v)
- (d/error! dfd v)
- (d/success! dfd v)))
-
- core-impl/Channel
- (close! [dfd]
- (d/success! dfd nil))
- (closed? [dfd]
- (d/realized? dfd)))
-
(defmacro async!
"Asynchronously executes the body, returning `port` immediately to te
calling thread. Additionally, any visible calls to !! and alt!/alts!
@@ -449,31 +297,27 @@
completed; the pool used can be specified via `*thread-pool*` binding."
[pool port & body]
(let [crossing-env (zipmap (keys &env) (repeatedly gensym))]
- `(let [pool# ~pool
+ `(let [pool# (or ~pool *thread-pool* (get-pool :io))
port# ~port]
(state/push-item port#
(let [captured-bindings# (Var/getThreadBindingFrame)]
- (impl/async-dispatch-task-handler
- (or pool#
- *thread-pool*
- (get-pool :io))
- port#
- (^:once
- fn*
- []
- (let [~@(mapcat
- (fn [[l sym]]
- [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))])
- crossing-env)
- f# ~(impl/async-state-machine
- `(try
- ~@body
- (catch Throwable ~'e
- (unwrap-exception ~'e))) 1 [crossing-env &env] rt/async-custom-terminators)
- state# (-> (f#)
- (rt/aset-all! rt/USER-START-IDX port#
- rt/BINDINGS-IDX captured-bindings#))]
- (rt/run-state-machine-wrapped state#)))))))))
+ (impl/async-dispatch-task-handler pool# port#
+ (^:once fn* []
+ (run-impl/with-dispatch-thread-marking
+ (let [~@(mapcat
+ (fn [[l sym]]
+ [sym `(^:once fn* []
+ ~(vary-meta l dissoc :tag))])
+ crossing-env)
+ f# ~(impl/async-state-machine
+ `(try
+ ~@body
+ (catch Throwable ~'e
+ (unwrap-exception ~'e))) 1 [crossing-env &env] rt/async-custom-terminators)
+ state# (-> (f#)
+ (rt/aset-all! rt/USER-START-IDX port#
+ rt/BINDINGS-IDX captured-bindings#))]
+ (rt/run-state-machine-wrapped state#))))))))))
(defmacro async
"Asynchronously executes the body, returning immediately to the
@@ -815,3 +659,234 @@
"Like postwalk, but does pre-order traversal."
[f form]
(async-walk (partial async-prewalk f) identity (f form)))
+
+;;; Begin protocol implementations
+
+(extend-type Future
+ core-impl/ReadPort
+ (take! [x handler]
+ (impl/async-read-port-take! x handler))
+
+ impl/AsyncCompletableReader
+ (get! [fut]
+ (try
+ (.get ^Future fut)
+ (catch Throwable e
+ e)))
+ (completed? [fut]
+ (.isDone ^Future fut))
+ (on-complete [fut f]
+ (async/thread
+ (let [r (impl/get! fut)]
+ (f r)))
+ fut)
+
+ impl/AsyncCancellable
+ (cancel! [fut]
+ (set-cancel-state! fut)
+ (future-cancel fut))
+ (on-cancel-interrupt [fut _]
+ fut)
+ (cancelled? [fut]
+ (or (future-cancelled? fut)
+ (get-cancel-state fut)))
+
+ core-impl/Channel
+ (close! [fut]
+ (when-not (impl/completed? fut)
+ (impl/cancel! fut))
+ nil)
+ (closed? [fut]
+ (impl/completed? fut)))
+
+(extend-protocol impl/AsyncCompletableWriter
+ IFn
+ (complete! [f v]
+ (boolean (f v)))
+
+ IAtom
+ (complete! [a v]
+ (reset! a v)
+ true)
+
+ IRef
+ (complete! [a v]
+ (dosync
+ (ref-set a v))
+ true))
+
+(extend-protocol core-impl/WritePort
+ IFn
+ (put! [x val handler]
+ (impl/async-write-port-put! x val handler))
+
+ IAtom
+ (put! [x val handler]
+ (impl/async-write-port-put! x val handler))
+
+ IRef
+ (put! [x val handler]
+ (impl/async-write-port-put! x val handler)))
+
+(extend-type IDeref
+ core-impl/ReadPort
+ (take! [x handler]
+ (impl/async-read-port-take! x handler))
+
+ impl/AsyncCompletableReader
+ (get! [ref]
+ (try
+ (deref ref)
+ (catch Throwable e
+ e)))
+ (completed? [ref]
+ (if (instance? IPending ref)
+ (realized? ref)
+ true))
+ (on-complete [ref f]
+ (async/thread
+ (let [r (impl/get! ref)]
+ (f r)))
+ ref)
+
+ impl/AsyncCancellable
+ (cancel! [ref]
+ (set-cancel-state! ref)
+ (if (and (instance? IFn ref)
+ (not (impl/completed? ref)))
+ (do
+ (ref (CancellationException. "Task cancelled"))
+ true)
+ false))
+ (on-cancel-interrupt [ref _]
+ ref)
+ (cancelled? [ref]
+ (get-cancel-state ref))
+
+ core-impl/Channel
+ (close! [ref]
+ (when (and (instance? IFn ref)
+ (not (impl/completed? ref)))
+ (ref nil))
+ nil)
+ (closed? [ref]
+ (impl/completed? ref)))
+
+(extend-type CompletableFuture
+ core-impl/ReadPort
+ (take! [x handler]
+ (impl/async-read-port-take! x handler))
+
+ core-impl/WritePort
+ (put! [x val handler]
+ (impl/async-write-port-put! x val handler))
+
+ impl/AsyncCompletableReader
+ (get! [fut]
+ (try
+ (.get ^CompletableFuture fut)
+ (catch Throwable e
+ e)))
+ (completed? [fut]
+ (.isDone ^CompletableFuture fut))
+ (on-complete [fut f]
+ (let [^BiConsumer invoke-cb (impl/->JavaBiConsumer
+ (fn [val ex]
+ (f (or ex val))))]
+ (.whenComplete ^CompletableFuture fut ^BiConsumer invoke-cb))
+ fut)
+
+ impl/AsyncCompletableWriter
+ (complete! [fut v]
+ (if (instance? Throwable v)
+ (.completeExceptionally ^CompletableFuture fut ^Throwable v)
+ (.complete ^CompletableFuture fut v)))
+
+ impl/AsyncCancellable
+ (on-cancel-interrupt [fut fut']
+ (let [^BiConsumer invoke-cb (impl/->JavaBiConsumer
+ (fn [_ _]
+ (future-cancel fut')))] ;;; cancel any linked future
+ (.whenComplete ^CompletableFuture fut ^BiConsumer invoke-cb)
+ fut))
+ (cancel! [fut]
+ (set-cancel-state! fut)
+ (future-cancel fut))
+ (cancelled? [fut]
+ (or (future-cancelled? fut)
+ (get-cancel-state fut)))
+
+ core-impl/Channel
+ (close! [fut]
+ (when-not (impl/completed? fut)
+ (.complete ^CompletableFuture fut nil))
+ nil)
+ (closed? [fut]
+ (.isDone ^CompletableFuture fut)))
+
+(extend-type Deferred
+ core-impl/ReadPort
+ (take! [x handler]
+ (impl/async-read-port-take! x handler))
+
+ core-impl/WritePort
+ (put! [x val handler]
+ (impl/async-write-port-put! x val handler))
+
+ impl/AsyncCompletableReader
+ (get! [dfd]
+ (try
+ (deref dfd)
+ (catch Throwable e
+ e)))
+ (completed? [dfd]
+ (d/realized? dfd))
+ (on-complete [dfd f]
+ (d/on-realized dfd f f)
+ dfd)
+
+ impl/AsyncCompletableWriter
+ (complete! [dfd v]
+ (if (instance? Throwable v)
+ (d/error! dfd v)
+ (d/success! dfd v)))
+
+ impl/AsyncCancellable
+ (cancel! [dfd]
+ (set-cancel-state! dfd)
+ (if-not (d/realized? dfd)
+ (d/error! dfd (CancellationException. "Task cancelled"))
+ false))
+ (on-cancel-interrupt [dfd fut]
+ (let [interrupt-handler (fn [_]
+ (future-cancel fut))]
+ (d/on-realized dfd interrupt-handler interrupt-handler)
+ dfd))
+ (cancelled? [dfd]
+ (get-cancel-state dfd))
+
+ core-impl/Channel
+ (close! [dfd]
+ (d/success! dfd nil)
+ nil)
+ (closed? [dfd]
+ (d/realized? dfd)))
+
+(extend-type ManyToManyChannel
+ impl/AsyncCancellable
+ (cancel! [c]
+ (set-cancel-state! c)
+ (if-not (core-impl/closed? c)
+ (do
+ (async/put! c (CancellationException. "Task cancelled"))
+ (async/close! c)
+ true)
+ false))
+ (on-cancel-interrupt [c fut]
+ (when (->> (.buf ^ManyToManyChannel c)
+ (instance? PromiseBuffer))
+ (async/take! c (fn [_]
+ (future-cancel fut))))
+ c)
+ (cancelled? [c]
+ (get-cancel-state c)))
diff --git a/src/futurama/impl.clj b/src/futurama/impl.clj
index 6c50113..3eb2feb 100644
--- a/src/futurama/impl.clj
+++ b/src/futurama/impl.clj
@@ -2,57 +2,49 @@
(:refer-clojure :exclude [realized?])
(:require
[clojure.core.async :refer [take!]]
+ [clojure.core.async.impl.go :as go-impl]
[clojure.core.async.impl.channels :refer [box]]
[clojure.core.async.impl.protocols :as core-impl])
(:import
[java.util.concurrent
- CompletableFuture
ExecutorService
Future]
[java.util.concurrent.locks Lock]
- [java.util.function BiConsumer Function]))
+ [java.util.function BiConsumer]))
(def async-state-machine
- "Use a requiring resolve here to dynamically use the correct implementation and maintain backwards compatibility"
- (or (requiring-resolve 'clojure.core.async.impl.ioc-macros/state-machine)
- (requiring-resolve 'clojure.core.async.impl.go/state-machine)))
-
-(deftype JavaFunction [f]
- Function
- (apply [_ a]
- (f a)))
-
-(deftype JavaBiConsumer [f]
- BiConsumer
- (accept [_ a b]
- (f a b)))
+ go-impl/state-machine)
(defprotocol AsyncCompletableReader
- (get! [x])
- (completed? [x])
- (on-complete [x f]))
+ (get! [x]
+ "Returns the completed value of this async operation, blocking if the async operation is not yet complete.")
+ (completed? [x]
+ "Returns true if this async operation has completed.")
+ (on-complete [x f]
+ "Registers a callback f to be called with the completed value when this async operation completes."))
(defprotocol AsyncCompletableWriter
- (complete! [x v]))
+ (complete! [x v]
+ "Attempts to complete this async operation with value v, returning true if successful, false otherwise."))
(defprotocol AsyncCancellable
- (cancel! [this])
- (cancelled? [this]))
+ (on-cancel-interrupt [this fut]
+ "Attempts to register a cancellation handler that will be called with the given future when this async operation is cancelled.")
+ (cancelled? [this]
+ "Returns true if this async operation has been cancelled.")
+ (cancel! [this]
+ "Attempts to cancel this async operation."))
-(defn dispatch
- ^Future [^Runnable task ^ExecutorService pool]
- (.submit ^ExecutorService pool ^Runnable task))
+(deftype JavaBiConsumer [f]
+ BiConsumer
+ (accept [_ a b]
+ (f a b)))
(defn async-dispatch-task-handler
- [pool port task]
- (let [^Future fut (dispatch task pool)]
- (when (instance? CompletableFuture port)
- (.exceptionally ^CompletableFuture port
- ^Function (->JavaFunction
- (fn [t]
- (cancel! port)
- (future-cancel fut)
- (throw t)))))
+ "Dispatches a task to the given executor service pool, and registers a cancellation handler on the port."
+ ^Future [^ExecutorService pool port ^Runnable task]
+ (let [^Future fut (.submit pool task)]
+ (on-cancel-interrupt port fut)
port))
(defn- async-reader-handler*
diff --git a/src/futurama/state.clj b/src/futurama/state.clj
index d4820e4..8d0e6f9 100644
--- a/src/futurama/state.clj
+++ b/src/futurama/state.clj
@@ -1,12 +1,16 @@
(ns ^:no-doc futurama.state
- (:import [java.util Map WeakHashMap]
- [java.util.concurrent.locks Lock ReadWriteLock ReentrantReadWriteLock]))
+ (:import [com.github.benmanes.caffeine.cache Caffeine Cache]))
-(defonce ^:private ^ReadWriteLock GLOBAL_STATE_LOCK
- (ReentrantReadWriteLock.))
-
-(defonce ^:private GLOBAL_STATE
- (WeakHashMap.))
+(defonce ^:private GLOBAL_CANCEL_STATE
+ (delay
+ (.. (Caffeine/newBuilder)
+ ;;; Use weak keys to allow removal of keys and values when no longer referenced elsewhere
+ (weakKeys)
+ ;;; Cancellation interruption is best effort, arbitrary size based on CPU count, should be enough for most use cases
+ (maximumSize (-> (Runtime/getRuntime)
+ (.availableProcessors)
+ (* 1000)))
+ (build))))
(def ^:dynamic *items* [])
@@ -16,22 +20,12 @@
`(binding [*items* (conj *items* ~item)]
~@body))
-(defn ^:no-doc set-global-state!
+(defn set-cancel-state!
"Sets the value of the item's global state"
[key val]
- (let [^Lock lock (.writeLock GLOBAL_STATE_LOCK)]
- (.lock lock)
- (try
- (.put ^Map GLOBAL_STATE key val)
- (finally
- (.unlock lock)))))
+ (.put ^Cache @GLOBAL_CANCEL_STATE key val))
-(defn ^:no-doc get-global-state
+(defn get-cancel-state
"Gets the value of the item's global state"
[key]
- (let [^Lock lock (.readLock GLOBAL_STATE_LOCK)]
- (.lock lock)
- (try
- (.get ^Map GLOBAL_STATE key)
- (finally
- (.unlock lock)))))
+ (.getIfPresent ^Cache @GLOBAL_CANCEL_STATE key))
diff --git a/test/futurama/core_test.clj b/test/futurama/core_test.clj
index 043a9f5..f0652ca 100644
--- a/test/futurama/core_test.clj
+++ b/test/futurama/core_test.clj
@@ -4,8 +4,8 @@
[clojure.test :refer [deftest is testing use-fixtures]]
[criterium.core :refer [quick-benchmark report-result
with-progress-reporting]]
- [futurama.core :refer [!
- async->> async-cancel! async-cancellable?
+ [futurama.core :refer [! async->>
+ async-cancel! async-cancellable? async-completed?
async-cancelled? async-every? async-for async-map
async-postwalk async-prewalk async-reduce async-some
async? get-pool thread with-pool] :as f])
@@ -14,21 +14,21 @@
(defn async-fixture
[f]
+ (f/set-async-factory! f/async-future-factory)
+ (f/set-thread-factory! f/async-future-factory)
+ (f)
+ (f/set-async-factory! f/async-channel-factory)
+ (f/set-thread-factory! f/async-channel-factory)
+ (f)
(f/set-async-factory! f/async-promise-factory)
(f/set-thread-factory! f/async-promise-factory)
(f)
- (f/with-async-factory f/async-future-factory
- (f/with-thread-factory f/async-future-factory
- (f)))
- (f/with-async-factory f/async-channel-factory
- (f/with-thread-factory f/async-channel-factory
- (f)))
- (f/with-async-factory f/async-promise-factory
- (f/with-thread-factory f/async-promise-factory
- (f)))
- (f/with-async-factory f/async-deferred-factory
- (f/with-thread-factory f/async-deferred-factory
- (f))))
+ (f/set-async-factory! f/async-deferred-factory)
+ (f/set-thread-factory! f/async-deferred-factory)
+ (f)
+ (f/set-async-factory! nil)
+ (f/set-thread-factory! nil)
+ (f))
(use-fixtures :once async-fixture)
@@ -48,69 +48,112 @@
(def test-pool
(delay
- (Executors/newFixedThreadPool 2)))
+ (Executors/newFixedThreadPool 10)))
(deftest cancel-async-test
+ (testing "cancellable future is interrupted test"
+ (with-pool @test-pool
+ (let [interrupted (atom false)
+ a (promise)
+ s (atom 0)
+ f (future
+ (try
+ (while (not (async-cancelled?)) ;;; this loop goes on infinitely until the thread is interrupted
+ (Thread/sleep 90)
+ (println "thread looping..." (swap! s inc)))
+ (println "ended thread looping.")
+ (deliver a true)
+ (catch InterruptedException e
+ (println "interrupted looping by:" (type e))
+ (reset! interrupted true)
+ (deliver a true))))]
+ (is (true? (async-cancellable? f)))
+ (go
+ (> get-pool bond/calls (map :args)))))))
(testing "with-pool uses specified workload pool - mixed"
(let [mixed-pool (get-pool :mixed)]
@@ -139,7 +182,7 @@
(async
(is (= 100
(!> get-pool bond/calls (map :args)))))))
(testing "with-pool uses specified workload pool - compute"
(let [compute-pool (get-pool :compute)]
@@ -149,7 +192,7 @@
(async
(is (= 100
(!> get-pool bond/calls (map :args))))))))
(deftest thread-macro-workload-test
diff --git a/test/futurama/test_setup.clj b/test/futurama/test_setup.clj
index 4cc131a..dde7e4d 100644
--- a/test/futurama/test_setup.clj
+++ b/test/futurama/test_setup.clj
@@ -7,7 +7,7 @@
(hto/activate!)
-#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
+#_{:clojure-lsp/ignore [:clojure-lsp/unused-public-var]}
(defn defuse-zero-assertions
"Don't fail the test suite if we hide an `is` within a `doseq`.