Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ let swiftSettings: [SwiftSetting] = [
// this "experimental" feature flag without two subsequent releases. We assume they will respect that
// promise, so we enable this here. For more, see:
// https://forums.swift.org/t/experimental-support-for-lifetime-dependencies-in-swift-6-2-and-beyond/78638
.enableExperimentalFeature("Lifetimes")
.enableExperimentalFeature("Lifetimes"),
.enableUpcomingFeature("NonisolatedNonsendingByDefault"),
]

// This doesn't work when cross-compiling: the privacy manifest will be included in the Bundle and
Expand Down
10 changes: 9 additions & 1 deletion Sources/NIOCore/AsyncAwaitSupport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,15 @@ extension AsyncSequence where Element == ByteBuffer {
// this has also the benefit of not copying at all,
// if the async sequence contains only one element.
var iterator = self.makeAsyncIterator()
guard var head = try await iterator.next() else {
let head: ByteBuffer?
if #available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *) {
head = try await iterator.next(isolation: #isolation)
} else {
var box = UnsafeTransfer(iterator)
head = try await box.wrappedValue.next()
}

guard var head = head else {
return ByteBuffer()
}
guard head.readableBytes <= maxBytes else {
Expand Down
15 changes: 15 additions & 0 deletions Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ extension NIOAsyncChannelInboundStream: AsyncSequence {
}

@inlinable
@concurrent
public mutating func next() async throws -> Element? {
switch self._backing {
case .asyncStream(var iterator):
Expand All @@ -147,6 +148,20 @@ extension NIOAsyncChannelInboundStream: AsyncSequence {
return try await iterator.next()
}
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
mutating public func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> Inbound? {
switch self._backing {
case .asyncStream(var iterator):
defer {
self._backing = .asyncStream(iterator)
}
return try await iterator.next(isolation: actor)

case .producer(let iterator):
return try await iterator.next()
}
}
}

@inlinable
Expand Down
6 changes: 6 additions & 0 deletions Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,15 @@ public struct NIOAsyncChannelOutboundWriter<OutboundOut: Sendable>: Sendable {
self.iterator = iterator
}

@concurrent
public mutating func next() async -> Element? {
await self.iterator.next()
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
public mutating func next(isolation actor: isolated (any Actor)?) async -> Element? {
await self.iterator.next(isolation: actor)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions Sources/NIOCore/NIODecodedAsyncSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ extension NIODecodedAsyncSequence: AsyncSequence {
/// The same as `next(isolation:)` but not isolated to an actor, which allows
/// for less availability restrictions.
@inlinable
@concurrent
public mutating func next() async throws -> Element? {
while true {
switch self.state {
Expand Down
17 changes: 17 additions & 0 deletions Sources/NIOFS/DirectoryEntries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public struct DirectoryEntries: AsyncSequence, Sendable {
self.currentBatch = []
}

@concurrent
public mutating func next() async throws -> DirectoryEntry? {
if self.currentBatch.isEmpty {
let batch = try await self.iterator.next()
Expand All @@ -74,6 +75,16 @@ public struct DirectoryEntries: AsyncSequence, Sendable {

return self.currentBatch.popFirst()
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
public mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> DirectoryEntry? {
if self.currentBatch.isEmpty {
let batch = try await self.iterator.next(isolation: actor)
self.currentBatch = (batch ?? [])[...]
}

return self.currentBatch.popFirst()
}
}
}

Expand Down Expand Up @@ -128,9 +139,15 @@ extension DirectoryEntries {
self.iterator = iterator
}

@concurrent
public mutating func next() async throws -> [DirectoryEntry]? {
try await self.iterator.next()
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
public mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> [DirectoryEntry]? {
try await self.iterator.next(isolation: actor)
}
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions Sources/NIOFS/FileChunks.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,15 @@ public struct FileChunks: AsyncSequence, Sendable {
self.iterator = iterator
}

@concurrent
public mutating func next() async throws -> ByteBuffer? {
try await self.iterator.next()
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
public mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> ByteBuffer? {
try await self.iterator.next(isolation: actor)
}
}
}

Expand Down
21 changes: 21 additions & 0 deletions Sources/NIOFS/Internal/BufferedOrAnyStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ internal enum BufferedOrAnyStream<Element: Sendable, Delegate: NIOAsyncSequenceP
case bufferedStream(AsyncSequenceProducer.AsyncIterator)
case anyAsyncSequence(AnyAsyncSequence<Element>.AsyncIterator)

@concurrent
internal mutating func next() async throws -> Element? {
let element: Element?
switch self {
Expand All @@ -59,6 +60,20 @@ internal enum BufferedOrAnyStream<Element: Sendable, Delegate: NIOAsyncSequenceP
return element
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
internal mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> Element? {
let element: Element?
switch self {
case var .bufferedStream(iterator):
defer { self = .bufferedStream(iterator) }
element = try await iterator.next(isolation: actor)
case var .anyAsyncSequence(iterator):
defer { self = .anyAsyncSequence(iterator) }
element = try await iterator.next(isolation: actor)
}
return element
}

internal init(wrapping iterator: AsyncSequenceProducer.AsyncIterator) {
self = .bufferedStream(iterator)
}
Expand Down Expand Up @@ -91,8 +106,14 @@ internal struct AnyAsyncSequence<Element>: AsyncSequence, Sendable {
self.iterator = iterator
}

@concurrent
internal mutating func next() async throws -> Element? {
try await self.iterator.next() as? Element
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
internal mutating func next(isolation actor: isolated (any Actor)?) async throws -> Element? {
try await self.iterator.next(isolation: actor) as? Element
}
}
}
2 changes: 1 addition & 1 deletion Sources/NIOPerformanceTester/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public func measureAndPrint(desc: String, fn: () throws -> Int) rethrows {

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func measure(_ fn: () async throws -> Int) async rethrows -> [Double] {
func measureOne(_ fn: () async throws -> Int) async rethrows -> Double {
nonisolated(nonsending) func measureOne(_ fn: () async throws -> Int) async rethrows -> Double {
let start = DispatchTime.now().uptimeNanoseconds
_ = try await fn()
let end = DispatchTime.now().uptimeNanoseconds
Expand Down
8 changes: 7 additions & 1 deletion Sources/_NIOFileSystem/Internal/BufferedOrAnyStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ internal struct AnyAsyncSequence<Element>: AsyncSequence, Sendable {
}

internal mutating func next() async throws -> Element? {
try await self.iterator.next() as? Element
var box = UnsafeTransfer(self.iterator)
return try await box.wrappedValue.next() as? Element
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
internal mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> Element? {
try await self.iterator.next(isolation: actor) as? Element
}
}
}
10 changes: 6 additions & 4 deletions Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ final class AsyncChannelTests: XCTestCase {
}
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func testErrorsArePropagatedButAfterReads() async throws {
let channel = NIOAsyncTestingChannel()
let wrapped = try await channel.testingEventLoop.executeInContext {
Expand All @@ -196,7 +197,7 @@ final class AsyncChannelTests: XCTestCase {
let first = try await iterator.next()
XCTAssertEqual(first, "hello")

try await XCTAssertThrowsError(await iterator.next()) { error in
try await XCTAssertThrowsError(await iterator.next(isolation: #isolation)) { error in
XCTAssertEqual(error as? TestError, .bang)
}
}
Expand Down Expand Up @@ -267,6 +268,7 @@ final class AsyncChannelTests: XCTestCase {
try await channel.closeIgnoringSuppression()
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func testManagingBackPressure() async throws {
let channel = NIOAsyncTestingChannel()
let readCounter = ReadCounter()
Expand Down Expand Up @@ -330,13 +332,13 @@ final class AsyncChannelTests: XCTestCase {
// Now consume three elements from the pipeline. This should not unbuffer the read, as 3 elements remain.
var reader = inbound.makeAsyncIterator()
for _ in 0..<3 {
try await XCTAsyncAssertNotNil(await reader.next())
try await XCTAsyncAssertNotNil(await reader.next(isolation: #isolation))
}
await channel.testingEventLoop.run()
XCTAssertEqual(readCounter.readCount, 6)

// Removing the next element should trigger an automatic read.
try await XCTAsyncAssertNotNil(await reader.next())
try await XCTAsyncAssertNotNil(await reader.next(isolation: #isolation))
await channel.testingEventLoop.run()
XCTAssertEqual(readCounter.readCount, 7)

Expand Down Expand Up @@ -366,7 +368,7 @@ final class AsyncChannelTests: XCTestCase {

// This time we'll consume 4 more elements, and we won't find a read at all.
for _ in 0..<4 {
try await XCTAsyncAssertNotNil(await reader.next())
try await XCTAsyncAssertNotNil(await reader.next(isolation: #isolation))
}
await channel.testingEventLoop.run()
XCTAssertEqual(readCounter.readCount, 13)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ private func XCTAssertEqualWithoutAutoclosure<T>(
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncSequence {
/// Collect all elements in the sequence into an array.
@concurrent
fileprivate func collect() async rethrows -> [Element] {
try await self.reduce(into: []) { accumulated, next in
accumulated.append(next)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,12 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {

var elements = [Int]()

await XCTAssertThrowsError(
try await {
for try await element in self.sequence {
elements.append(element)
}
}()
) { error in
do {
for try await element in self.sequence {
elements.append(element)
}
XCTFail("Expected that an error is thrown in the loop above")
} catch {
XCTAssertEqual(error as? ChannelError, .alreadyClosed)
}

Expand Down Expand Up @@ -923,6 +922,7 @@ private func XCTAssertEqualWithoutAutoclosure<T>(
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension AsyncSequence {
/// Collect all elements in the sequence into an array.
@concurrent
fileprivate func collect() async rethrows -> [Element] {
try await self.reduce(into: []) { accumulated, next in
accumulated.append(next)
Expand Down
10 changes: 5 additions & 5 deletions Tests/NIOCoreTests/XCTest+AsyncAwait.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import XCTest

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func XCTAssertThrowsError<T>(
nonisolated(nonsending) internal func XCTAssertThrowsError<T>(
_ expression: @autoclosure () async throws -> T,
file: StaticString = #filePath,
line: UInt = #line,
Expand All @@ -57,7 +57,7 @@ internal func XCTAssertThrowsError<T>(
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func XCTAssertNoThrow<T>(
nonisolated(nonsending) internal func XCTAssertNoThrow<T>(
_ expression: @autoclosure () async throws -> T,
file: StaticString = #filePath,
line: UInt = #line
Expand All @@ -70,7 +70,7 @@ internal func XCTAssertNoThrow<T>(
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func XCTAssertNoThrowWithResult<Result>(
nonisolated(nonsending) internal func XCTAssertNoThrowWithResult<Result>(
_ expression: @autoclosure () async throws -> Result,
file: StaticString = #filePath,
line: UInt = #line
Expand All @@ -84,7 +84,7 @@ internal func XCTAssertNoThrowWithResult<Result>(
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func XCTAsyncAssertNotNil(
nonisolated(nonsending) internal func XCTAsyncAssertNotNil(
_ expression: @autoclosure () async throws -> Any?,
file: StaticString = #filePath,
line: UInt = #line
Expand All @@ -94,7 +94,7 @@ internal func XCTAsyncAssertNotNil(
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func XCTAsyncAssertNil(
nonisolated(nonsending) internal func XCTAsyncAssertNil(
_ expression: @autoclosure () async throws -> Any?,
file: StaticString = #filePath,
line: UInt = #line
Expand Down
2 changes: 2 additions & 0 deletions Tests/NIOEmbeddedTests/XCTest+AsyncAwait.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import XCTest

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
nonisolated(nonsending)
internal func XCTAssertThrowsError<T>(
_ expression: @autoclosure () async throws -> T,
file: StaticString = #filePath,
Expand All @@ -57,6 +58,7 @@ internal func XCTAssertThrowsError<T>(
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
nonisolated(nonsending)
internal func XCTAssertNoThrowWithResult<Result>(
_ expression: @autoclosure () async throws -> Result,
file: StaticString = #filePath,
Expand Down
5 changes: 4 additions & 1 deletion Tests/NIOFSIntegrationTests/FileHandleTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,12 @@ final class FileHandleTests: XCTestCase {
try await self.testCloseOrDetachMidRead(close: false)
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func testCloseBeforeReadingFromFile() async throws {
try await self.testCloseOrDetachBeforeRead(close: true)
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
func testDetachBeforeReadingFromFile() async throws {
try await self.testCloseOrDetachBeforeRead(close: false)
}
Expand Down Expand Up @@ -748,6 +750,7 @@ final class FileHandleTests: XCTestCase {
}
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, *)
private func testCloseOrDetachBeforeRead(close: Bool) async throws {
try await self.withHandle(forFileAtPath: Self.thisFile, autoClose: false) { handle in
if close {
Expand All @@ -759,7 +762,7 @@ final class FileHandleTests: XCTestCase {

var iterator = handle.readChunks().makeAsyncIterator()
await XCTAssertThrowsFileSystemErrorAsync {
try await iterator.next()
try await iterator.next(isolation: #isolation)
} onError: { error in
XCTAssertEqual(error.code, .closed)
}
Expand Down
Loading
Loading