Skip to content

Commit bca8299

Browse files
committed
fix: fix data race with task cancellation
1 parent e3dcfeb commit bca8299

23 files changed

+1565
-493
lines changed

.github/workflows/main.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ on:
55
branches: [ main ]
66
pull_request:
77
branches: [ main ]
8-
schedule:
9-
- cron: '0 0 * * *'
8+
# schedule:
9+
# - cron: '0 0 * * *'
1010
workflow_dispatch:
1111
inputs:
1212
release:

AsyncObjects.podspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,6 @@ Pod::Spec.new do |s|
4141

4242
s.test_spec do |ts|
4343
ts.source_files = "Tests/#{s.name}Tests/**/*.swift"
44+
ts.scheme = { :parallelizable => true }
4445
end
4546
end

AsyncObjects.xcodeproj/project.pbxproj

Lines changed: 401 additions & 393 deletions
Large diffs are not rendered by default.

Sources/AsyncObjects/AsyncCountdownEvent.swift

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ import OrderedCollections
2222
public actor AsyncCountdownEvent: AsyncObject {
2323
/// The suspended tasks continuation type.
2424
@usableFromInline
25-
typealias Continuation = GlobalContinuation<Void, Error>
25+
typealias Continuation = SafeContinuation<GlobalContinuation<Void, Error>>
26+
/// The platform dependent lock used to synchronize continuations tracking.
27+
@usableFromInline
28+
let locker: Locker = .init()
2629
/// The continuations stored with an associated key for all the suspended task that are waiting to be resumed.
2730
@usableFromInline
2831
private(set) var continuations: OrderedDictionary<UUID, Continuation> = [:]
@@ -47,6 +50,12 @@ public actor AsyncCountdownEvent: AsyncObject {
4750

4851
// MARK: Internal
4952

53+
/// Checks whether to wait for countdown to signal.
54+
///
55+
/// - Returns: Whether to wait to be resumed later.
56+
@inlinable
57+
func _wait() -> Bool { !isSet || !continuations.isEmpty }
58+
5059
/// Resume provided continuation with additional changes based on the associated flags.
5160
///
5261
/// - Parameter continuation: The queued continuation to resume.
@@ -66,10 +75,8 @@ public actor AsyncCountdownEvent: AsyncObject {
6675
_ continuation: Continuation,
6776
withKey key: UUID
6877
) {
69-
guard !isSet, continuations.isEmpty else {
70-
_resumeContinuation(continuation)
71-
return
72-
}
78+
guard !continuation.resumed else { return }
79+
guard _wait() else { _resumeContinuation(continuation); return }
7380
continuations[key] = continuation
7481
}
7582

@@ -79,8 +86,7 @@ public actor AsyncCountdownEvent: AsyncObject {
7986
/// - Parameter key: The key in the map.
8087
@inlinable
8188
func _removeContinuation(withKey key: UUID) {
82-
let continuation = continuations.removeValue(forKey: key)
83-
continuation?.cancel()
89+
continuations.removeValue(forKey: key)
8490
}
8591

8692
/// Decrements countdown count by the provided number.
@@ -113,15 +119,13 @@ public actor AsyncCountdownEvent: AsyncObject {
113119
@inlinable
114120
nonisolated func _withPromisedContinuation() async throws {
115121
let key = UUID()
116-
try await withTaskCancellationHandler { [weak self] in
122+
try await Continuation.withCancellation(synchronizedWith: locker) {
117123
Task { [weak self] in
118124
await self?._removeContinuation(withKey: key)
119125
}
120-
} operation: { () -> Continuation.Success in
121-
try await Continuation.with { continuation in
122-
Task { [weak self] in
123-
await self?._addContinuation(continuation, withKey: key)
124-
}
126+
} operation: { continuation in
127+
Task { [weak self] in
128+
await self?._addContinuation(continuation, withKey: key)
125129
}
126130
}
127131
}
@@ -205,7 +209,7 @@ public actor AsyncCountdownEvent: AsyncObject {
205209
/// Use this to wait for high priority tasks completion to start low priority ones.
206210
@Sendable
207211
public func wait() async {
208-
guard !isSet else { currentCount += 1; return }
212+
guard _wait() else { currentCount += 1; return }
209213
try? await _withPromisedContinuation()
210214
}
211215
}

Sources/AsyncObjects/AsyncEvent.swift

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ import Foundation
1313
public actor AsyncEvent: AsyncObject {
1414
/// The suspended tasks continuation type.
1515
@usableFromInline
16-
typealias Continuation = GlobalContinuation<Void, Error>
16+
typealias Continuation = SafeContinuation<GlobalContinuation<Void, Error>>
17+
/// The platform dependent lock used to synchronize continuations tracking.
18+
@usableFromInline
19+
let locker: Locker = .init()
1720
/// The continuations stored with an associated key for all the suspended task that are waiting for event signal.
1821
@usableFromInline
1922
private(set) var continuations: [UUID: Continuation] = [:]
@@ -33,6 +36,7 @@ public actor AsyncEvent: AsyncObject {
3336
_ continuation: Continuation,
3437
withKey key: UUID
3538
) {
39+
guard !continuation.resumed else { return }
3640
guard !signalled else { continuation.resume(); return }
3741
continuations[key] = continuation
3842
}
@@ -43,8 +47,7 @@ public actor AsyncEvent: AsyncObject {
4347
/// - Parameter key: The key in the map.
4448
@inlinable
4549
func _removeContinuation(withKey key: UUID) {
46-
let continuation = continuations.removeValue(forKey: key)
47-
continuation?.cancel()
50+
continuations.removeValue(forKey: key)
4851
}
4952

5053
/// Suspends the current task, then calls the given closure with a throwing continuation for the current task.
@@ -58,15 +61,13 @@ public actor AsyncEvent: AsyncObject {
5861
@inlinable
5962
nonisolated func _withPromisedContinuation() async throws {
6063
let key = UUID()
61-
try await withTaskCancellationHandler { [weak self] in
64+
try await Continuation.withCancellation(synchronizedWith: locker) {
6265
Task { [weak self] in
6366
await self?._removeContinuation(withKey: key)
6467
}
65-
} operation: { () -> Continuation.Success in
66-
try await Continuation.with { continuation in
67-
Task { [weak self] in
68-
await self?._addContinuation(continuation, withKey: key)
69-
}
68+
} operation: { continuation in
69+
Task { [weak self] in
70+
await self?._addContinuation(continuation, withKey: key)
7071
}
7172
}
7273
}

Sources/AsyncObjects/AsyncSemaphore.swift

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ import OrderedCollections
1616
public actor AsyncSemaphore: AsyncObject {
1717
/// The suspended tasks continuation type.
1818
@usableFromInline
19-
typealias Continuation = GlobalContinuation<Void, Error>
19+
typealias Continuation = SafeContinuation<GlobalContinuation<Void, Error>>
20+
/// The platform dependent lock used to synchronize continuations tracking.
21+
@usableFromInline
22+
let locker: Locker = .init()
2023
/// The continuations stored with an associated key for all the suspended task that are waiting for access to resource.
2124
@usableFromInline
2225
private(set) var continuations: OrderedDictionary<UUID, Continuation> = [:]
2326
/// Pool size for concurrent resource access.
2427
/// Has value provided during initialization incremented by one.
2528
@usableFromInline
26-
private(set) var limit: UInt
29+
let limit: UInt
2730
/// Current count of semaphore.
2831
/// Can have maximum value up to `limit`.
2932
@usableFromInline
@@ -41,6 +44,8 @@ public actor AsyncSemaphore: AsyncObject {
4144
_ continuation: Continuation,
4245
withKey key: UUID
4346
) {
47+
count -= 1
48+
guard !continuation.resumed else { return }
4449
guard count <= 0 else { continuation.resume(); return }
4550
continuations[key] = continuation
4651
}
@@ -51,8 +56,7 @@ public actor AsyncSemaphore: AsyncObject {
5156
/// - Parameter key: The key in the map.
5257
@inlinable
5358
func _removeContinuation(withKey key: UUID) {
54-
let continuation = continuations.removeValue(forKey: key)
55-
continuation?.cancel()
59+
continuations.removeValue(forKey: key)
5660
_incrementCount()
5761
}
5862

@@ -74,15 +78,13 @@ public actor AsyncSemaphore: AsyncObject {
7478
@inlinable
7579
nonisolated func _withPromisedContinuation() async throws {
7680
let key = UUID()
77-
try await withTaskCancellationHandler { [weak self] in
81+
try await Continuation.withCancellation(synchronizedWith: locker) {
7882
Task { [weak self] in
7983
await self?._removeContinuation(withKey: key)
8084
}
81-
} operation: { () -> Continuation.Success in
82-
try await Continuation.with { continuation in
83-
Task { [weak self] in
84-
await self?._addContinuation(continuation, withKey: key)
85-
}
85+
} operation: { continuation in
86+
Task { [weak self] in
87+
await self?._addContinuation(continuation, withKey: key)
8688
}
8789
}
8890
}
@@ -123,8 +125,7 @@ public actor AsyncSemaphore: AsyncObject {
123125
/// current task is suspended until a signal occurs.
124126
@Sendable
125127
public func wait() async {
126-
count -= 1
127-
guard count <= 0 else { return }
128+
guard count <= 1 else { count -= 1; return }
128129
try? await _withPromisedContinuation()
129130
}
130131
}

0 commit comments

Comments
 (0)