Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
**v0.5.5 - Sodium:**

This version add support for a throwing `onStart` closure in the `AsyncHandleEventsSequence`.

**v0.5.4 - Neon:**

This release conforms AnyAsyncSequence to Sendable.

**v0.5.3 - Fluorine:**

The release fixes a conflict on the Failure primary associated type newly defined in Swift.

- Fix Xcode 16 build failure caused by AsyncSubject (https://github.com/sideeffect-io/AsyncExtensions/pull/42)

**v0.5.2 - Oxygen:**

This version is a bug fix version.
Expand Down
32 changes: 16 additions & 16 deletions Sources/Operators/AsyncHandleEventsSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public extension AsyncSequence {
/// whether it is due to an error or a normal termination.
/// - Returns: The AsyncSequence that executes the `receiveElement` operation for each element of the source sequence.
func handleEvents(
onStart: (@Sendable () async -> Void)? = nil,
onStart: (@Sendable () async throws -> Void)? = nil,
onElement: (@Sendable (Element) async -> Void)? = nil,
onCancel: (@Sendable () async -> Void)? = nil,
onFinish: (@Sendable (Termination<Error>) async -> Void)? = nil
Expand All @@ -60,14 +60,14 @@ public struct AsyncHandleEventsSequence<Base: AsyncSequence>: AsyncSequence {
public typealias AsyncIterator = Iterator

var base: Base
let onStart: (@Sendable () async -> Void)?
let onStart: (@Sendable () async throws -> Void)?
let onElement: (@Sendable (Base.Element) async -> Void)?
let onCancel: (@Sendable () async -> Void)?
let onFinish: (@Sendable (Termination<Error>) async -> Void)?

public init(
_ base: Base,
onStart: (@Sendable () async -> Void)?,
onStart: (@Sendable () async throws -> Void)?,
onElement: (@Sendable (Base.Element) async -> Void)?,
onCancel: (@Sendable () async -> Void)?,
onFinish: (@Sendable (Termination<Error>) async -> Void)?
Expand All @@ -92,7 +92,7 @@ public struct AsyncHandleEventsSequence<Base: AsyncSequence>: AsyncSequence {
public struct Iterator: AsyncIteratorProtocol {
var base: Base.AsyncIterator

let onStart: (@Sendable () async -> Void)?
let onStart: (@Sendable () async throws -> Void)?
let onElement: (@Sendable (Base.Element) async -> Void)?
let onCancel: (@Sendable () async -> Void)?
let onFinish: (@Sendable (Termination<Error>) async -> Void)?
Expand All @@ -101,7 +101,7 @@ public struct AsyncHandleEventsSequence<Base: AsyncSequence>: AsyncSequence {

public init(
base: Base.AsyncIterator,
onStart: (@Sendable () async -> Void)?,
onStart: (@Sendable () async throws -> Void)?,
onElement: (@Sendable (Base.Element) async -> Void)?,
onCancel: (@Sendable () async -> Void)?,
onFinish: (@Sendable (Termination<Error>) async -> Void)?
Expand All @@ -113,25 +113,25 @@ public struct AsyncHandleEventsSequence<Base: AsyncSequence>: AsyncSequence {
self.onFinish = onFinish
}

public mutating func next() async rethrows -> Element? {
public mutating func next() async throws -> Element? {
guard !Task.isCancelled else {
await self.onCancel?()
return nil
}

let shouldCallOnStart = self.onStartExecuted.withCriticalRegion { onStartExecuted -> Bool in
if !onStartExecuted {
onStartExecuted = true
return true
do {
let shouldCallOnStart = self.onStartExecuted.withCriticalRegion { onStartExecuted -> Bool in
if !onStartExecuted {
onStartExecuted = true
return true
}
return false
}
return false
}

if shouldCallOnStart {
await self.onStart?()
}
if shouldCallOnStart {
try await self.onStart?()
}

do {
let nextElement = try await self.base.next()

if let element = nextElement {
Expand Down
29 changes: 29 additions & 0 deletions Tests/Operators/AsyncHandleEventsSequenceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,35 @@ final class AsyncHandleEventsSequenceTests: XCTestCase {
XCTAssertEqual(received.criticalState, ["start", "1", "2", "3", "4", "5", "finish finished"])
}

func test_onStart_calls_onFinish_when_throwing() async throws {
let received = ManagedCriticalState([String]())

let sourceSequence = [1, 2, 3, 4, 5].async
let sut = sourceSequence.handleEvents {
received.withCriticalRegion { $0.append("start") }
throw NSError(domain: "error", code: 0)
} onElement: { element in
received.withCriticalRegion { $0.append("\(element)") }
} onCancel: {
received.withCriticalRegion { $0.append("cancelled") }
} onFinish: { completion in
switch completion {
case .finished:
received.withCriticalRegion { $0.append("finish finished") }
case let .failure(error):
received.withCriticalRegion { $0.append("finish error \((error as NSError).code)") }
}
}

do {
for try await _ in sut {}
XCTFail("The stream should have thrown")
} catch {
XCTAssertEqual((error as NSError).code, 0)
XCTAssertEqual(received.criticalState, ["start", "finish error 0"])
}
}

func test_iteration_calls_onCancel_when_task_is_cancelled() {
let firstElementHasBeenReceivedExpectation = expectation(description: "First element has been emitted")
let taskHasBeenCancelledExpectation = expectation(description: "The task has been cancelled")
Expand Down