Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import CompilerPluginSupport
let AsyncAlgorithms_v1_0 = "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0"
#if compiler(>=6.0) && swift(>=6.0) // 5.10 doesnt support visionOS availability
let AsyncAlgorithms_v1_1 =
"AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"
"AvailabilityMacro=AsyncAlgorithms 1.1:macOS 11.0, iOS 14.0, tvOS 14.0, watchOS 7.0, visionOS 1.0"
let AsyncAlgorithms_v1_2 =
"AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"
#else
let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0"
let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 11.0, iOS 14.0, tvOS 14.0, watchOS 7.0"
let AsyncAlgorithms_v1_2 =
"AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0"
#endif

let availabilityMacros: [SwiftSetting] = [
Expand All @@ -18,6 +22,9 @@ let availabilityMacros: [SwiftSetting] = [
.enableExperimentalFeature(
AsyncAlgorithms_v1_1
),
.enableExperimentalFeature(
AsyncAlgorithms_v1_2
)
]

let package = Package(
Expand Down
90 changes: 50 additions & 40 deletions Sources/AsyncAlgorithms/AsyncShareSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

#if compiler(>=6.2)

import Synchronization
import DequeModule

@available(AsyncAlgorithms 1.1, *)
extension AsyncSequence
where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype {
where Element: Sendable, Self: _SendableMetatype, AsyncIterator: _SendableMetatype {
/// Creates a shared async sequence that allows multiple concurrent iterations over a single source.
///
/// The `share` method transforms an async sequence into a shareable sequence that can be safely
Expand Down Expand Up @@ -67,7 +66,7 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
///
public func share(
bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)
) -> some AsyncSequence<Element, Failure> & Sendable {
) -> AsyncShareSequence<Self> {
// The iterator is transferred to the isolation of the iterating task
// this has to be done "unsafely" since we cannot annotate the transfer
// however since iterating an AsyncSequence types twice has been defined
Expand Down Expand Up @@ -115,8 +114,8 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype
// This type is typically not used directly; instead, use the `share()` method on any
// async sequence that meets the sendability requirements.
@available(AsyncAlgorithms 1.1, *)
struct AsyncShareSequence<Base: AsyncSequence>: Sendable
where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: SendableMetatype {
public struct AsyncShareSequence<Base: AsyncSequence>: Sendable
where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _SendableMetatype {
// Represents a single consumer's connection to the shared sequence.
//
// Each iterator of the shared sequence creates its own `Side` instance, which tracks
Expand All @@ -130,12 +129,16 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
// **Usage**: Tracks buffer position and manages async continuations
// **Cleanup**: Automatically unregisters and cancels pending operations on deinit
final class Side {
// Due to a runtime crash in 1.0 compatible versions, it's not possible to handle
// a generic failure constrained to Base.Failure. We handle inner failure with a `any Error`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// a generic failure constrained to Base.Failure. We handle inner failure with a �`any Error`
// a generic failure constrained to Base.Failure. We handle inner failure with a `any Error`

// and force unwrap it to the generic 1.2 generic type on the outside Iterator.
typealias Failure = any Error
// Tracks the state of a single consumer's iteration.
//
// - `continuation`: The continuation waiting for the next element (nil if not waiting)
// - `position`: The consumer's current position in the shared buffer
struct State {
var continuation: UnsafeContinuation<Result<Element?, Failure>, Never>?
var continuation: UnsafeContinuation<Result<Base.Element?, Failure>, Never>?
var position = 0

// Creates a new state with the position adjusted by the given offset.
Expand All @@ -162,7 +165,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
iteration.unregisterSide(id)
}

func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? {
func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Base.Element? {
try await iteration.next(isolation: actor, id: id)
}
}
Expand All @@ -181,6 +184,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
// All operations are synchronized using a `Mutex` to ensure thread-safe access
// to the shared state across multiple concurrent consumers.
final class Iteration: Sendable {
typealias Failure = Side.Failure
// Represents the state of the background task that consumes the source sequence.
//
// The iteration task goes through several states during its lifecycle:
Expand Down Expand Up @@ -230,7 +234,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
var generation = 0
var sides = [Int: Side.State]()
var iteratingTask: IteratingTask
private(set) var buffer = Deque<Element>()
private(set) var buffer = Deque<Base.Element>()
private(set) var finished = false
private(set) var failure: Failure?
var cancelled = false
Expand Down Expand Up @@ -311,7 +315,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
// **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends
//
// - Parameter element: The element to add to the buffer
mutating func enqueue(_ element: Element) {
mutating func enqueue(_ element: Base.Element) {
let count = buffer.count

switch storagePolicy {
Expand Down Expand Up @@ -341,14 +345,14 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}
}

let state: Mutex<State>
let state: ManagedCriticalState<State>
let limit: Int?

init(
_ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator,
bufferingPolicy: AsyncBufferSequencePolicy
) {
state = Mutex(State(iteratorFactory, bufferingPolicy: bufferingPolicy))
state = ManagedCriticalState(State(iteratorFactory, bufferingPolicy: bufferingPolicy))
switch bufferingPolicy.policy {
case .bounded(let limit):
self.limit = limit
Expand Down Expand Up @@ -478,15 +482,15 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}

struct Resumption {
let continuation: UnsafeContinuation<Result<Element?, Failure>, Never>
let result: Result<Element?, Failure>
let continuation: UnsafeContinuation<Result<Base.Element?, Failure>, Never>
let result: Result<Base.Element?, Failure>

func resume() {
continuation.resume(returning: result)
}
}

func emit(_ result: Result<Element?, Failure>) {
func emit(_ result: Result<Base.Element?, Failure>) {
let (resumptions, limitContinuation, demandContinuation, cancelled) = state.withLock {
state -> ([Resumption], UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool) in
var resumptions = [Resumption]()
Expand Down Expand Up @@ -533,12 +537,12 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab

private func nextIteration(
_ id: Int
) async -> Result<AsyncShareSequence<Base>.Element?, AsyncShareSequence<Base>.Failure> {
) async -> Result<Base.Element?, Failure> {
return await withTaskCancellationHandler {
await withUnsafeContinuation { continuation in
let (res, limitContinuation, demandContinuation, cancelled) = state.withLock {
state -> (
Result<Element?, Failure>?, UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool
Result<Base.Element?, Failure>?, UnsafeContinuation<Bool, Never>?, UnsafeContinuation<Void, Never>?, Bool
) in
guard let side = state.sides[id] else {
return state.emit(.success(nil), limit: limit)
Expand Down Expand Up @@ -587,24 +591,23 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
}
}
} catch {
emit(.failure(error as! Failure))
emit(.failure(error))
}
}

func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Element? {
let (factory, cancelled) = state.withLock { state -> ((@Sendable () -> sending Base.AsyncIterator)?, Bool) in
switch state.iteratingTask {
case .pending(let factory):
state.iteratingTask = .starting
return (factory, false)
case .cancelled:
return (nil, true)
default:
return (nil, false)
func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Base.Element? {
let iteratingTask = state.withLock { state -> IteratingTask in
defer {
if case .pending = state.iteratingTask {
state.iteratingTask = .starting
}
}
return state.iteratingTask
}
if cancelled { return nil }
if let factory {

if case .cancelled = iteratingTask { return nil }

if case .pending(let factory) = iteratingTask {
let task: Task<Void, Never>
// for the fancy dance of availability and canImport see the comment on the next check for details
#if swift(>=6.2)
Expand Down Expand Up @@ -659,7 +662,6 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab
#else
return try await nextIteration(id).get()
#endif

}
}

Expand Down Expand Up @@ -699,26 +701,34 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab

@available(AsyncAlgorithms 1.1, *)
extension AsyncShareSequence: AsyncSequence {
typealias Element = Base.Element
typealias Failure = Base.Failure

struct Iterator: AsyncIteratorProtocol {
public typealias Element = Base.Element
@available(AsyncAlgorithms 1.2, *)
public typealias Failure = Base.Failure
public struct Iterator: AsyncIteratorProtocol, _SendableMetatype {
let side: Side

init(_ iteration: Iteration) {
side = Side(iteration)
}

mutating func next() async rethrows -> Element? {
mutating public func next() async rethrows -> Element? {
try await side.next(isolation: nil)
}

mutating func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? {
try await side.next(isolation: actor)

@available(AsyncAlgorithms 1.2, *)
mutating public func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? {
do {
return try await side.next(isolation: actor)
} catch {
// It's guaranteed to match `Failure` but we are keeping the internal `Side` and `Iteration`
// constrained to `any Error` to prevent a compiler bug visible at runtime
// on pre 1.2 operating systems
throw error as! Failure
}
}
}

func makeAsyncIterator() -> Iterator {
public func makeAsyncIterator() -> Iterator {
Iterator(extent.iteration)
}
}
Expand Down
18 changes: 18 additions & 0 deletions Sources/AsyncAlgorithms/Shims.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Copyright (c) 2025 Apple Inc. and the Swift project authors

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be some other ones that I copy/pasta'd too .... oops...

// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

import Foundation

#if compiler(>=6.2)
public typealias _SendableMetatype = SendableMetatype
#else
public typealias _SendableMetatype = Any
#endif
26 changes: 26 additions & 0 deletions Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// FailingSequence.swift
// swift-async-algorithms
//
// Created by Stefano Mondino on 15/10/25.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be updated to the right attribution header:

//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2025 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//


@available(AsyncAlgorithms 1.2, *)
struct FailingSequence<Failure: Error>: AsyncSequence, Sendable {
typealias Element = Void
let error: Failure
init(_ error: Failure) {
self.error = error
}
func makeAsyncIterator() -> AsyncIterator { AsyncIterator(error: error) }

struct AsyncIterator: AsyncIteratorProtocol, Sendable {
let error: Failure
func next() async throws(Failure) -> Void? {
throw error
}
mutating func next(completion: @escaping (Result<Element?, Failure>) -> Void) async throws(Failure) -> Element? {
throw error
}
}
}
Loading