Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/clojure.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ jobs:
test:
strategy:
matrix:
java_version: [corretto-11,corretto-24]
test_clojure_alias: [clojure-1.10, clojure-1.11, clojure-1.12]
java_version: [corretto-24]
test_clojure_alias: [clojure-1.12]
fail-fast: false
runs-on: ubuntu-latest
steps:
- name: Checkout code
Expand Down Expand Up @@ -53,8 +54,6 @@ jobs:
echo "GRAALVM_HOME=$(mise where java)" >> $GITHUB_ENV
echo "$(mise where java)/bin" >> $GITHUB_PATH
gu install native-image
- name: Run tests
run: make test
- name: Run clj-kondo linter
run: make lint
- name: Run build jar
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
This is a history of changes to k13labs/failsage

# 0.3.0 - 2025-10-23

## Changes

* Fixed a problem due to limitations with AsyncExecution that was preventing the async task from being canceled.
* Upgrade futurama dependency to 1.4.0 to get latest fixes and improvements.
* Simplify test matrix, although older Clojure versions should work fine.

# 0.2.0 - 2025-10-22

### Features
Expand Down
6 changes: 2 additions & 4 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
{:paths ["src" "resources"]
:deps {org.clojure/clojure {:mvn/version "1.12.3"}
dev.failsafe/failsafe {:mvn/version "3.3.2"}
com.github.k13labs/futurama {:mvn/version "1.3.1"}}
com.github.k13labs/futurama {:mvn/version "1.4.0"}}

:aliases
{:clojure-1.10 {:extra-deps {org.clojure/clojure {:mvn/version "1.10.3"}}}
:clojure-1.11 {:extra-deps {org.clojure/clojure {:mvn/version "1.11.4"}}}
:clojure-1.12 {:extra-deps {org.clojure/clojure {:mvn/version "1.12.3"}}}
{:clojure-1.12 {:extra-deps {org.clojure/clojure {:mvn/version "1.12.3"}}}

:dev {:extra-paths ["dev"]
:extra-deps {reloaded.repl/reloaded.repl {:mvn/version "0.2.4"}
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<groupId>com.github.k13labs</groupId>
<artifactId>failsage</artifactId>
<name>failsage</name>
<version>0.2.0</version>
<version>0.3.0</version>
<scm>
<tag>0.2.0</tag>
<tag>0.3.0</tag>
<url>https://github.com/k13labs/failsage</url>
<connection>scm:git:git://github.com/k13labs/failsage.git</connection>
<developerConnection>scm:git:ssh://git@github.com/k13labs/failsage.git</developerConnection>
Expand All @@ -32,7 +32,7 @@
<dependency>
<groupId>com.github.k13labs</groupId>
<artifactId>futurama</artifactId>
<version>1.3.1</version>
<version>1.4.0</version>
</dependency>
</dependencies>
<build>
Expand Down
29 changes: 16 additions & 13 deletions src/failsage/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
"Defines failsafe policies for handling failures, retries, etc."
(:require
[failsage.impl :as impl]
[futurama.core :refer [!<! async]])
[futurama.core :refer [!<!! async-cancel!]])
(:import
[clojure.lang IPersistentMap]
[dev.failsafe
AsyncExecution
Bulkhead
BulkheadBuilder
CircuitBreaker
Expand Down Expand Up @@ -68,10 +67,12 @@
(executor nil nil))
(^FailsafeExecutor [executor-or-policies]
(executor nil executor-or-policies))
(^FailsafeExecutor [pool executor-or-policies]
(let [pool (impl/get-pool pool)]
(^FailsafeExecutor [executor-pool executor-or-policies]
(let [pool (impl/get-pool executor-pool)]
(if (instance? FailsafeExecutor executor-or-policies)
(.with ^FailsafeExecutor executor-or-policies pool)
(if executor-pool
(.with ^FailsafeExecutor executor-or-policies pool)
executor-or-policies)
(let [policies (policies executor-or-policies)]
(if (empty? policies)
(.with (Failsafe/none) pool)
Expand Down Expand Up @@ -114,12 +115,14 @@
`(execute-async ~executor-or-policy context# ~body))
([executor-or-policy context-binding & body]
`(impl/execute-get-async (executor (impl/validate-not-stateful-map! ~executor-or-policy))
(bound-fn [~(vary-meta context-binding assoc :tag AsyncExecution)]
(async
(try
(impl/record-async-success ~context-binding (!<! ~@body))
(catch Throwable ~'t
(impl/record-async-failure ~context-binding ~'t))))))))
(bound-fn [~(vary-meta context-binding assoc :tag ExecutionContext)]
(let [~'result (do ~@body)]
(try
(impl/on-cancel-propagate! ~context-binding ~'result)
(!<!! ~'result)
(catch InterruptedException ~'e
(async-cancel! ~'result)
(throw ~'e))))))))

(defn bulkhead
"A bulkhead allows you to restrict concurrent executions as a way of preventing system overload.
Expand Down Expand Up @@ -569,14 +572,14 @@
- `:on-success-fn` (optional): A function that takes a single ExecutionCompletedEvent argument, called when an execution completes successfully.
- `:on-failure-fn` (optional): A function that takes a single ExecutionCompletedEvent argument, called when an execution completes with a failure.
- `:timeout-ms` (required): The timeout duration in milliseconds.
- `:interrupt` (optional): Configures the policy to interrupt an execution in addition to cancelling it when the timeout is exceeded. Default is false."
- `:interrupt` (optional): Configures the policy to interrupt an execution in addition to cancelling it when the timeout is exceeded. Default is true."
[{:keys [build
timeout-ms
interrupt
on-success-fn
on-failure-fn]
:or {build true
interrupt false}}]
interrupt true}}]
(let [builder (cond-> (Timeout/builder (Duration/ofMillis (long timeout-ms)))
interrupt
(.withInterrupt)
Expand Down
45 changes: 20 additions & 25 deletions src/failsage/impl.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,25 @@
(:require
[futurama.core :as f])
(:import
[dev.failsafe
AsyncExecution
FailsafeExecutor]
[dev.failsafe ExecutionContext FailsafeExecutor]
[dev.failsafe.event EventListener]
[dev.failsafe.function
AsyncRunnable
CheckedFunction
CheckedPredicate
CheckedRunnable
ContextualSupplier]
[java.util.concurrent ExecutorService]))

(deftype FailsafeCheckedRunnable [f]
CheckedRunnable
(run [_]
(f)))

(defn ->checked-runnable
"Converts a Clojure function to a Failsafe CheckedRunnable."
^CheckedRunnable [f]
(FailsafeCheckedRunnable. f))

(deftype FailsafeEventListener [f]
EventListener
(accept [_ event]
Expand Down Expand Up @@ -45,16 +53,6 @@
^CheckedPredicate [f]
(FailsafeCheckedPredicate. f))

(deftype FailsafeAsyncRunnable [f]
AsyncRunnable
(run [_ execution]
(f execution)))

(defn ->async-runnable
"Converts a Clojure function to a Failsafe AsyncRunnable."
^AsyncRunnable [f]
(FailsafeAsyncRunnable. f))

(deftype FailsafeContextualSupplier [f]
ContextualSupplier
(get [_ context]
Expand All @@ -73,16 +71,6 @@
(f/get-pool pool)
(or pool f/*thread-pool* (f/get-pool :io))))

(defn record-async-success
"Records the result of an execution in the given ExecutionContext."
[^AsyncExecution context ^Object result]
(.recordResult context result))

(defn record-async-failure
"Records the error of an execution in the given ExecutionContext."
[^AsyncExecution context ^Throwable error]
(.recordException context error))

(defn execute-get
"Executes the given CheckedSupplier using the Failsafe executor."
[^FailsafeExecutor executor execute-fn]
Expand All @@ -91,7 +79,7 @@
(defn execute-get-async
"Executes the given CheckedSupplier using the Failsafe executor."
[^FailsafeExecutor executor execute-fn]
(.getAsyncExecution executor (->async-runnable execute-fn)))
(.getAsync executor (->contextual-supplier execute-fn)))

(defn- check-stateful-map!
"Checks if a single policy is a stateful map and throws if so."
Expand All @@ -103,6 +91,13 @@
:policy-type (:type policy)
:stateful-types #{:circuit-breaker :rate-limiter :bulkhead}}))))

(defn on-cancel-propagate!
"Creates a CheckedRunnable that invokes the given handler function when called."
[^ExecutionContext context async-result]
(.onCancel context (->checked-runnable
(fn []
(f/async-cancel! async-result)))))

(defn validate-not-stateful-map!
"Validates that executor-or-policy is not a stateful policy map.
Returns the input unchanged if valid, throws if invalid."
Expand Down
34 changes: 29 additions & 5 deletions test/failsage/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
Timeout
TimeoutExceededException]
[java.lang ArithmeticException IllegalArgumentException]
[java.time Duration]))
[java.time Duration]
[java.util.concurrent ExecutionException]))

;; Dynamic var for testing
(def ^:dynamic *test-var* nil)
Expand Down Expand Up @@ -468,11 +469,34 @@
(is (instance? Timeout policy)))))

(deftest test-timeout-enforces-limit
(testing "timeout enforces time limit"
(testing "timeout enforces time limit - execute"
(let [policy (fs/timeout {:timeout-ms 100})
executor (fs/executor policy)]
(is (thrown? TimeoutExceededException
(fs/execute executor (Thread/sleep 200)))))))
executor (fs/executor policy)
start-time (System/currentTimeMillis)
result (try
(fs/execute executor
(do
(Thread/sleep 10000)
::done)) ;;; is interrupted by timeout
(catch TimeoutExceededException e
e))
end-time (System/currentTimeMillis)]
(is (instance? TimeoutExceededException result))
(is (< (- end-time start-time) 1000)))) ;;; should timeout before 200ms sleep completes
(testing "timeout enforces time limit - execute-async"
(let [policy (fs/timeout {:timeout-ms 100})
executor (fs/executor policy)
start-time (System/currentTimeMillis)
result (try
@(fs/execute-async executor
(future
(Thread/sleep 10000)
::done))
(catch ExecutionException e
(.getCause e)))
end-time (System/currentTimeMillis)]
(is (instance? TimeoutExceededException result))
(is (< (- end-time start-time) 1000))))) ;;; should timeout before 200ms sleep completes

(deftest test-timeout-success-callback
(testing "timeout calls on-success-fn"
Expand Down
60 changes: 6 additions & 54 deletions test/failsage/impl_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,6 @@
(let [pred (impl/->checked-predicate (fn [_] (throw (ex-info "test error" {}))))]
(is (thrown? Exception (.test pred nil))))))

;; AsyncRunnable Converter Tests

(deftest test-async-runnable-creation
(testing "->async-runnable creates a valid AsyncRunnable"
(let [called (atom false)
runnable (impl/->async-runnable (fn [_] (reset! called true)))]
(is (some? runnable))
(.run runnable nil)
(is @called))))

(deftest test-async-runnable-receives-execution
(testing "AsyncRunnable receives execution context"
(let [called (atom false)
received-arg (atom nil)
runnable (impl/->async-runnable (fn [exec]
(reset! called true)
(reset! received-arg exec)))]
(.run runnable nil)
(is @called)
(is (nil? @received-arg)))))

;; Thread Pool Tests

(deftest test-get-pool-with-keyword
Expand Down Expand Up @@ -191,27 +170,6 @@
(is (= "success" result))
(is (= 2 @counter)))))

;; record-async-success and record-async-failure Tests

(deftest test-record-async-success
(testing "record-async-success records result on AsyncExecution"
(let [recorded-result (atom nil)
mock-execution (reify dev.failsafe.AsyncExecution
(recordResult [_ result]
(reset! recorded-result result)))]
(impl/record-async-success mock-execution "success-value")
(is (= "success-value" @recorded-result)))))

(deftest test-record-async-failure
(testing "record-async-failure records exception on AsyncExecution"
(let [recorded-exception (atom nil)
mock-execution (reify dev.failsafe.AsyncExecution
(recordException [_ exception]
(reset! recorded-exception exception)))
test-error (ex-info "test error" {})]
(impl/record-async-failure mock-execution test-error)
(is (= test-error @recorded-exception)))))

;; ContextualSupplier with ExecutionContext Tests

(deftest test-contextual-supplier-receives-context
Expand Down Expand Up @@ -267,21 +225,16 @@
(testing "execute-get-async executes an async function"
(let [executor (.with (Failsafe/none) ^ExecutorService (f/get-pool :io))
result (impl/execute-get-async executor
(fn [ctx]
(f/async
(impl/record-async-success ctx 42))))]
(fn [_]
42))]
(is (= 42 @result)))))

(deftest test-execute-get-async-with-exception
(testing "execute-get-async handles exceptions"
(let [executor (.with (Failsafe/none) ^ExecutorService (f/get-pool :io))
result (impl/execute-get-async executor
(fn [ctx]
(f/async
(try
(throw (ex-info "async error" {}))
(catch Throwable t
(impl/record-async-failure ctx t))))))]
(fn [_]
(throw (ex-info "async error" {}))))]
(is (thrown-with-msg? Exception #"async error"
@result)))))

Expand All @@ -292,8 +245,7 @@
result (impl/execute-get-async executor
(fn [ctx]
(reset! received-context ctx)
(f/async
(impl/record-async-success ctx 42))))]
42))]
(is (= 42 @result))
(is (some? @received-context))
(is (instance? dev.failsafe.AsyncExecution @received-context)))))
(is (instance? ExecutionContext @received-context)))))