Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version: 5.8
// swift-tools-version: 6.2

import PackageDescription
import CompilerPluginSupport
Expand Down
62 changes: 62 additions & 0 deletions Package@swift-5.8.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// swift-tools-version: 5.8

import PackageDescription
import CompilerPluginSupport

// Availability Macros

let availabilityMacros: [SwiftSetting] = [
.enableExperimentalFeature("AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0"),
.enableExperimentalFeature("AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"),
]

let package = Package(
name: "swift-async-algorithms",
products: [
.library(name: "AsyncAlgorithms", targets: ["AsyncAlgorithms"])
],
targets: [
.target(
name: "AsyncAlgorithms",
dependencies: [
.product(name: "OrderedCollections", package: "swift-collections"),
.product(name: "DequeModule", package: "swift-collections"),
],
swiftSettings: availabilityMacros + [
.enableExperimentalFeature("StrictConcurrency=complete")
]
),
.target(
name: "AsyncSequenceValidation",
dependencies: ["_CAsyncSequenceValidationSupport", "AsyncAlgorithms"],
swiftSettings: availabilityMacros + [
.enableExperimentalFeature("StrictConcurrency=complete")
]
),
.systemLibrary(name: "_CAsyncSequenceValidationSupport"),
.target(
name: "AsyncAlgorithms_XCTest",
dependencies: ["AsyncAlgorithms", "AsyncSequenceValidation"],
swiftSettings: availabilityMacros + [
.enableExperimentalFeature("StrictConcurrency=complete")
]
),
.testTarget(
name: "AsyncAlgorithmsTests",
dependencies: ["AsyncAlgorithms", "AsyncSequenceValidation", "AsyncAlgorithms_XCTest"],
swiftSettings: availabilityMacros + [
.enableExperimentalFeature("StrictConcurrency=complete")
]
),
]
)

if Context.environment["SWIFTCI_USE_LOCAL_DEPS"] == nil {
package.dependencies += [
.package(url: "https://github.com/apple/swift-collections.git", from: "1.1.0"),
]
} else {
package.dependencies += [
.package(path: "../swift-collections")
]
}
4 changes: 2 additions & 2 deletions Sources/AsyncAlgorithms/Buffer/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ public struct AsyncBufferSequence<Base: AsyncSequence & Sendable>: AsyncSequence
self.storageType = .transparent(iterator)
return element
case .bounded(let storage):
return try await storage.next()?._rethrowGet()
return try await storage.next().wrapped?._rethrowGet()
case .unbounded(let storage):
return try await storage.next()?._rethrowGet()
return try await storage.next().wrapped?._rethrowGet()
Comment on lines -124 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we leave a comment here why the unsafe transfer is needed and why it is sound?

}
}
}
Expand Down
10 changes: 5 additions & 5 deletions Sources/AsyncAlgorithms/Buffer/BoundedBufferStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import DequeModule
struct BoundedBufferStateMachine<Base: AsyncSequence> {
typealias Element = Base.Element
typealias SuspendedProducer = UnsafeContinuation<Void, Never>
typealias SuspendedConsumer = UnsafeContinuation<Result<Base.Element, Error>?, Never>
typealias SuspendedConsumer = UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>

// We are using UnsafeTransfer here since we have to get the elements from the task
// into the consumer task. This is a transfer but we cannot prove this to the compiler at this point
Expand Down Expand Up @@ -137,7 +137,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {

enum ElementProducedAction {
case none
case resumeConsumer(continuation: SuspendedConsumer, result: Result<Element, Error>)
case resumeConsumer(continuation: SuspendedConsumer, result: UnsafeTransfer<Result<Base.Element, Error>?>)
}

mutating func elementProduced(element: Element) -> ElementProducedAction {
Expand All @@ -161,7 +161,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
// we have an awaiting consumer, we can resume it with the element and exit
precondition(buffer.isEmpty, "Invalid state. The buffer should be empty.")
self.state = .buffering(task: task, buffer: buffer, suspendedProducer: nil, suspendedConsumer: nil)
return .resumeConsumer(continuation: suspendedConsumer, result: .success(element))
return .resumeConsumer(continuation: suspendedConsumer, result: UnsafeTransfer(.success(element)))

case .buffering(_, _, .some, _):
preconditionFailure("Invalid state. There should not be a suspended producer.")
Expand All @@ -177,7 +177,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
enum FinishAction {
case none
case resumeConsumer(
continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>?
continuation: UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>?
)
}

Expand Down Expand Up @@ -295,7 +295,7 @@ struct BoundedBufferStateMachine<Base: AsyncSequence> {
case resumeProducerAndConsumer(
task: Task<Void, Never>,
producerContinuation: UnsafeContinuation<Void, Never>?,
consumerContinuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>?
consumerContinuation: UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>?
)
}

Expand Down
15 changes: 8 additions & 7 deletions Sources/AsyncAlgorithms/Buffer/BoundedBufferStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
self.stateMachine = ManagedCriticalState(BoundedBufferStateMachine(base: base, limit: limit))
}

func next() async -> Result<Base.Element, Error>? {
func next() async -> UnsafeTransfer<Result<Base.Element, Error>?> {
return await withTaskCancellationHandler {
let action: BoundedBufferStateMachine<Base>.NextAction? = self.stateMachine.withCriticalRegion {
stateMachine in
Expand Down Expand Up @@ -45,14 +45,14 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send

case .returnResult(let producerContinuation, let result):
producerContinuation?.resume()
return result
return UnsafeTransfer(result)

case .none:
break
}

return await withUnsafeContinuation {
(continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>) in
(continuation: UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>) in
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.nextSuspended(continuation: continuation)
}
Expand All @@ -61,7 +61,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
break
case .returnResult(let producerContinuation, let result):
producerContinuation?.resume()
continuation.resume(returning: result)
continuation.resume(returning: UnsafeTransfer(result))
}
}
} onCancel: {
Expand Down Expand Up @@ -109,6 +109,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
case .none:
break
case .resumeConsumer(let continuation, let result):

continuation.resume(returning: result)
}
}
Expand All @@ -120,7 +121,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: nil)
continuation?.resume(returning: UnsafeTransfer(nil))
}
} catch {
let action = self.stateMachine.withCriticalRegion { stateMachine in
Expand All @@ -130,7 +131,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: .failure(error))
continuation?.resume(returning: UnsafeTransfer(Result<Base.Element, Error>.failure(error)))
}
}
}
Expand All @@ -148,7 +149,7 @@ final class BoundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Send
case .resumeProducerAndConsumer(let task, let producerContinuation, let consumerContinuation):
task.cancel()
producerContinuation?.resume()
consumerContinuation?.resume(returning: nil)
consumerContinuation?.resume(returning: UnsafeTransfer(nil))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import DequeModule
@available(AsyncAlgorithms 1.0, *)
struct UnboundedBufferStateMachine<Base: AsyncSequence> {
typealias Element = Base.Element
typealias SuspendedConsumer = UnsafeContinuation<Result<Element, Error>?, Never>
typealias SuspendedConsumer = UnsafeContinuation<UnsafeTransfer<Result<Element, Error>?>, Never>

enum Policy {
case unlimited
Expand Down Expand Up @@ -73,7 +73,7 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
case none
case resumeConsumer(
continuation: SuspendedConsumer,
result: Result<Element, Error>
result: UnsafeTransfer<Result<Element, Error>?>
)
}

Expand Down Expand Up @@ -108,7 +108,7 @@ struct UnboundedBufferStateMachine<Base: AsyncSequence> {
self.state = .buffering(task: task, buffer: buffer, suspendedConsumer: nil)
return .resumeConsumer(
continuation: suspendedConsumer,
result: .success(element)
result: UnsafeTransfer(.success(element))
)

case .modifying:
Expand Down
14 changes: 7 additions & 7 deletions Sources/AsyncAlgorithms/Buffer/UnboundedBufferStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
self.stateMachine = ManagedCriticalState(UnboundedBufferStateMachine<Base>(base: base, policy: policy))
}

func next() async -> Result<Base.Element, Error>? {
func next() async -> UnsafeTransfer<Result<Base.Element, Error>?> {
return await withTaskCancellationHandler {

let action: UnboundedBufferStateMachine<Base>.NextAction? = self.stateMachine.withCriticalRegion {
Expand All @@ -42,21 +42,21 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
case .suspend:
break
case .returnResult(let result):
return result
return UnsafeTransfer(result)
case .none:
break
}

return await withUnsafeContinuation {
(continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>) in
(continuation: UnsafeContinuation<UnsafeTransfer<Result<Base.Element, Error>?>, Never>) in
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.nextSuspended(continuation: continuation)
}
switch action {
case .none:
break
case .resumeConsumer(let result):
continuation.resume(returning: result)
continuation.resume(returning: UnsafeTransfer(result))
}
}
} onCancel: {
Expand Down Expand Up @@ -89,7 +89,7 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: nil)
continuation?.resume(returning: UnsafeTransfer(nil))
}
} catch {
let action = self.stateMachine.withCriticalRegion { stateMachine in
Expand All @@ -99,7 +99,7 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
case .none:
break
case .resumeConsumer(let continuation):
continuation?.resume(returning: .failure(error))
continuation?.resume(returning: UnsafeTransfer(Result<Base.Element, Error>.failure(error)))
}
}
}
Expand All @@ -116,7 +116,7 @@ final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Se
break
case .resumeConsumer(let task, let continuation):
task.cancel()
continuation?.resume(returning: nil)
continuation?.resume(returning: UnsafeTransfer(nil))
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncSequenceValidation/Clock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import AsyncAlgorithms

@available(AsyncAlgorithms 1.0, *)
extension AsyncSequenceValidationDiagram {
public struct Clock {
public struct Clock: Sendable {
let queue: WorkQueue

init(queue: WorkQueue) {
Expand Down
10 changes: 5 additions & 5 deletions Sources/AsyncSequenceValidation/TaskDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,18 @@ func start_thread(_ raw: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer {
#endif

@available(AsyncAlgorithms 1.0, *)
final class TaskDriver {
let work: (TaskDriver) -> Void
final class TaskDriver: Sendable {
let work: @Sendable (TaskDriver) -> Void
let queue: WorkQueue
#if canImport(Darwin)
var thread: pthread_t?
nonisolated(unsafe) var thread: pthread_t?
#elseif canImport(Glibc) || canImport(Musl) || canImport(Bionic)
var thread = pthread_t()
nonisolated(unsafe) var thread = pthread_t()
#elseif canImport(WinSDK)
#error("TODO: Port TaskDriver threading to windows")
#endif

init(queue: WorkQueue, _ work: @escaping (TaskDriver) -> Void) {
init(queue: WorkQueue, _ work: @Sendable @escaping (TaskDriver) -> Void) {
self.queue = queue
self.work = work
}
Expand Down
10 changes: 5 additions & 5 deletions Sources/AsyncSequenceValidation/Test.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ extension AsyncSequenceValidationDiagram {
}
}

private static let _executor: AnyObject = {
private static let _executor: AnyObject & Sendable = {
guard #available(macOS 14.0, iOS 17.0, watchOS 10.0, tvOS 17.0, *) else {
return ClockExecutor_Pre5_9()
}
Expand All @@ -134,13 +134,13 @@ extension AsyncSequenceValidationDiagram {
}
#endif

static var clock: Clock?
nonisolated(unsafe) static var clock: Clock?

static var driver: TaskDriver?
nonisolated(unsafe) static var driver: TaskDriver?

static var currentJob: Job?
nonisolated(unsafe) static var currentJob: Job?

static var specificationFailures = [ExpectationFailure]()
nonisolated(unsafe) static var specificationFailures = [ExpectationFailure]()
}

enum ActualResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,10 @@

typedef struct _Job* JobRef;

#define NONISOLATED_UNSAFE __attribute__((swift_attr("nonisolated(unsafe)")))

typedef SWIFT_CC(swift) void (*swift_task_enqueueGlobal_original)(JobRef _Nonnull job);
SWIFT_EXPORT_FROM(swift_Concurrency)
SWIFT_CC(swift) void (* _Nullable swift_task_enqueueGlobal_hook)(
NONISOLATED_UNSAFE SWIFT_CC(swift) void (* _Nullable swift_task_enqueueGlobal_hook)(
JobRef _Nonnull job, swift_task_enqueueGlobal_original _Nonnull original);

Loading