From 42950e082ff11a38ecd2eabd61f9d2130f4a00bf Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Mon, 22 Sep 2025 23:43:24 +0200 Subject: [PATCH 01/15] Basic setup to compile below iOS 18.0 and similar --- .../AsyncAlgorithms/AsyncShareSequence.swift | 59 ++++---- Sources/AsyncAlgorithms/Shims.swift | 18 +++ Tests/AsyncAlgorithmsTests/TestShare.swift | 140 +++++++++--------- 3 files changed, 116 insertions(+), 101 deletions(-) create mode 100644 Sources/AsyncAlgorithms/Shims.swift diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 6c76a4d1..72561092 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -9,14 +9,12 @@ // //===----------------------------------------------------------------------===// -#if compiler(>=6.2) - import Synchronization import DequeModule -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.0, *) extension AsyncSequence -where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype { +where Element: Sendable, Self: _SendableMetatype, AsyncIterator: _SendableMetatype { /// Creates a shared async sequence that allows multiple concurrent iterations over a single source. /// /// The `share` method transforms an async sequence into a shareable sequence that can be safely @@ -67,7 +65,7 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype /// public func share( bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1) - ) -> some AsyncSequence & Sendable { + ) -> AsyncShareSequence { // The iterator is transferred to the isolation of the iterating task // this has to be done "unsafely" since we cannot annotate the transfer // however since iterating an AsyncSequence types twice has been defined @@ -114,9 +112,9 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype // // This type is typically not used directly; instead, use the `share()` method on any // async sequence that meets the sendability requirements. -@available(AsyncAlgorithms 1.1, *) -struct AsyncShareSequence: Sendable -where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: SendableMetatype { +@available(AsyncAlgorithms 1.0, *) +public struct AsyncShareSequence: Sendable +where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _SendableMetatype { // Represents a single consumer's connection to the shared sequence. // // Each iterator of the shared sequence creates its own `Side` instance, which tracks @@ -135,7 +133,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab // - `continuation`: The continuation waiting for the next element (nil if not waiting) // - `position`: The consumer's current position in the shared buffer struct State { - var continuation: UnsafeContinuation, Never>? + var continuation: UnsafeContinuation, Never>? var position = 0 // Creates a new state with the position adjusted by the given offset. @@ -162,7 +160,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab iteration.unregisterSide(id) } - func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? { + func next(isolation actor: isolated (any Actor)?) async throws -> Base.Element? { try await iteration.next(isolation: actor, id: id) } } @@ -230,9 +228,9 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab var generation = 0 var sides = [Int: Side.State]() var iteratingTask: IteratingTask - private(set) var buffer = Deque() + private(set) var buffer = Deque() private(set) var finished = false - private(set) var failure: Failure? + private(set) var failure: Error? var cancelled = false var limit: UnsafeContinuation? var demand: UnsafeContinuation? @@ -311,7 +309,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab // **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends // // - Parameter element: The element to add to the buffer - mutating func enqueue(_ element: Element) { + mutating func enqueue(_ element: Base.Element) { let count = buffer.count switch storagePolicy { @@ -335,20 +333,20 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab finished = true } - mutating func fail(_ error: Failure) { + mutating func fail(_ error: Error) { finished = true failure = error } } - let state: Mutex + let state: ManagedCriticalState let limit: Int? init( _ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, bufferingPolicy: AsyncBufferSequencePolicy ) { - state = Mutex(State(iteratorFactory, bufferingPolicy: bufferingPolicy)) + state = ManagedCriticalState(State(iteratorFactory, bufferingPolicy: bufferingPolicy)) switch bufferingPolicy.policy { case .bounded(let limit): self.limit = limit @@ -478,15 +476,15 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab } struct Resumption { - let continuation: UnsafeContinuation, Never> - let result: Result + let continuation: UnsafeContinuation, Never> + let result: Result func resume() { continuation.resume(returning: result) } } - func emit(_ result: Result) { + func emit(_ result: Result) { let (resumptions, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> ([Resumption], UnsafeContinuation?, UnsafeContinuation?, Bool) in var resumptions = [Resumption]() @@ -533,12 +531,12 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab private func nextIteration( _ id: Int - ) async -> Result.Element?, AsyncShareSequence.Failure> { + ) async -> Result { return await withTaskCancellationHandler { await withUnsafeContinuation { continuation in let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> ( - Result?, UnsafeContinuation?, UnsafeContinuation?, Bool + Result?, UnsafeContinuation?, UnsafeContinuation?, Bool ) in guard let side = state.sides[id] else { return state.emit(.success(nil), limit: limit) @@ -587,11 +585,11 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab } } } catch { - emit(.failure(error as! Failure)) + emit(.failure(error)) } } - func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Element? { + func next(isolation actor: isolated (any Actor)?, id: Int) async throws -> Base.Element? { let (factory, cancelled) = state.withLock { state -> ((@Sendable () -> sending Base.AsyncIterator)?, Bool) in switch state.iteratingTask { case .pending(let factory): @@ -697,30 +695,29 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab } } -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.0, *) extension AsyncShareSequence: AsyncSequence { - typealias Element = Base.Element - typealias Failure = Base.Failure + public typealias Element = Base.Element + public typealias Failure = Swift.Error - struct Iterator: AsyncIteratorProtocol { + public struct Iterator: AsyncIteratorProtocol { let side: Side init(_ iteration: Iteration) { side = Side(iteration) } - mutating func next() async rethrows -> Element? { + mutating public func next() async rethrows -> Element? { try await side.next(isolation: nil) } - mutating func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? { + mutating public func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? { try await side.next(isolation: actor) } } - func makeAsyncIterator() -> Iterator { + public func makeAsyncIterator() -> Iterator { Iterator(extent.iteration) } } -#endif diff --git a/Sources/AsyncAlgorithms/Shims.swift b/Sources/AsyncAlgorithms/Shims.swift new file mode 100644 index 00000000..3e33b76b --- /dev/null +++ b/Sources/AsyncAlgorithms/Shims.swift @@ -0,0 +1,18 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import Foundation + +#if compiler(>=6.2) +public typealias _SendableMetatype = SendableMetatype +#else +public typealias _SendableMetatype = Any +#endif diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index ac817536..6818903c 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -20,7 +20,7 @@ final class TestShare: XCTestCase { // MARK: - Basic Functionality Tests - func test_share_delivers_elements_to_multiple_consumers() async { + func test_share_delivers_elements_to_multiple_consumers() async throws { let source = [1, 2, 3, 4, 5] let shared = source.async.share() let gate1 = Gate() @@ -31,7 +31,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate1.open() await gate2.enter() - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { results.append(value) } return results @@ -42,13 +42,13 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { results.append(value) } return results } - let results1 = await consumer1.value - let results2 = await consumer2.value + let results1 = try await consumer1.value + let results2 = try await consumer2.value XCTAssertEqual(results1, [1, 2, 3, 4, 5]) XCTAssertEqual(results2, [1, 2, 3, 4, 5]) @@ -80,12 +80,12 @@ final class TestShare: XCTestCase { // MARK: - Buffering Policy Tests - func test_share_with_bounded_buffering() async { + func test_share_with_bounded_buffering() async throws { var gated = GatedSequence([1, 2, 3, 4, 5]) let shared = gated.share(bufferingPolicy: .bounded(2)) - let results1 = Mutex([Int]()) - let results2 = Mutex([Int]()) + let results1 = ManagedCriticalState([Int]()) + let results2 = ManagedCriticalState([Int]()) let gate1 = Gate() let gate2 = Gate() @@ -94,13 +94,13 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Consumer 1 reads first element - if let value = await iterator.next(isolation: nil) { + if let value = try await iterator.next(isolation: nil) { results1.withLock { $0.append(value) } } // Delay to allow consumer 2 to get ahead - try? await Task.sleep(for: .milliseconds(10)) + try? await Task.sleep(nanoseconds: 10_000_000) // Continue reading - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { results1.withLock { $0.append(value) } } } @@ -110,7 +110,7 @@ final class TestShare: XCTestCase { gate2.open() await gate1.enter() // Consumer 2 reads all elements quickly - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { results2.withLock { $0.append(value) } } } @@ -122,20 +122,20 @@ final class TestShare: XCTestCase { gated.advance() // 4 gated.advance() // 5 - await consumer1.value - await consumer2.value + try await consumer1.value + try await consumer2.value // Both consumers should receive all elements XCTAssertEqual(results1.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) XCTAssertEqual(results2.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) } - func test_share_with_unbounded_buffering() async { + func test_share_with_unbounded_buffering() async throws { let source = [1, 2, 3, 4, 5] let shared = source.async.share(bufferingPolicy: .unbounded) - let results1 = Mutex([Int]()) - let results2 = Mutex([Int]()) + let results1 = ManagedCriticalState([Int]()) + let results2 = ManagedCriticalState([Int]()) let gate1 = Gate() let gate2 = Gate() @@ -143,10 +143,10 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { results1.withLock { $0.append(value) } // Add some delay to consumer 1 - try? await Task.sleep(for: .milliseconds(1)) + try? await Task.sleep(nanoseconds: 1_000_000) } } @@ -154,24 +154,24 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate1.open() await gate2.enter() - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { results2.withLock { $0.append(value) } } } - await consumer1.value - await consumer2.value + try await consumer1.value + try await consumer2.value XCTAssertEqual(results1.withLock { $0 }, [1, 2, 3, 4, 5]) XCTAssertEqual(results2.withLock { $0 }, [1, 2, 3, 4, 5]) } - func test_share_with_bufferingLatest_buffering() async { + func test_share_with_bufferingLatest_buffering() async throws { var gated = GatedSequence([1, 2, 3, 4, 5]) let shared = gated.share(bufferingPolicy: .bufferingLatest(2)) - let fastResults = Mutex([Int]()) - let slowResults = Mutex([Int]()) + let fastResults = ManagedCriticalState([Int]()) + let slowResults = ManagedCriticalState([Int]()) let gate1 = Gate() let gate2 = Gate() @@ -179,7 +179,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { fastResults.withLock { $0.append(value) } } } @@ -189,30 +189,30 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Read first element immediately - if let value = await iterator.next(isolation: nil) { + if let value = try await iterator.next(isolation: nil) { slowResults.withLock { $0.append(value) } } // Add significant delay to let buffer fill up and potentially overflow - try? await Task.sleep(for: .milliseconds(50)) + try? await Task.sleep(nanoseconds: 50_000_000) // Continue reading remaining elements - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { slowResults.withLock { $0.append(value) } } } // Release all elements quickly to test buffer overflow behavior gated.advance() // 1 - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 2 - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 3 - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 4 - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 5 - await fastConsumer.value - await slowConsumer.value + try await fastConsumer.value + try await slowConsumer.value let slowResultsArray = slowResults.withLock { $0 } @@ -238,12 +238,12 @@ final class TestShare: XCTestCase { } } - func test_share_with_bufferingOldest_buffering() async { + func test_share_with_bufferingOldest_buffering() async throws { var gated = GatedSequence([1, 2, 3, 4, 5]) let shared = gated.share(bufferingPolicy: .bufferingOldest(2)) - let fastResults = Mutex([Int]()) - let slowResults = Mutex([Int]()) + let fastResults = ManagedCriticalState([Int]()) + let slowResults = ManagedCriticalState([Int]()) let gate1 = Gate() let gate2 = Gate() @@ -251,7 +251,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { fastResults.withLock { $0.append(value) } } } @@ -261,30 +261,30 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Read first element immediately - if let value = await iterator.next(isolation: nil) { + if let value = try await iterator.next(isolation: nil) { slowResults.withLock { $0.append(value) } } // Add significant delay to let buffer fill up and potentially overflow - try? await Task.sleep(for: .milliseconds(50)) + try? await Task.sleep(nanoseconds: 50_000_000) // Continue reading remaining elements - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { slowResults.withLock { $0.append(value) } } } // Release all elements quickly to test buffer overflow behavior gated.advance() // 1 - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 2 - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 3 - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 4 - try? await Task.sleep(for: .milliseconds(5)) + try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 5 - await fastConsumer.value - await slowConsumer.value + try await fastConsumer.value + try await slowConsumer.value let slowResultsArray = slowResults.withLock { $0 } @@ -388,7 +388,7 @@ final class TestShare: XCTestCase { await fulfillment(of: [consumer2Finished], timeout: 1.0) } - func test_share_cancellation_cancels_source_when_no_consumers() async { + func test_share_cancellation_cancels_source_when_no_consumers() async throws { let source = Indefinite(value: 1).async let shared = source.share() @@ -397,11 +397,11 @@ final class TestShare: XCTestCase { let task = Task { var iterator = shared.makeAsyncIterator() - if await iterator.next(isolation: nil) != nil { + if try await iterator.next(isolation: nil) != nil { iterated.fulfill() } // Task will be cancelled here, so iteration should stop - while await iterator.next(isolation: nil) != nil { + while try await iterator.next(isolation: nil) != nil { // Continue iterating until cancelled } finished.fulfill() @@ -423,10 +423,10 @@ final class TestShare: XCTestCase { } let shared = source.share() - let consumer1Results = Mutex([Int]()) - let consumer2Results = Mutex([Int]()) - let consumer1Error = Mutex(nil) - let consumer2Error = Mutex(nil) + let consumer1Results = ManagedCriticalState([Int]()) + let consumer2Results = ManagedCriticalState([Int]()) + let consumer1Error = ManagedCriticalState(nil) + let consumer2Error = ManagedCriticalState(nil) let gate1 = Gate() let gate2 = Gate() @@ -470,17 +470,17 @@ final class TestShare: XCTestCase { // MARK: - Timing and Race Condition Tests - func test_share_with_late_joining_consumer() async { + func test_share_with_late_joining_consumer() async throws { var gated = GatedSequence([1, 2, 3, 4, 5]) let shared = gated.share(bufferingPolicy: .unbounded) - let earlyResults = Mutex([Int]()) - let lateResults = Mutex([Int]()) + let earlyResults = ManagedCriticalState([Int]()) + let lateResults = ManagedCriticalState([Int]()) // Start early consumer let earlyConsumer = Task { var iterator = shared.makeAsyncIterator() - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { earlyResults.withLock { $0.append(value) } } } @@ -490,12 +490,12 @@ final class TestShare: XCTestCase { gated.advance() // 2 // Give early consumer time to consume - try? await Task.sleep(for: .milliseconds(10)) + try? await Task.sleep(nanoseconds: 10_000_000) // Start late consumer let lateConsumer = Task { var iterator = shared.makeAsyncIterator() - while let value = await iterator.next(isolation: nil) { + while let value = try await iterator.next(isolation: nil) { lateResults.withLock { $0.append(value) } } } @@ -505,8 +505,8 @@ final class TestShare: XCTestCase { gated.advance() // 4 gated.advance() // 5 - await earlyConsumer.value - await lateConsumer.value + try await earlyConsumer.value + try await lateConsumer.value // Early consumer gets all elements XCTAssertEqual(earlyResults.withLock { $0 }, [1, 2, 3, 4, 5]) @@ -514,7 +514,7 @@ final class TestShare: XCTestCase { XCTAssertTrue(lateResults.withLock { $0.count <= 5 }) } - func test_share_iterator_independence() async { + func test_share_iterator_independence() async throws { let source = [1, 2, 3, 4, 5] let shared = source.async.share() @@ -522,11 +522,11 @@ final class TestShare: XCTestCase { var iterator2 = shared.makeAsyncIterator() // Both iterators should independently get the same elements - let value1a = await iterator1.next(isolation: nil) - let value2a = await iterator2.next(isolation: nil) + let value1a = try await iterator1.next(isolation: nil) + let value2a = try await iterator2.next(isolation: nil) - let value1b = await iterator1.next(isolation: nil) - let value2b = await iterator2.next(isolation: nil) + let value1b = try await iterator1.next(isolation: nil) + let value2b = try await iterator2.next(isolation: nil) XCTAssertEqual(value1a, 1) XCTAssertEqual(value2a, 1) @@ -536,7 +536,7 @@ final class TestShare: XCTestCase { // MARK: - Memory and Resource Management Tests - func test_share_cleans_up_when_all_consumers_finish() async { + func test_share_cleans_up_when_all_consumers_finish() async throws { let source = [1, 2, 3] let shared = source.async.share() @@ -549,7 +549,7 @@ final class TestShare: XCTestCase { // Create a new iterator after the sequence finished var newIterator = shared.makeAsyncIterator() - let value = await newIterator.next(isolation: nil) + let value = try await newIterator.next(isolation: nil) XCTAssertNil(value) // Should return nil since source is exhausted } From 8c13d4be17799c2b97c79af5fa087356637a4913 Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Tue, 7 Oct 2025 21:59:42 +0200 Subject: [PATCH 02/15] Update Sources/AsyncAlgorithms/AsyncShareSequence.swift Co-authored-by: Pyry Jahkola --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 72561092..bac11f8a 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -228,7 +228,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send var generation = 0 var sides = [Int: Side.State]() var iteratingTask: IteratingTask - private(set) var buffer = Deque() + private(set) var buffer = Deque() private(set) var finished = false private(set) var failure: Error? var cancelled = false From 3c07b64f902a16f228d3458b4637d0ead4e20ebc Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Tue, 7 Oct 2025 21:59:49 +0200 Subject: [PATCH 03/15] Update Sources/AsyncAlgorithms/AsyncShareSequence.swift Co-authored-by: Pyry Jahkola --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index bac11f8a..5a6caf77 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -309,7 +309,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send // **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends // // - Parameter element: The element to add to the buffer - mutating func enqueue(_ element: Base.Element) { + mutating func enqueue(_ element: Base.Element) { let count = buffer.count switch storagePolicy { From 22bb49ed6e2adebd184b607ff2c65b6f702eb545 Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Tue, 7 Oct 2025 22:00:03 +0200 Subject: [PATCH 04/15] Update Sources/AsyncAlgorithms/AsyncShareSequence.swift Co-authored-by: Pyry Jahkola --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 5a6caf77..250088f3 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -476,8 +476,8 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } struct Resumption { - let continuation: UnsafeContinuation, Never> - let result: Result + let continuation: UnsafeContinuation, Never> + let result: Result func resume() { continuation.resume(returning: result) From 474e461b740a7228bb14c2f16d318c23823c84bf Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Tue, 7 Oct 2025 22:00:12 +0200 Subject: [PATCH 05/15] Update Sources/AsyncAlgorithms/AsyncShareSequence.swift Co-authored-by: Pyry Jahkola --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 250088f3..5d828307 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -536,7 +536,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send await withUnsafeContinuation { continuation in let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> ( - Result?, UnsafeContinuation?, UnsafeContinuation?, Bool + Result?, UnsafeContinuation?, UnsafeContinuation?, Bool ) in guard let side = state.sides[id] else { return state.emit(.success(nil), limit: limit) From 7d7ee6f65ec922505d9b34b074a8179f59b97dcb Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Tue, 7 Oct 2025 22:00:19 +0200 Subject: [PATCH 06/15] Update Sources/AsyncAlgorithms/AsyncShareSequence.swift Co-authored-by: Pyry Jahkola --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 5d828307..1f26cd01 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -484,7 +484,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } } - func emit(_ result: Result) { + func emit(_ result: Result) { let (resumptions, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> ([Resumption], UnsafeContinuation?, UnsafeContinuation?, Bool) in var resumptions = [Resumption]() From 1b6b715fcb24f20d904c277d122256f2fd3bff8b Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Tue, 7 Oct 2025 22:06:41 +0200 Subject: [PATCH 07/15] fix: crash on Base.Element? { - let (factory, cancelled) = state.withLock { state -> ((@Sendable () -> sending Base.AsyncIterator)?, Bool) in - switch state.iteratingTask { - case .pending(let factory): - state.iteratingTask = .starting - return (factory, false) - case .cancelled: - return (nil, true) - default: - return (nil, false) - } - } - if cancelled { return nil } - if let factory { - let task: Task - // for the fancy dance of availability and canImport see the comment on the next check for details - #if swift(>=6.2) - if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) { - task = Task(name: "Share Iteration") { [factory, self] in - await iterationLoop(factory: factory) - } - } else { - task = Task.detached(name: "Share Iteration") { [factory, self] in - await iterationLoop(factory: factory) + let iteratingTask = state.withLock { state -> IteratingTask in + defer { + if case .pending = state.iteratingTask { + state.iteratingTask = .starting + } } + return state.iteratingTask } - #else - task = Task.detached { [factory, self] in - await iterationLoop(factory: factory) - } - #endif - // Known Issue: there is a very small race where the task may not get a priority escalation during startup - // this unfortuantely cannot be avoided since the task should ideally not be formed within the critical - // region of the state. Since that could lead to potential deadlocks in low-core-count systems. - // That window is relatively small and can be revisited if a suitable proof of safe behavior can be - // determined. - state.withLock { state in - precondition(state.iteratingTask.isStarting) - state.iteratingTask = .running(task) - } + + if case .cancelled = iteratingTask { return nil } + + if case .pending(let factory) = iteratingTask { + let task: Task + // for the fancy dance of availability and canImport see the comment on the next check for details + #if swift(>=6.2) + if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) { + task = Task(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } + } else { + task = Task.detached(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } + } + #else + task = Task.detached { [factory, self] in + await iterationLoop(factory: factory) + } + #endif + // Known Issue: there is a very small race where the task may not get a priority escalation during startup + // this unfortuantely cannot be avoided since the task should ideally not be formed within the critical + // region of the state. Since that could lead to potential deadlocks in low-core-count systems. + // That window is relatively small and can be revisited if a suitable proof of safe behavior can be + // determined. + state.withLock { state in + precondition(state.iteratingTask.isStarting) + state.iteratingTask = .running(task) + } } // withTaskPriorityEscalationHandler is only available for the '26 releases and the 6.2 version of From 1626281ce9ffbef8863ba49a704283d87115234c Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Tue, 7 Oct 2025 22:17:47 +0200 Subject: [PATCH 08/15] Update Sources/AsyncAlgorithms/AsyncShareSequence.swift Co-authored-by: Pyry Jahkola --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 7e785a7c..44b7e2da 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -133,7 +133,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send // - `continuation`: The continuation waiting for the next element (nil if not waiting) // - `position`: The consumer's current position in the shared buffer struct State { - var continuation: UnsafeContinuation, Never>? + var continuation: UnsafeContinuation, Never>? var position = 0 // Creates a new state with the position adjusted by the given offset. From 38e635e4d42994dc0720a8ce561dfd67a30b975a Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Tue, 7 Oct 2025 22:28:48 +0200 Subject: [PATCH 09/15] fix: indentation issues --- .../AsyncAlgorithms/AsyncShareSequence.swift | 73 +++++++++---------- Tests/AsyncAlgorithmsTests/TestShare.swift | 2 +- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 44b7e2da..f43c89a9 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -590,44 +590,44 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } func next(isolation actor: isolated (any Actor)?, id: Int) async throws -> Base.Element? { - let iteratingTask = state.withLock { state -> IteratingTask in - defer { - if case .pending = state.iteratingTask { - state.iteratingTask = .starting - } + let iteratingTask = state.withLock { state -> IteratingTask in + defer { + if case .pending = state.iteratingTask { + state.iteratingTask = .starting } - return state.iteratingTask } - - if case .cancelled = iteratingTask { return nil } - - if case .pending(let factory) = iteratingTask { - let task: Task - // for the fancy dance of availability and canImport see the comment on the next check for details - #if swift(>=6.2) - if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) { - task = Task(name: "Share Iteration") { [factory, self] in - await iterationLoop(factory: factory) - } - } else { - task = Task.detached(name: "Share Iteration") { [factory, self] in - await iterationLoop(factory: factory) - } - } - #else - task = Task.detached { [factory, self] in - await iterationLoop(factory: factory) - } - #endif - // Known Issue: there is a very small race where the task may not get a priority escalation during startup - // this unfortuantely cannot be avoided since the task should ideally not be formed within the critical - // region of the state. Since that could lead to potential deadlocks in low-core-count systems. - // That window is relatively small and can be revisited if a suitable proof of safe behavior can be - // determined. - state.withLock { state in - precondition(state.iteratingTask.isStarting) - state.iteratingTask = .running(task) - } + return state.iteratingTask + } + + if case .cancelled = iteratingTask { return nil } + + if case .pending(let factory) = iteratingTask { + let task: Task + // for the fancy dance of availability and canImport see the comment on the next check for details + #if swift(>=6.2) + if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) { + task = Task(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } + } else { + task = Task.detached(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } + } + #else + task = Task.detached { [factory, self] in + await iterationLoop(factory: factory) + } + #endif + // Known Issue: there is a very small race where the task may not get a priority escalation during startup + // this unfortuantely cannot be avoided since the task should ideally not be formed within the critical + // region of the state. Since that could lead to potential deadlocks in low-core-count systems. + // That window is relatively small and can be revisited if a suitable proof of safe behavior can be + // determined. + state.withLock { state in + precondition(state.iteratingTask.isStarting) + state.iteratingTask = .running(task) + } } // withTaskPriorityEscalationHandler is only available for the '26 releases and the 6.2 version of @@ -656,7 +656,6 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send #else return try await nextIteration(id).get() #endif - } } diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index 6818903c..e7b9a250 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -98,7 +98,7 @@ final class TestShare: XCTestCase { results1.withLock { $0.append(value) } } // Delay to allow consumer 2 to get ahead - try? await Task.sleep(nanoseconds: 10_000_000) + try? await Task.sleep(nanoseconds: 10_000_000) // Continue reading while let value = try await iterator.next(isolation: nil) { results1.withLock { $0.append(value) } From 6b1b6f5356415bb7c9691c23708a0d5e13c3b0ca Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Sat, 11 Oct 2025 15:42:06 +0200 Subject: [PATCH 10/15] fix: restore generic Failure --- Package.swift | 2 +- .../AsyncAlgorithms/AsyncShareSequence.swift | 31 ++++---- .../FailableAsyncSequence.swift | 29 ++++++++ Tests/AsyncAlgorithmsTests/TestShare.swift | 72 +++++++++---------- 4 files changed, 81 insertions(+), 53 deletions(-) create mode 100644 Sources/AsyncAlgorithms/FailableAsyncSequence.swift diff --git a/Package.swift b/Package.swift index c6a2894c..e000b89d 100644 --- a/Package.swift +++ b/Package.swift @@ -6,7 +6,7 @@ import CompilerPluginSupport let AsyncAlgorithms_v1_0 = "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0" #if compiler(>=6.0) && swift(>=6.0) // 5.10 doesnt support visionOS availability let AsyncAlgorithms_v1_1 = - "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0" + "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, visionOS 1.0" #else let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0" #endif diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index f43c89a9..41f0092c 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -12,7 +12,7 @@ import Synchronization import DequeModule -@available(AsyncAlgorithms 1.0, *) +@available(AsyncAlgorithms 1.1, *) extension AsyncSequence where Element: Sendable, Self: _SendableMetatype, AsyncIterator: _SendableMetatype { /// Creates a shared async sequence that allows multiple concurrent iterations over a single source. @@ -112,7 +112,7 @@ where Element: Sendable, Self: _SendableMetatype, AsyncIterator: _SendableMetaty // // This type is typically not used directly; instead, use the `share()` method on any // async sequence that meets the sendability requirements. -@available(AsyncAlgorithms 1.0, *) +@available(AsyncAlgorithms 1.1, *) public struct AsyncShareSequence: Sendable where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _SendableMetatype { // Represents a single consumer's connection to the shared sequence. @@ -133,7 +133,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send // - `continuation`: The continuation waiting for the next element (nil if not waiting) // - `position`: The consumer's current position in the shared buffer struct State { - var continuation: UnsafeContinuation, Never>? + var continuation: UnsafeContinuation, Never>? var position = 0 // Creates a new state with the position adjusted by the given offset. @@ -160,7 +160,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send iteration.unregisterSide(id) } - func next(isolation actor: isolated (any Actor)?) async throws -> Base.Element? { + func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Base.Element? { try await iteration.next(isolation: actor, id: id) } } @@ -230,7 +230,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send var iteratingTask: IteratingTask private(set) var buffer = Deque() private(set) var finished = false - private(set) var failure: Error? + private(set) var failure: Failure? var cancelled = false var limit: UnsafeContinuation? var demand: UnsafeContinuation? @@ -333,7 +333,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send finished = true } - mutating func fail(_ error: Error) { + mutating func fail(_ error: Failure) { finished = true failure = error } @@ -476,15 +476,15 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } struct Resumption { - let continuation: UnsafeContinuation, Never> - let result: Result + let continuation: UnsafeContinuation, Never> + let result: Result func resume() { continuation.resume(returning: result) } } - func emit(_ result: Result) { + func emit(_ result: Result) { let (resumptions, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> ([Resumption], UnsafeContinuation?, UnsafeContinuation?, Bool) in var resumptions = [Resumption]() @@ -531,12 +531,12 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send private func nextIteration( _ id: Int - ) async -> Result { + ) async -> Result { return await withTaskCancellationHandler { await withUnsafeContinuation { continuation in let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> ( - Result?, UnsafeContinuation?, UnsafeContinuation?, Bool + Result?, UnsafeContinuation?, UnsafeContinuation?, Bool ) in guard let side = state.sides[id] else { return state.emit(.success(nil), limit: limit) @@ -589,7 +589,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } } - func next(isolation actor: isolated (any Actor)?, id: Int) async throws -> Base.Element? { + func next(isolation actor: isolated (any Actor)?, id: Int) async rethrows -> Base.Element? { let iteratingTask = state.withLock { state -> IteratingTask in defer { if case .pending = state.iteratingTask { @@ -693,10 +693,9 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } } -@available(AsyncAlgorithms 1.0, *) -extension AsyncShareSequence: AsyncSequence { +@available(AsyncAlgorithms 1.1, *) +extension AsyncShareSequence: AsyncSequence, FailableAsyncSequence { public typealias Element = Base.Element - public typealias Failure = Swift.Error public struct Iterator: AsyncIteratorProtocol { let side: Side @@ -709,7 +708,7 @@ extension AsyncShareSequence: AsyncSequence { try await side.next(isolation: nil) } - mutating public func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? { + mutating public func next(isolation actor: isolated (any Actor)?) async rethrows -> Element? { try await side.next(isolation: actor) } } diff --git a/Sources/AsyncAlgorithms/FailableAsyncSequence.swift b/Sources/AsyncAlgorithms/FailableAsyncSequence.swift new file mode 100644 index 00000000..a57f6cd1 --- /dev/null +++ b/Sources/AsyncAlgorithms/FailableAsyncSequence.swift @@ -0,0 +1,29 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// A backportable protocol / hack to allow `Failure` associated type on older iOS/macOS/etc. versions. +/// +/// By assigning this protocol to any value conforming to `AsyncSequence`, they will both have access to `Failure` +/// > There could be a possible issue with mangled name of the entire object as discussed +/// [here](https://forums.swift.org/t/how-to-use-asyncsequence-on-macos-14-5-in-xcode-16-beta-need-help-with-availability-check-since-failure-is-unavailb-e/72439/5). +/// However, the issue should only happen if the object conforming to this protocol follows (_Concurrency, AsyncSequence) +/// in lexicographic order. (AsyncAlgorithms, MySequence) should always be after it. +/// +/// Example: +/// ```swift +/// class MySequence: AsyncSequence, FailableAsyncSequence { ... } +/// +/// ``` +@available(AsyncAlgorithms 1.1, *) +public protocol FailableAsyncSequence { + typealias _Failure = Failure + associatedtype Failure: Error +} diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index e7b9a250..9d3b7257 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -9,13 +9,13 @@ // //===----------------------------------------------------------------------===// -#if compiler(>=6.2) +#if compiler(>=6.0) import XCTest import AsyncAlgorithms import Synchronization -@available(macOS 15.0, *) +@available(AsyncAlgorithms 1.1, *) final class TestShare: XCTestCase { // MARK: - Basic Functionality Tests @@ -31,7 +31,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate1.open() await gate2.enter() - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { results.append(value) } return results @@ -42,13 +42,13 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { results.append(value) } return results } - let results1 = try await consumer1.value - let results2 = try await consumer2.value + let results1 = await consumer1.value + let results2 = await consumer2.value XCTAssertEqual(results1, [1, 2, 3, 4, 5]) XCTAssertEqual(results2, [1, 2, 3, 4, 5]) @@ -94,13 +94,13 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Consumer 1 reads first element - if let value = try await iterator.next(isolation: nil) { + if let value = await iterator.next(isolation: nil) { results1.withLock { $0.append(value) } } // Delay to allow consumer 2 to get ahead try? await Task.sleep(nanoseconds: 10_000_000) // Continue reading - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { results1.withLock { $0.append(value) } } } @@ -110,7 +110,7 @@ final class TestShare: XCTestCase { gate2.open() await gate1.enter() // Consumer 2 reads all elements quickly - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { results2.withLock { $0.append(value) } } } @@ -122,8 +122,8 @@ final class TestShare: XCTestCase { gated.advance() // 4 gated.advance() // 5 - try await consumer1.value - try await consumer2.value + await consumer1.value + await consumer2.value // Both consumers should receive all elements XCTAssertEqual(results1.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) @@ -143,7 +143,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { results1.withLock { $0.append(value) } // Add some delay to consumer 1 try? await Task.sleep(nanoseconds: 1_000_000) @@ -154,13 +154,13 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate1.open() await gate2.enter() - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { results2.withLock { $0.append(value) } } } - try await consumer1.value - try await consumer2.value + await consumer1.value + await consumer2.value XCTAssertEqual(results1.withLock { $0 }, [1, 2, 3, 4, 5]) XCTAssertEqual(results2.withLock { $0 }, [1, 2, 3, 4, 5]) @@ -179,7 +179,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { fastResults.withLock { $0.append(value) } } } @@ -189,13 +189,13 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Read first element immediately - if let value = try await iterator.next(isolation: nil) { + if let value = await iterator.next(isolation: nil) { slowResults.withLock { $0.append(value) } } // Add significant delay to let buffer fill up and potentially overflow try? await Task.sleep(nanoseconds: 50_000_000) // Continue reading remaining elements - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { slowResults.withLock { $0.append(value) } } } @@ -211,8 +211,8 @@ final class TestShare: XCTestCase { try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 5 - try await fastConsumer.value - try await slowConsumer.value + await fastConsumer.value + await slowConsumer.value let slowResultsArray = slowResults.withLock { $0 } @@ -251,7 +251,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { fastResults.withLock { $0.append(value) } } } @@ -261,13 +261,13 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Read first element immediately - if let value = try await iterator.next(isolation: nil) { + if let value = await iterator.next(isolation: nil) { slowResults.withLock { $0.append(value) } } // Add significant delay to let buffer fill up and potentially overflow try? await Task.sleep(nanoseconds: 50_000_000) // Continue reading remaining elements - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { slowResults.withLock { $0.append(value) } } } @@ -283,8 +283,8 @@ final class TestShare: XCTestCase { try? await Task.sleep(nanoseconds: 5_000_000) gated.advance() // 5 - try await fastConsumer.value - try await slowConsumer.value + await fastConsumer.value + await slowConsumer.value let slowResultsArray = slowResults.withLock { $0 } @@ -397,11 +397,11 @@ final class TestShare: XCTestCase { let task = Task { var iterator = shared.makeAsyncIterator() - if try await iterator.next(isolation: nil) != nil { + if await iterator.next(isolation: nil) != nil { iterated.fulfill() } // Task will be cancelled here, so iteration should stop - while try await iterator.next(isolation: nil) != nil { + while await iterator.next(isolation: nil) != nil { // Continue iterating until cancelled } finished.fulfill() @@ -480,7 +480,7 @@ final class TestShare: XCTestCase { // Start early consumer let earlyConsumer = Task { var iterator = shared.makeAsyncIterator() - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { earlyResults.withLock { $0.append(value) } } } @@ -495,7 +495,7 @@ final class TestShare: XCTestCase { // Start late consumer let lateConsumer = Task { var iterator = shared.makeAsyncIterator() - while let value = try await iterator.next(isolation: nil) { + while let value = await iterator.next(isolation: nil) { lateResults.withLock { $0.append(value) } } } @@ -505,8 +505,8 @@ final class TestShare: XCTestCase { gated.advance() // 4 gated.advance() // 5 - try await earlyConsumer.value - try await lateConsumer.value + await earlyConsumer.value + await lateConsumer.value // Early consumer gets all elements XCTAssertEqual(earlyResults.withLock { $0 }, [1, 2, 3, 4, 5]) @@ -522,11 +522,11 @@ final class TestShare: XCTestCase { var iterator2 = shared.makeAsyncIterator() // Both iterators should independently get the same elements - let value1a = try await iterator1.next(isolation: nil) - let value2a = try await iterator2.next(isolation: nil) + let value1a = await iterator1.next(isolation: nil) + let value2a = await iterator2.next(isolation: nil) - let value1b = try await iterator1.next(isolation: nil) - let value2b = try await iterator2.next(isolation: nil) + let value1b = await iterator1.next(isolation: nil) + let value2b = await iterator2.next(isolation: nil) XCTAssertEqual(value1a, 1) XCTAssertEqual(value2a, 1) @@ -549,7 +549,7 @@ final class TestShare: XCTestCase { // Create a new iterator after the sequence finished var newIterator = shared.makeAsyncIterator() - let value = try await newIterator.next(isolation: nil) + let value = await newIterator.next(isolation: nil) XCTAssertNil(value) // Should return nil since source is exhausted } From c02d4ed557f26235cb524d72431d1a8295a88fde Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Sat, 11 Oct 2025 23:39:02 +0200 Subject: [PATCH 11/15] - enable 1.2 and change 1.1 with backwards compatibility to iOS 16 - attempt to throw errors properly --- Package.swift | 7 ++ ...e.swift => AsyncFailureBackportable.swift} | 6 +- .../AsyncAlgorithms/AsyncShareSequence.swift | 29 +++--- Tests/AsyncAlgorithmsTests/TestShare.swift | 92 ++++++++++++++----- 4 files changed, 98 insertions(+), 36 deletions(-) rename Sources/AsyncAlgorithms/{FailableAsyncSequence.swift => AsyncFailureBackportable.swift} (88%) diff --git a/Package.swift b/Package.swift index e000b89d..82dae0fa 100644 --- a/Package.swift +++ b/Package.swift @@ -7,8 +7,12 @@ let AsyncAlgorithms_v1_0 = "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, i #if compiler(>=6.0) && swift(>=6.0) // 5.10 doesnt support visionOS availability let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, visionOS 1.0" +let AsyncAlgorithms_v1_2 = + "AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0" #else let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0" +let AsyncAlgorithms_v1_2 = + "AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0" #endif let availabilityMacros: [SwiftSetting] = [ @@ -18,6 +22,9 @@ let availabilityMacros: [SwiftSetting] = [ .enableExperimentalFeature( AsyncAlgorithms_v1_1 ), + .enableExperimentalFeature( + AsyncAlgorithms_v1_2 + ) ] let package = Package( diff --git a/Sources/AsyncAlgorithms/FailableAsyncSequence.swift b/Sources/AsyncAlgorithms/AsyncFailureBackportable.swift similarity index 88% rename from Sources/AsyncAlgorithms/FailableAsyncSequence.swift rename to Sources/AsyncAlgorithms/AsyncFailureBackportable.swift index a57f6cd1..0feeb47b 100644 --- a/Sources/AsyncAlgorithms/FailableAsyncSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncFailureBackportable.swift @@ -19,11 +19,11 @@ /// /// Example: /// ```swift -/// class MySequence: AsyncSequence, FailableAsyncSequence { ... } +/// class MySequence: AsyncSequence, AsyncFailureBackportable { ... } /// /// ``` @available(AsyncAlgorithms 1.1, *) -public protocol FailableAsyncSequence { - typealias _Failure = Failure +public protocol AsyncFailureBackportable { + typealias BackportableFailure = Failure associatedtype Failure: Error } diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 41f0092c..4d6743f2 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -8,6 +8,7 @@ // See https://swift.org/LICENSE.txt for license information // //===----------------------------------------------------------------------===// +#if compiler(>=6.2) import Synchronization import DequeModule @@ -133,7 +134,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send // - `continuation`: The continuation waiting for the next element (nil if not waiting) // - `position`: The consumer's current position in the shared buffer struct State { - var continuation: UnsafeContinuation, Never>? + var continuation: UnsafeContinuation, Never>? var position = 0 // Creates a new state with the position adjusted by the given offset. @@ -585,11 +586,11 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } } } catch { - emit(.failure(error)) + emit(.failure(error as! Failure)) } } - func next(isolation actor: isolated (any Actor)?, id: Int) async rethrows -> Base.Element? { + func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Base.Element? { let iteratingTask = state.withLock { state -> IteratingTask in defer { if case .pending = state.iteratingTask { @@ -694,23 +695,22 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } @available(AsyncAlgorithms 1.1, *) -extension AsyncShareSequence: AsyncSequence, FailableAsyncSequence { +extension AsyncShareSequence: AsyncSequence, AsyncFailureBackportable { public typealias Element = Base.Element - - public struct Iterator: AsyncIteratorProtocol { + public struct Iterator: AsyncIteratorProtocol, _SendableMetatype { let side: Side init(_ iteration: Iteration) { side = Side(iteration) } - + mutating public func next() async rethrows -> Element? { try await side.next(isolation: nil) } - - mutating public func next(isolation actor: isolated (any Actor)?) async rethrows -> Element? { - try await side.next(isolation: actor) - } + +// mutating public func next(isolation actor: isolated (any Actor)?) async throws(Self.BackportableFailure) -> Element? { +// try await side.next(isolation: actor) +// } } public func makeAsyncIterator() -> Iterator { @@ -718,3 +718,10 @@ extension AsyncShareSequence: AsyncSequence, FailableAsyncSequence { } } +@available(AsyncAlgorithms 1.2, *) +extension AsyncShareSequence.Iterator { + mutating public func next(isolation actor: isolated (any Actor)?) async throws(Base.Failure) -> Base.Element? { + try await side.next(isolation: actor) + } +} +#endif diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index 9d3b7257..4c2cec83 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -31,7 +31,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate1.open() await gate2.enter() - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { results.append(value) } return results @@ -42,7 +42,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { results.append(value) } return results @@ -94,13 +94,13 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Consumer 1 reads first element - if let value = await iterator.next(isolation: nil) { + if let value = await iterator.next() { results1.withLock { $0.append(value) } } // Delay to allow consumer 2 to get ahead try? await Task.sleep(nanoseconds: 10_000_000) // Continue reading - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { results1.withLock { $0.append(value) } } } @@ -110,7 +110,7 @@ final class TestShare: XCTestCase { gate2.open() await gate1.enter() // Consumer 2 reads all elements quickly - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { results2.withLock { $0.append(value) } } } @@ -143,7 +143,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { results1.withLock { $0.append(value) } // Add some delay to consumer 1 try? await Task.sleep(nanoseconds: 1_000_000) @@ -154,7 +154,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate1.open() await gate2.enter() - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { results2.withLock { $0.append(value) } } } @@ -179,7 +179,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { fastResults.withLock { $0.append(value) } } } @@ -189,13 +189,13 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Read first element immediately - if let value = await iterator.next(isolation: nil) { + if let value = await iterator.next() { slowResults.withLock { $0.append(value) } } // Add significant delay to let buffer fill up and potentially overflow try? await Task.sleep(nanoseconds: 50_000_000) // Continue reading remaining elements - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { slowResults.withLock { $0.append(value) } } } @@ -251,7 +251,7 @@ final class TestShare: XCTestCase { var iterator = shared.makeAsyncIterator() gate2.open() await gate1.enter() - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { fastResults.withLock { $0.append(value) } } } @@ -261,13 +261,13 @@ final class TestShare: XCTestCase { gate1.open() await gate2.enter() // Read first element immediately - if let value = await iterator.next(isolation: nil) { + if let value = await iterator.next() { slowResults.withLock { $0.append(value) } } // Add significant delay to let buffer fill up and potentially overflow try? await Task.sleep(nanoseconds: 50_000_000) // Continue reading remaining elements - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { slowResults.withLock { $0.append(value) } } } @@ -397,11 +397,11 @@ final class TestShare: XCTestCase { let task = Task { var iterator = shared.makeAsyncIterator() - if await iterator.next(isolation: nil) != nil { + if await iterator.next() != nil { iterated.fulfill() } // Task will be cancelled here, so iteration should stop - while await iterator.next(isolation: nil) != nil { + while await iterator.next() != nil { // Continue iterating until cancelled } finished.fulfill() @@ -480,7 +480,7 @@ final class TestShare: XCTestCase { // Start early consumer let earlyConsumer = Task { var iterator = shared.makeAsyncIterator() - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { earlyResults.withLock { $0.append(value) } } } @@ -495,7 +495,7 @@ final class TestShare: XCTestCase { // Start late consumer let lateConsumer = Task { var iterator = shared.makeAsyncIterator() - while let value = await iterator.next(isolation: nil) { + while let value = await iterator.next() { lateResults.withLock { $0.append(value) } } } @@ -522,11 +522,11 @@ final class TestShare: XCTestCase { var iterator2 = shared.makeAsyncIterator() // Both iterators should independently get the same elements - let value1a = await iterator1.next(isolation: nil) - let value2a = await iterator2.next(isolation: nil) + let value1a = await iterator1.next() + let value2a = await iterator2.next() - let value1b = await iterator1.next(isolation: nil) - let value2b = await iterator2.next(isolation: nil) + let value1b = await iterator1.next() + let value2b = await iterator2.next() XCTAssertEqual(value1a, 1) XCTAssertEqual(value2a, 1) @@ -549,7 +549,7 @@ final class TestShare: XCTestCase { // Create a new iterator after the sequence finished var newIterator = shared.makeAsyncIterator() - let value = await newIterator.next(isolation: nil) + let value = await newIterator.next() XCTAssertNil(value) // Should return nil since source is exhausted } @@ -572,6 +572,37 @@ final class TestShare: XCTestCase { XCTAssertEqual(results1, [1, 2, 3, 4, 5]) XCTAssertEqual(results2, []) // Should be empty since source is exhausted } + + @available(AsyncAlgorithms 1.1, *) + func test_share_rethrows_failure_type_on_backported() async { + let shared = AsyncThrowingStream { + $0.finish(throwing: TestError.failure) + }.share() + do { + for try await _ in shared { + XCTFail("Expected to not get here") + } + } catch { + XCTAssertEqual(error as? TestError, .failure) + } + } + + func test_share_rethrows_failure_type_without_falling_back_to_any_error() async { + if #available(AsyncAlgorithms 1.2, *) { + // Ensure - at compile time - that error is effectively a TestError + let shared: some AsyncSequence = AlwaysFailingSequence().share() + do { + for try await _ in shared { + XCTFail("Expected to not get here") + } + } catch { + + XCTAssertEqual(error, TestError.failure) + } + } else { + // not available + } + } } // MARK: - Helper Types @@ -580,4 +611,21 @@ private enum TestError: Error, Equatable { case failure } +@available(AsyncAlgorithms 1.2, *) +/// A sequence used to properly test concrete error on 1.2 +private struct AlwaysFailingSequence: AsyncSequence, Sendable { + init() {} + + func makeAsyncIterator() -> AsyncIterator { AsyncIterator() } + + struct AsyncIterator: AsyncIteratorProtocol, Sendable { + + func next() async throws(TestError) -> Void? { + throw TestError.failure + } + mutating func next(completion: @escaping (Result) -> Void) async throws(TestError) -> Element? { + throw TestError.failure + } + } +} #endif From aa7a6c5902eb785144fd15cc104567db1cebf090 Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Wed, 15 Oct 2025 14:11:06 +0200 Subject: [PATCH 12/15] fix: crash on pre 1.2 --- .../AsyncFailureBackportable.swift | 29 --------------- .../AsyncAlgorithms/AsyncShareSequence.swift | 33 ++++++++++------- .../Support/FailingSequence.swift | 26 ++++++++++++++ Tests/AsyncAlgorithmsTests/TestShare.swift | 36 ++++++------------- 4 files changed, 57 insertions(+), 67 deletions(-) delete mode 100644 Sources/AsyncAlgorithms/AsyncFailureBackportable.swift create mode 100644 Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift diff --git a/Sources/AsyncAlgorithms/AsyncFailureBackportable.swift b/Sources/AsyncAlgorithms/AsyncFailureBackportable.swift deleted file mode 100644 index 0feeb47b..00000000 --- a/Sources/AsyncAlgorithms/AsyncFailureBackportable.swift +++ /dev/null @@ -1,29 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Async Algorithms open source project -// -// Copyright (c) 2022 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// -//===----------------------------------------------------------------------===// - -/// A backportable protocol / hack to allow `Failure` associated type on older iOS/macOS/etc. versions. -/// -/// By assigning this protocol to any value conforming to `AsyncSequence`, they will both have access to `Failure` -/// > There could be a possible issue with mangled name of the entire object as discussed -/// [here](https://forums.swift.org/t/how-to-use-asyncsequence-on-macos-14-5-in-xcode-16-beta-need-help-with-availability-check-since-failure-is-unavailb-e/72439/5). -/// However, the issue should only happen if the object conforming to this protocol follows (_Concurrency, AsyncSequence) -/// in lexicographic order. (AsyncAlgorithms, MySequence) should always be after it. -/// -/// Example: -/// ```swift -/// class MySequence: AsyncSequence, AsyncFailureBackportable { ... } -/// -/// ``` -@available(AsyncAlgorithms 1.1, *) -public protocol AsyncFailureBackportable { - typealias BackportableFailure = Failure - associatedtype Failure: Error -} diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 4d6743f2..45d4e672 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -129,6 +129,10 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send // **Usage**: Tracks buffer position and manages async continuations // **Cleanup**: Automatically unregisters and cancels pending operations on deinit final class Side { + // Due to a runtime crash in 1.0 compatible versions, it's not possible to handle + // a generic failure constrained to Base.Failure. We handle inner failure with a `any Error` + // and force unwrap it to the generic 1.2 generic type on the outside Iterator. + typealias Failure = any Error // Tracks the state of a single consumer's iteration. // // - `continuation`: The continuation waiting for the next element (nil if not waiting) @@ -180,6 +184,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send // All operations are synchronized using a `Mutex` to ensure thread-safe access // to the shared state across multiple concurrent consumers. final class Iteration: Sendable { + typealias Failure = Side.Failure // Represents the state of the background task that consumes the source sequence. // // The iteration task goes through several states during its lifecycle: @@ -586,7 +591,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } } } catch { - emit(.failure(error as! Failure)) + emit(.failure(error)) } } @@ -695,8 +700,10 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send } @available(AsyncAlgorithms 1.1, *) -extension AsyncShareSequence: AsyncSequence, AsyncFailureBackportable { +extension AsyncShareSequence: AsyncSequence { public typealias Element = Base.Element + @available(AsyncAlgorithms 1.2, *) + public typealias Failure = Base.Failure public struct Iterator: AsyncIteratorProtocol, _SendableMetatype { let side: Side @@ -707,10 +714,18 @@ extension AsyncShareSequence: AsyncSequence, AsyncFailureBackportable { mutating public func next() async rethrows -> Element? { try await side.next(isolation: nil) } - -// mutating public func next(isolation actor: isolated (any Actor)?) async throws(Self.BackportableFailure) -> Element? { -// try await side.next(isolation: actor) -// } + + @available(AsyncAlgorithms 1.2, *) + mutating public func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? { + do { + return try await side.next(isolation: actor) + } catch { + // It's guaranteed to match `Failure` but we are keeping the internal `Side` and `Iteration` + // constrained to `any Error` to prevent a compiler bug visible at runtime + // on pre 1.2 operating systems + throw error as! Failure + } + } } public func makeAsyncIterator() -> Iterator { @@ -718,10 +733,4 @@ extension AsyncShareSequence: AsyncSequence, AsyncFailureBackportable { } } -@available(AsyncAlgorithms 1.2, *) -extension AsyncShareSequence.Iterator { - mutating public func next(isolation actor: isolated (any Actor)?) async throws(Base.Failure) -> Base.Element? { - try await side.next(isolation: actor) - } -} #endif diff --git a/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift b/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift new file mode 100644 index 00000000..d683b06e --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift @@ -0,0 +1,26 @@ +// +// FailingSequence.swift +// swift-async-algorithms +// +// Created by Stefano Mondino on 15/10/25. +// + +@available(AsyncAlgorithms 1.2, *) +struct FailingSequence: AsyncSequence, Sendable { + typealias Element = Void + let error: Failure + init(_ error: Failure) { + self.error = error + } + func makeAsyncIterator() -> AsyncIterator { AsyncIterator(error: error) } + + struct AsyncIterator: AsyncIteratorProtocol, Sendable { + let error: Failure + func next() async throws(Failure) -> Void? { + throw error + } + mutating func next(completion: @escaping (Result) -> Void) async throws(Failure) -> Element? { + throw error + } + } +} diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index 4c2cec83..f4f5910a 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -589,18 +589,19 @@ final class TestShare: XCTestCase { func test_share_rethrows_failure_type_without_falling_back_to_any_error() async { if #available(AsyncAlgorithms 1.2, *) { + + // Ensure - at compile time - that error is effectively a TestError - let shared: some AsyncSequence = AlwaysFailingSequence().share() - do { - for try await _ in shared { - XCTFail("Expected to not get here") - } - } catch { - - XCTAssertEqual(error, TestError.failure) + let shared: some AsyncSequence = FailingSequence(TestError.failure).share() + do { + for try await _ in shared { + XCTFail("Expected to not get here") + } + } catch { + XCTAssertEqual(error, TestError.failure) } } else { - // not available + _ = XCTSkip("This test is not available before 1.2") } } } @@ -611,21 +612,4 @@ private enum TestError: Error, Equatable { case failure } -@available(AsyncAlgorithms 1.2, *) -/// A sequence used to properly test concrete error on 1.2 -private struct AlwaysFailingSequence: AsyncSequence, Sendable { - init() {} - - func makeAsyncIterator() -> AsyncIterator { AsyncIterator() } - - struct AsyncIterator: AsyncIteratorProtocol, Sendable { - - func next() async throws(TestError) -> Void? { - throw TestError.failure - } - mutating func next(completion: @escaping (Result) -> Void) async throws(TestError) -> Element? { - throw TestError.failure - } - } -} #endif From 7c9dba78c00187b6f0606c0c73b987f49e66fab3 Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Wed, 15 Oct 2025 14:47:07 +0200 Subject: [PATCH 13/15] refactor 1.1 to go back until ios 14.0 and similar --- Package.swift | 4 ++-- Tests/AsyncAlgorithmsTests/TestShare.swift | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/Package.swift b/Package.swift index 82dae0fa..62595603 100644 --- a/Package.swift +++ b/Package.swift @@ -6,11 +6,11 @@ import CompilerPluginSupport let AsyncAlgorithms_v1_0 = "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0" #if compiler(>=6.0) && swift(>=6.0) // 5.10 doesnt support visionOS availability let AsyncAlgorithms_v1_1 = - "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, visionOS 1.0" + "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 11.0, iOS 14.0, tvOS 14.0, watchOS 7.0, visionOS 1.0" let AsyncAlgorithms_v1_2 = "AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0" #else -let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0" +let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 11.0, iOS 14.0, tvOS 14.0, watchOS 7.0" let AsyncAlgorithms_v1_2 = "AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0" #endif diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index f4f5910a..fab8bc75 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -573,7 +573,6 @@ final class TestShare: XCTestCase { XCTAssertEqual(results2, []) // Should be empty since source is exhausted } - @available(AsyncAlgorithms 1.1, *) func test_share_rethrows_failure_type_on_backported() async { let shared = AsyncThrowingStream { $0.finish(throwing: TestError.failure) @@ -589,8 +588,6 @@ final class TestShare: XCTestCase { func test_share_rethrows_failure_type_without_falling_back_to_any_error() async { if #available(AsyncAlgorithms 1.2, *) { - - // Ensure - at compile time - that error is effectively a TestError let shared: some AsyncSequence = FailingSequence(TestError.failure).share() do { From 81493bee5bb6422c5584cbcaaee9a0ad997c0731 Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Wed, 15 Oct 2025 15:38:52 +0200 Subject: [PATCH 14/15] Update Tests/AsyncAlgorithmsTests/TestShare.swift Co-authored-by: Pyry Jahkola --- Tests/AsyncAlgorithmsTests/TestShare.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index fab8bc75..d7c9b42b 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -598,7 +598,7 @@ final class TestShare: XCTestCase { XCTAssertEqual(error, TestError.failure) } } else { - _ = XCTSkip("This test is not available before 1.2") + throw XCTSkip("This test is not available before 1.2") } } } From 4345c0dedf6bb3ecbf77ffdd2d2c95936958c655 Mon Sep 17 00:00:00 2001 From: Stefano Mondino Date: Thu, 16 Oct 2025 09:59:09 +0200 Subject: [PATCH 15/15] fix copyrights, remove typos, change sleep to a custom shorthand backportable shorthand without nanoseconds --- .../AsyncAlgorithms/AsyncShareSequence.swift | 4 +- Sources/AsyncAlgorithms/Shims.swift | 2 +- .../Support/FailingSequence.swift | 10 ++-- .../Support/Task+Extensions.swift | 35 ++++++++++++++ Tests/AsyncAlgorithmsTests/TestShare.swift | 48 +++++++++---------- 5 files changed, 69 insertions(+), 30 deletions(-) create mode 100644 Tests/AsyncAlgorithmsTests/Support/Task+Extensions.swift diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 45d4e672..f0f28ffa 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Async Algorithms open source project // -// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Copyright (c) 2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information @@ -130,7 +130,7 @@ where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _Send // **Cleanup**: Automatically unregisters and cancels pending operations on deinit final class Side { // Due to a runtime crash in 1.0 compatible versions, it's not possible to handle - // a generic failure constrained to Base.Failure. We handle inner failure with a `any Error` + // a generic failure constrained to Base.Failure. We handle inner failure with a `any Error` // and force unwrap it to the generic 1.2 generic type on the outside Iterator. typealias Failure = any Error // Tracks the state of a single consumer's iteration. diff --git a/Sources/AsyncAlgorithms/Shims.swift b/Sources/AsyncAlgorithms/Shims.swift index 3e33b76b..e4b10f78 100644 --- a/Sources/AsyncAlgorithms/Shims.swift +++ b/Sources/AsyncAlgorithms/Shims.swift @@ -2,7 +2,7 @@ // // This source file is part of the Swift Async Algorithms open source project // -// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Copyright (c) 2025 Apple Inc. and the Swift project authors // Licensed under Apache License v2.0 with Runtime Library Exception // // See https://swift.org/LICENSE.txt for license information diff --git a/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift b/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift index d683b06e..d05bb682 100644 --- a/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift +++ b/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift @@ -1,9 +1,13 @@ +//===----------------------------------------------------------------------===// // -// FailingSequence.swift -// swift-async-algorithms +// This source file is part of the Swift Async Algorithms open source project // -// Created by Stefano Mondino on 15/10/25. +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception // +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// @available(AsyncAlgorithms 1.2, *) struct FailingSequence: AsyncSequence, Sendable { diff --git a/Tests/AsyncAlgorithmsTests/Support/Task+Extensions.swift b/Tests/AsyncAlgorithmsTests/Support/Task+Extensions.swift new file mode 100644 index 00000000..447e43bb --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/Support/Task+Extensions.swift @@ -0,0 +1,35 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// Backportable versions of `sleep(for:)` for legacy platforms where this kind of method is not available +extension Task where Success == Never, Failure == Never { + + static func sleep(milliseconds duration: UInt64) async throws { + try await sleep(duration, multiplier: 1_000_000) + } + static func sleep(microseconds duration: UInt64) async throws { + try await sleep(duration, multiplier: 1_000) + } + static func sleep(seconds duration: UInt64) async throws { + try await sleep(duration, multiplier: 1_000_000_000) + } + + private static func sleep(_ value: UInt64, multiplier: UInt64) async throws { + guard UInt64.max / multiplier > value else { + throw SleepError.durationOutOfBounds + } + try await sleep(nanoseconds: value * multiplier) + } +} + +fileprivate enum SleepError: Error { + case durationOutOfBounds +} diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index d7c9b42b..95db012a 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -98,7 +98,7 @@ final class TestShare: XCTestCase { results1.withLock { $0.append(value) } } // Delay to allow consumer 2 to get ahead - try? await Task.sleep(nanoseconds: 10_000_000) + try? await Task.sleep(milliseconds: 10) // Continue reading while let value = await iterator.next() { results1.withLock { $0.append(value) } @@ -146,7 +146,7 @@ final class TestShare: XCTestCase { while let value = await iterator.next() { results1.withLock { $0.append(value) } // Add some delay to consumer 1 - try? await Task.sleep(nanoseconds: 1_000_000) + try? await Task.sleep(milliseconds: 1) } } @@ -193,7 +193,7 @@ final class TestShare: XCTestCase { slowResults.withLock { $0.append(value) } } // Add significant delay to let buffer fill up and potentially overflow - try? await Task.sleep(nanoseconds: 50_000_000) + try? await Task.sleep(milliseconds: 50) // Continue reading remaining elements while let value = await iterator.next() { slowResults.withLock { $0.append(value) } @@ -202,13 +202,13 @@ final class TestShare: XCTestCase { // Release all elements quickly to test buffer overflow behavior gated.advance() // 1 - try? await Task.sleep(nanoseconds: 5_000_000) + try? await Task.sleep(milliseconds: 5) gated.advance() // 2 - try? await Task.sleep(nanoseconds: 5_000_000) + try? await Task.sleep(milliseconds: 5) gated.advance() // 3 - try? await Task.sleep(nanoseconds: 5_000_000) + try? await Task.sleep(milliseconds: 5) gated.advance() // 4 - try? await Task.sleep(nanoseconds: 5_000_000) + try? await Task.sleep(milliseconds: 5) gated.advance() // 5 await fastConsumer.value @@ -265,7 +265,7 @@ final class TestShare: XCTestCase { slowResults.withLock { $0.append(value) } } // Add significant delay to let buffer fill up and potentially overflow - try? await Task.sleep(nanoseconds: 50_000_000) + try? await Task.sleep(milliseconds: 50) // Continue reading remaining elements while let value = await iterator.next() { slowResults.withLock { $0.append(value) } @@ -274,13 +274,13 @@ final class TestShare: XCTestCase { // Release all elements quickly to test buffer overflow behavior gated.advance() // 1 - try? await Task.sleep(nanoseconds: 5_000_000) + try? await Task.sleep(milliseconds: 5) gated.advance() // 2 - try? await Task.sleep(nanoseconds: 5_000_000) + try? await Task.sleep(milliseconds: 5) gated.advance() // 3 - try? await Task.sleep(nanoseconds: 5_000_000) + try? await Task.sleep(milliseconds: 5) gated.advance() // 4 - try? await Task.sleep(nanoseconds: 5_000_000) + try? await Task.sleep(milliseconds: 5) gated.advance() // 5 await fastConsumer.value @@ -490,7 +490,7 @@ final class TestShare: XCTestCase { gated.advance() // 2 // Give early consumer time to consume - try? await Task.sleep(nanoseconds: 10_000_000) + try? await Task.sleep(milliseconds: 10) // Start late consumer let lateConsumer = Task { @@ -587,18 +587,18 @@ final class TestShare: XCTestCase { } func test_share_rethrows_failure_type_without_falling_back_to_any_error() async { - if #available(AsyncAlgorithms 1.2, *) { - // Ensure - at compile time - that error is effectively a TestError - let shared: some AsyncSequence = FailingSequence(TestError.failure).share() - do { - for try await _ in shared { - XCTFail("Expected to not get here") - } - } catch { - XCTAssertEqual(error, TestError.failure) + guard #available(AsyncAlgorithms 1.2, *) else { + _ = XCTSkip("This test is not available before 1.2") + return + } + // Ensure - at compile time - that error is effectively a TestError + let shared: some AsyncSequence = FailingSequence(TestError.failure).share() + do { + for try await _ in shared { + XCTFail("Expected to not get here") } - } else { - throw XCTSkip("This test is not available before 1.2") + } catch { + XCTAssertEqual(error, TestError.failure) } } }