Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
This is a history of changes to gateless/futurama

# 1.4.2
* **Feature**: Add utility `->future` function to easily convert values to future.

# 1.4.1
* **Bug Fix**: Ensure future is only cancelled if async item was cancelled.

Expand Down
29 changes: 26 additions & 3 deletions src/futurama/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@
(let [^BiConsumer invoke-cb (impl/->JavaBiConsumer
(fn [_ _]
(when (impl/cancelled? fut)
(future-cancel fut'))))] ;;; cancel any linked future
(impl/cancel! fut'))))] ;;; cancel any linked future
(.whenComplete ^CompletableFuture fut ^BiConsumer invoke-cb)
fut))
(cancel! [fut]
Expand Down Expand Up @@ -861,7 +861,7 @@
(on-cancel-interrupt [dfd fut]
(let [interrupt-handler (fn [_]
(when (impl/cancelled? dfd)
(future-cancel fut)))]
(impl/cancel! fut)))]
(d/on-realized dfd interrupt-handler interrupt-handler)
dfd))
(cancelled? [dfd]
Expand Down Expand Up @@ -889,7 +889,30 @@
(instance? PromiseBuffer))
(async/take! c (fn [_]
(when (impl/cancelled? c)
(future-cancel fut)))))
(impl/cancel! fut)))))
c)
(cancelled? [c]
(get-cancel-state c)))

(defn ->future
"Converts any value into a CompletableFuture, reading
through any async result returned inside or returning
a completed future for non-async objects."
[val]
(cond
(instance? CompletableFuture val)
val

(async? val)
(let [fut (CompletableFuture.)]
(impl/on-cancel-interrupt fut val)
(async
(try
(let [res (!<! val)]
(impl/complete! fut res))
(catch Throwable rex
(impl/complete! fut rex))))
fut)

:else
(CompletableFuture/completedFuture val)))
45 changes: 45 additions & 0 deletions test/futurama/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,31 @@
(Executors/newFixedThreadPool 10)))

(deftest cancel-async-test
(testing "cancellable ->future is interrupted test"
(with-pool @test-pool
(let [interrupted (atom false)
a (promise)
s (atom 0)
f (future
(try
(while (not (async-cancelled?)) ;;; this loop goes on infinitely until the thread is interrupted
(Thread/sleep 90)
(println "thread looping..." (swap! s inc)))
(println "ended thread looping.")
(deliver a true)
(catch InterruptedException e
(println "interrupted looping by:" (type e))
(reset! interrupted true)
(deliver a true))))
f' (f/->future f)]
(is (true? (async-cancellable? f)))
(go
(<! (timeout 100))
(async-cancel! f')) ;;; cancelling the ->future causes the converted async object to be interrupted
(is (true? @a))
(is (true? (async-cancelled? f)))
(is (true? (async-completed? f)))
(is (true? @interrupted)))))
(testing "cancellable future is interrupted test"
(with-pool @test-pool
(let [interrupted (atom false)
Expand Down Expand Up @@ -512,3 +537,23 @@
(!<! (async
(throw (ex-info "foobar" {}))
::result))))))))

(deftest test-future-conversion
(testing "can convert any async result to CompletableFuture - success"
(let [fut (f/->future (async ::foobar))]
(is (= ::foobar @fut))))
(testing "can convert any async result to CompletableFuture - failure"
(let [fut (f/->future (async (throw (ex-info "foobar" {}))))]
(is (thrown-with-msg? Exception #"foobar" @fut))))
(testing "can convert any thread result to CompletableFuture"
(let [fut (f/->future (thread ::foobar))]
(is (= ::foobar @fut))))
(testing "can use CompletableFuture as CompletableFuture"
(let [fut' (CompletableFuture/completedFuture ::foobar)
fut (f/->future fut')]
(is (= ::foobar @fut))
(is (identical? fut' fut))))
(testing "can use non-async value as CompletableFuture"
(let [val ::foobar
fut (f/->future val)]
(is (= ::foobar @fut)))))