Skip to content

Commit 11419e0

Browse files
committed
Fix dropped errors in CompletionStage's *EitherAsync methods
For `*Async` variants of `CompletionStage` methods, the (first) deferred argument was wrapped with `onto` so that its callbacks are invoked on the given `executor`. This results in a new deferred connected to the original one. In case of the `*EitherAsync` methods, that new deferred would then leak when `alt` chooses the `other` one but the `onto` deferred ends up in an error state since there was no way for the caller to attach an error handler to it. Furthermore, for async variants of methods accepting two completion stages, `f` was never executed on the given executor. For example, `then-combine` was defined like this: (defn- then-combine [d other ^BiFunction f] (assert-some other f) (fmap-deferred (zip d other) (fn [[x y]] (.apply f x y)))) Now `then-combine-async` (defined via `def-async-for-dual`) would wrap `d` with `(onto d executor)` before calling `then-combine`. However, the deferred passed to `fmap-deferred` was the one returned from `(zip d other)` which would not have the given executor attached. Thus, `f` would have been executed on a different executor than desired. This patch addresses both issues by adding a `to` deferred as the final argument to every method implementation. This deferred is bound to the desired `executor`. The operation `f` is then attached as a callback to that `to` deferred and the deferred of the main operation is connected to it. For synchronous implementations, `nil` can be passed as `to` in which case the callback is attached directly to the main operation deferred to save some overhead. This means that now the original deferred is passed to `alt` so that all error states can be handled by the caller. It also removes the `def-async-for` and `def-async-for-dual` macros and opts to instead always explicitly pass the `to` deferred in all method implementations. This isn't much more verbose but hopefully serves to make the code bit easier to follow. Finally, it renames `fmap-deferred` to `shallow-chain` for consistency with the established `chain` naming and introduces the analogous `shallow-connect` (also private for now).
1 parent b634e95 commit 11419e0

File tree

1 file changed

+118
-141
lines changed

1 file changed

+118
-141
lines changed

src/manifold/deferred.clj

Lines changed: 118 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@
7474

7575
to-completable-future
7676

77-
when-complete when-complete-async)
77+
when-complete when-complete-async
78+
79+
cs-default-executor)
7880

7981
;; The potemkin abstract type for
8082
;; implementations such as CompletionStage
@@ -84,94 +86,99 @@
8486
ADeferred
8587
CompletionStage
8688
(thenApply [d f]
87-
(then-apply d f))
89+
(then-apply d f nil))
8890
(thenApplyAsync [d f]
89-
(then-apply-async d f))
91+
(then-apply d f (cs-default-executor)))
9092
(thenApplyAsync [d f executor]
91-
(then-apply-async d f executor))
93+
(then-apply d f executor))
9294

9395
(thenAccept [d f]
94-
(then-accept d f))
96+
(then-accept d f nil))
9597
(thenAcceptAsync [d f]
96-
(then-accept-async d f))
98+
(then-accept d f (cs-default-executor)))
9799
(thenAcceptAsync [d f executor]
98-
(then-accept-async d f executor))
100+
(then-accept d f executor))
99101

100102
(thenRun [d f]
101-
(then-run d f))
103+
(then-run d f nil))
102104
(thenRunAsync [d f]
103-
(then-run-async d f))
105+
(then-run d f (cs-default-executor)))
104106
(thenRunAsync [d f executor]
105-
(then-run-async d f executor))
107+
(then-run d f executor))
106108

107109
(thenCombine [d other f]
108-
(then-combine d other f))
110+
(then-combine d other f nil))
109111
(thenCombineAsync [d other f]
110-
(then-combine-async d other f))
112+
(then-combine d other f (cs-default-executor)))
111113
(thenCombineAsync [d other f executor]
112-
(then-combine-async d other f executor))
114+
(then-combine d other f executor))
113115

114116
(thenAcceptBoth [d other f]
115-
(then-accept-both d other f))
117+
(then-accept-both d other f nil))
116118
(thenAcceptBothAsync [d other f]
117-
(then-accept-both-async d other f))
119+
(then-accept-both d other f (cs-default-executor)))
118120
(thenAcceptBothAsync [d other f executor]
119-
(then-accept-both-async d other f executor))
121+
(then-accept-both d other f executor))
120122

121123
(runAfterBoth [d other f]
122-
(run-after-both d other f))
124+
(run-after-both d other f nil))
123125
(runAfterBothAsync [d other f]
124-
(run-after-both-async d other f))
126+
(run-after-both d other f (cs-default-executor)))
125127
(runAfterBothAsync [d other f executor]
126-
(run-after-both-async d other f executor))
128+
(run-after-both d other f executor))
127129

128130
(applyToEither [d other f]
129-
(apply-to-either d other f))
131+
(apply-to-either d other f nil))
130132
(applyToEitherAsync [d other f]
131-
(apply-to-either-async d other f))
133+
(apply-to-either d other f (cs-default-executor)))
132134
(applyToEitherAsync [d other f executor]
133-
(apply-to-either-async d other f executor))
135+
(apply-to-either d other f executor))
134136

135137
(acceptEither [d other f]
136-
(accept-either d other f))
138+
(accept-either d other f nil))
137139
(acceptEitherAsync [d other f]
138-
(accept-either-async d other f))
140+
(accept-either d other f (cs-default-executor)))
139141
(acceptEitherAsync [d other f executor]
140-
(accept-either-async d other f executor))
142+
(accept-either d other f executor))
141143

142144
(runAfterEither [d other f]
143-
(run-after-either d other f))
145+
(run-after-either d other f nil))
144146
(runAfterEitherAsync [d other f]
145-
(run-after-either-async d other f))
147+
(run-after-either d other f (cs-default-executor)))
146148
(runAfterEitherAsync [d other f executor]
147-
(run-after-either-async d other f executor))
149+
(run-after-either d other f executor))
148150

149151
(thenCompose [d f]
150-
(then-compose d f))
152+
(then-compose d f nil))
151153
(thenComposeAsync [d f]
152-
(then-compose-async d f))
154+
(then-compose d f (cs-default-executor)))
153155
(thenComposeAsync [d f executor]
154-
(then-compose-async d f executor))
156+
(then-compose d f executor))
155157

156158
(handle [d f]
157-
(then-handle d f))
159+
(then-handle d f nil))
158160
(handleAsync [d f]
159-
(then-handle-async d f))
161+
(then-handle d f (cs-default-executor)))
160162
(handleAsync [d f executor]
161-
(then-handle-async d f executor))
163+
(then-handle d f executor))
162164

163165
(exceptionally [d f]
164-
(then-exceptionally d f))
165-
166-
(toCompletableFuture [d]
167-
(to-completable-future d))
166+
(then-exceptionally d f nil))
167+
;; Only available since Java 12
168+
;; (exceptionallyAsync [d f]
169+
;; (then-exceptionally d f (cs-default-executor)))
170+
;; (exceptionallyAsync [d f executor]
171+
;; (then-exceptionally d executor))
168172

169173
(whenComplete [d f]
170-
(when-complete d f))
174+
(when-complete d f nil))
171175
(whenCompleteAsync [d f]
172-
(when-complete-async d f))
176+
(when-complete d f (cs-default-executor)))
173177
(whenCompleteAsync [d f executor]
174-
(when-complete-async d f executor)))
178+
(when-complete d f executor))
179+
180+
(toCompletableFuture [d]
181+
(to-completable-future d)))
175182

176183
(definline realized?
177184
"Returns true if the manifold deferred is realized."
@@ -1538,141 +1545,111 @@
15381545
;; CompletionStage helper fns
15391546
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
15401547

1541-
(defmacro ^:no-doc def-async-for
1542-
"Defines a CompletionStage async version of the function associated with
1543-
the given symbol, with '-async' appended."
1544-
[fn-name]
1545-
(let [async-name (symbol (str (name fn-name) "-async"))]
1546-
`(defn- ~async-name
1547-
([d# f#]
1548-
(~async-name d# f# (or (ex/executor) (ex/execute-pool))))
1549-
([d# f# executor#]
1550-
(~fn-name (onto d# executor#) f#)))))
1551-
1552-
(defmacro ^:no-doc def-async-for-dual
1553-
"Defines a CompletionStage async version of the two-deferred
1554-
function associated with the given symbol, with '-async' appended."
1555-
[fn-name]
1556-
(let [async-name (symbol (str (name fn-name) "-async"))]
1557-
`(defn- ~async-name
1558-
([d# d2# f#]
1559-
(~async-name d# d2# f# (or (ex/executor) (ex/execute-pool))))
1560-
([d# d2# f# executor#]
1561-
(~fn-name (onto d# executor#) d2# f#)))))
1562-
1563-
(defn- fmap-deferred
1564-
"Returns a new deferred with function `f` applies to realized value of `d`.
1565-
(Like fmap but for deferreds.)
1566-
1567-
This function does not unwrap the result of f; it will only be applied to
1568-
the immediate value of `d`. This is for mimicking CompletionStage's
1569-
behavior."
1548+
(defn- cs-default-executor []
1549+
(or (ex/executor) (ex/execute-pool)))
1550+
1551+
(defn- shallow-connect
1552+
"Like `connect` but without implicit unwrapping of conveyed value."
1553+
[from to]
1554+
(on-realized from
1555+
(fn [val] (success! to val))
1556+
(fn [error] (error! to error))))
1557+
1558+
(defn- shallow-chain
1559+
"Returns a new deferred with function `f` applied to realized value of `d`.
1560+
1561+
Unlike `chain`, this function does not unwrap the result of `f`; it will only be applied to the
1562+
immediate value of `d`. This is for mimicking `CompletionStage`'s behavior."
15701563
[d f]
15711564
(let [d' (deferred)]
15721565
(on-realized d
15731566
(fn [val] (success! d' (f val)))
15741567
(fn [error] (error! d' error)))
15751568
d'))
15761569

1577-
(defn- then-apply [d ^Function f]
1578-
(assert-some f)
1579-
(fmap-deferred d #(.apply f %)))
1580-
1581-
(def-async-for then-apply)
1582-
1583-
(defn- then-accept [d ^Consumer c]
1584-
(assert-some c)
1585-
(fmap-deferred d #(.accept c %)))
1570+
(defn- shallow-onto [^IDeferred d executor]
1571+
(if (identical? executor (.executor d))
1572+
d
1573+
(let [d' (deferred executor)]
1574+
(shallow-connect d d')
1575+
d')))
15861576

1587-
(def-async-for then-accept)
1577+
(defn- shallow-chain-onto [d f executor]
1578+
(-> d
1579+
(shallow-onto executor)
1580+
(shallow-chain f)))
15881581

1589-
(defn- then-run [d ^Runnable f]
1582+
(defn- then-apply [d ^Function f executor]
15901583
(assert-some f)
1591-
(fmap-deferred d (fn [_] (.run f))))
1584+
(shallow-chain-onto d #(.apply f %) executor))
15921585

1593-
(def-async-for then-run)
1586+
(defn- then-accept [d ^Consumer c executor]
1587+
(assert-some c)
1588+
(shallow-chain-onto d #(.accept c %) executor))
15941589

1590+
(defn- then-run [d ^Runnable f executor]
1591+
(assert-some f)
1592+
(shallow-chain-onto d (fn [_] (.run f)) executor))
15951593

1596-
(defn- then-combine [d other ^BiFunction f]
1594+
(defn- then-combine [d other ^BiFunction f executor]
15971595
(assert-some other f)
1598-
(fmap-deferred (zip d other)
1599-
(fn [[x y]] (.apply f x y))))
1600-
1601-
(def-async-for-dual then-combine)
1596+
(shallow-chain-onto (zip d other)
1597+
(fn [[x y]] (.apply f x y))
1598+
executor))
16021599

1603-
1604-
(defn- then-accept-both [d other ^BiConsumer f]
1600+
(defn- then-accept-both [d other ^BiConsumer f executor]
16051601
(assert-some other f)
1606-
(fmap-deferred (zip d other)
1607-
(fn [[x y]] (.accept f x y))))
1608-
1609-
(def-async-for-dual then-accept-both)
1602+
(shallow-chain-onto (zip d other)
1603+
(fn [[x y]] (.accept f x y))
1604+
executor))
16101605

1611-
1612-
(defn- run-after-both [d other ^Runnable f]
1606+
(defn- run-after-both [d other ^Runnable f executor]
16131607
(assert-some other f)
1614-
(fmap-deferred (zip d other)
1615-
(fn [[_ _]] (.run f))))
1616-
1617-
1618-
(def-async-for-dual run-after-both)
1608+
(shallow-chain-onto (zip d other)
1609+
(fn [[_ _]] (.run f))
1610+
executor))
16191611

1620-
1621-
(defn- apply-to-either [d other ^Function f]
1612+
(defn- apply-to-either [d other ^Function f executor]
16221613
(assert-some other f)
1623-
(then-apply (alt d other) f))
1624-
1625-
(def-async-for-dual apply-to-either)
1614+
(then-apply (alt d other) f executor))
16261615

1627-
1628-
(defn- accept-either [d other ^Function f]
1616+
(defn- accept-either [d other ^Function f executor]
16291617
(assert-some other f)
1630-
(then-accept (alt d other) f))
1631-
1632-
(def-async-for-dual accept-either)
1633-
1618+
(then-accept (alt d other) f executor))
16341619

1635-
(defn- run-after-either [d other ^Function f]
1620+
(defn- run-after-either [d other ^Function f executor]
16361621
(assert-some other f)
1637-
(then-run (alt d other) f))
1638-
1639-
(def-async-for-dual run-after-either)
1640-
1622+
(then-run (alt d other) f executor))
16411623

1642-
(defn- then-compose [d ^Function f]
1624+
(defn- then-compose [d ^Function f executor]
16431625
(assert-some f)
16441626
(let [d' (deferred)]
1645-
(on-realized d
1646-
(fn [val]
1647-
(on-realized (->deferred (.apply f val))
1648-
#(success! d' %)
1649-
#(error! d' %)))
1650-
(fn [error] (error! d' error)))
1627+
(-> (shallow-chain-onto d #(->deferred (.apply f %)) executor)
1628+
(on-realized (fn [fd]
1629+
(shallow-connect fd d'))
1630+
(fn [error]
1631+
(error! d' error))))
16511632
d'))
16521633

1653-
(def-async-for then-compose)
1654-
1655-
1656-
(defn- then-handle [d ^BiFunction f]
1634+
(defn- then-handle [d ^BiFunction f executor]
16571635
(assert-some f)
1636+
;; Can't use `shallow-chain-onto` here because it only covers the success case.
16581637
(let [d' (deferred)]
16591638
(on-realized
1660-
d
1639+
(shallow-onto d executor)
16611640
(fn [val] (success! d' (.apply f val nil)))
16621641
(fn [error] (success! d' (.apply f nil error))))
16631642
d'))
16641643

1665-
1666-
(def-async-for then-handle)
1667-
1668-
1669-
(defn- then-exceptionally [d ^Function f]
1644+
(defn- then-exceptionally [d ^Function f executor]
16701645
(assert-some f)
1646+
;; Can't use `shallow-chain-onto` here because it only covers
1647+
;; the success case.
16711648
(let [d' (deferred)]
16721649
(on-realized
1673-
d
1674-
(fn [val] (success! d' val))
1675-
(fn [error] (success! d' (.apply f error))))
1650+
(shallow-onto d executor)
1651+
(fn [val] (success! d' val))
1652+
(fn [error] (success! d' (.apply f error))))
16761653
d'))
16771654

16781655
(defn- to-completable-future [d]
@@ -1685,10 +1662,12 @@
16851662

16861663
result))
16871664

1688-
(defn- when-complete [d ^BiConsumer f]
1665+
(defn- when-complete [d ^BiConsumer f executor]
16891666
(assert-some f)
1667+
;; Can't use `shallow-chain-onto` here because it only covers
1668+
;; the success case.
16901669
(let [d' (deferred)]
1691-
(on-realized d
1670+
(on-realized (shallow-onto d executor)
16921671
(fn [val]
16931672
(try (.accept f val nil)
16941673
(success! d' val)
@@ -1701,8 +1680,6 @@
17011680
(error! d' err)))))
17021681
d'))
17031682

1704-
(def-async-for when-complete)
1705-
17061683
;;;
17071684

17081685
(alter-meta! #'->Deferred assoc :private true)

0 commit comments

Comments
 (0)