@@ -28,8 +28,8 @@ import Dispatch
2828/// // or wait synchronously for completion
2929/// operation.waitUntilFinished()
3030/// ```
31- public final class TaskOperation < R: Sendable > : Operation , AsyncObject ,
32- ContinuableCollection , Loggable , @unchecked Sendable
31+ public final class TaskOperation < R: Sendable > : Operation , AsyncObject , Loggable ,
32+ @unchecked Sendable
3333{
3434 /// The asynchronous action to perform as part of the operation..
3535 private let underlyingAction : @Sendable ( ) async throws -> R
@@ -40,6 +40,12 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
4040 /// synchronize data access and modifications.
4141 @usableFromInline
4242 internal let locker : Locker
43+ /// The stream that propagates the operation
44+ /// finish event.
45+ internal var event : AsyncStream < Void > !
46+ /// The continuation of `stream` that controls the asynchronous wait
47+ /// for operation completion.
48+ internal var waiter : AsyncStream < Void > . Continuation !
4349
4450 /// A type representing a set of behaviors for the executed
4551 /// task type and task completion behavior.
@@ -116,9 +122,8 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
116122 willChangeValue ( forKey: " isFinished " )
117123 locker. perform {
118124 _isFinished = newValue
119- guard newValue, !continuations. isEmpty else { return }
120- continuations. forEach { $1. resume ( ) }
121- continuations = [ : ]
125+ guard newValue else { return }
126+ waiter. finish ( )
122127 }
123128 didChangeValue ( forKey: " isFinished " )
124129 }
@@ -166,11 +171,14 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
166171 self . flags = flags
167172 self . underlyingAction = operation
168173 super. init ( )
174+ self . event = AsyncStream (
175+ bufferingPolicy: . bufferingOldest( 1 )
176+ ) { self . waiter = $0 }
169177 }
170178
171179 deinit {
172180 execTask? . cancel ( )
173- locker . perform { self . continuations . forEach { $1 . cancel ( ) } }
181+ waiter . finish ( )
174182 }
175183
176184 /// Begins the execution of the operation.
@@ -222,99 +230,6 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
222230 }
223231
224232 // MARK: AsyncObject
225- /// The suspended tasks continuation type.
226- @usableFromInline
227- internal typealias Continuation = TrackedContinuation <
228- GlobalContinuation < Void , Error >
229- >
230- /// The continuations stored with an associated key for all the suspended task that are waiting for operation completion.
231- @usableFromInline
232- internal private( set) var continuations : [ UUID : Continuation ] = [ : ]
233-
234- /// Add continuation with the provided key in `continuations` map.
235- ///
236- /// - Parameters:
237- /// - continuation: The `continuation` to add.
238- /// - key: The key in the map.
239- /// - file: The file add request originates from (there's usually no need to pass it
240- /// explicitly as it defaults to `#fileID`).
241- /// - function: The function add request originates from (there's usually no need to
242- /// pass it explicitly as it defaults to `#function`).
243- /// - line: The line add request originates from (there's usually no need to pass it
244- /// explicitly as it defaults to `#line`).
245- /// - preinit: The pre-initialization handler to run
246- /// in the beginning of this method.
247- ///
248- /// - Important: The pre-initialization handler must run
249- /// before any logic in this method.
250- @inlinable
251- internal func addContinuation(
252- _ continuation: Continuation ,
253- withKey key: UUID ,
254- file: String , function: String , line: UInt ,
255- preinit: @Sendable ( ) -> Void
256- ) {
257- locker. perform {
258- preinit ( )
259- log ( " Adding " , id: key, file: file, function: function, line: line)
260- guard !continuation. resumed else {
261- log (
262- " Already resumed, not tracking " , id: key,
263- file: file, function: function, line: line
264- )
265- return
266- }
267-
268- guard !isFinished else {
269- continuation. resume ( )
270- log (
271- " Resumed " , id: key,
272- file: file, function: function, line: line
273- )
274- return
275- }
276-
277- continuations [ key] = continuation
278- log ( " Tracking " , id: key, file: file, function: function, line: line)
279- }
280- }
281-
282- /// Remove continuation associated with provided key
283- /// from `continuations` map.
284- ///
285- /// - Parameters:
286- /// - continuation: The continuation to remove and cancel.
287- /// - key: The key in the map.
288- /// - file: The file remove request originates from (there's usually no need to pass it
289- /// explicitly as it defaults to `#fileID`).
290- /// - function: The function remove request originates from (there's usually no need to
291- /// pass it explicitly as it defaults to `#function`).
292- /// - line: The line remove request originates from (there's usually no need to pass it
293- /// explicitly as it defaults to `#line`).
294- @inlinable
295- internal func removeContinuation(
296- _ continuation: Continuation ,
297- withKey key: UUID ,
298- file: String , function: String , line: UInt
299- ) {
300- locker. perform {
301- log ( " Removing " , id: key, file: file, function: function, line: line)
302- continuations. removeValue ( forKey: key)
303- guard !continuation. resumed else {
304- log (
305- " Already resumed, not cancelling " , id: key,
306- file: file, function: function, line: line
307- )
308- return
309- }
310-
311- continuation. cancel ( )
312- log (
313- " Cancelled " , id: key,
314- file: file, function: function, line: line
315- )
316- }
317- }
318233
319234 /// Starts operation asynchronously
320235 /// as part of a new top-level task on behalf of the current actor.
@@ -356,18 +271,28 @@ public final class TaskOperation<R: Sendable>: Operation, AsyncObject,
356271 function: String = #function,
357272 line: UInt = #line
358273 ) async throws {
359- guard !isFinished else {
360- log ( " Finished " , file: file, function: function, line: line)
361- return
362- }
363-
364274 let key = UUID ( )
365275 log ( " Waiting " , id: key, file: file, function: function, line: line)
366- try await withPromisedContinuation (
367- withKey: key,
368- file: file, function: function, line: line
369- )
370- log ( " Finished " , id: key, file: file, function: function, line: line)
276+ for await _ in event { break }
277+ do {
278+ try Task . checkCancellation ( )
279+ } catch {
280+ log (
281+ " Cancelled " , id: key,
282+ file: file, function: function, line: line
283+ )
284+ throw error
285+ }
286+ do {
287+ let _ = try await execTask? . value
288+ log ( " Finished " , id: key, file: file, function: function, line: line)
289+ } catch {
290+ log (
291+ " Finished with error: \( error) " , id: key,
292+ file: file, function: function, line: line
293+ )
294+ throw error
295+ }
371296 }
372297}
373298
0 commit comments