Skip to content

Commit ff8d2fa

Browse files
committed
[Concurrency] Support continuous waits in CooperativeExecutor.
This fixes some WASM tests.
1 parent 53ceb0d commit ff8d2fa

File tree

1 file changed

+94
-37
lines changed

1 file changed

+94
-37
lines changed

stdlib/public/Concurrency/CooperativeExecutor.swift

Lines changed: 94 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -97,18 +97,77 @@ extension ExecutorJob {
9797
}
9898
}
9999

100+
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
101+
/// A wait queue is a specialised priority queue used to run a timer.
102+
@available(StdlibDeploymentTarget 6.2, *)
103+
struct WaitQueue {
104+
var queue: PriorityQueue<UnownedJob>
105+
var clock: _ClockID
106+
107+
init(clock: _ClockID) {
108+
queue = PriorityQueue(compare: {
109+
ExecutorJob($0).cooperativeExecutorTimestamp
110+
< ExecutorJob($1).cooperativeExecutorTimestamp
111+
})
112+
self.clock = clock
113+
}
114+
115+
var currentTime: CooperativeExecutor.Timestamp {
116+
var now: CooperativeExecutor.Timestamp = .zero
117+
unsafe _getTime(seconds: &now.seconds,
118+
nanoseconds: &now.nanoseconds,
119+
clock: clock.rawValue)
120+
return now
121+
}
122+
123+
mutating func enqueue(_ job: consuming ExecutorJob,
124+
after delay: CooperativeExecutor.Duration) {
125+
let deadline = currentTime + delay
126+
job.setupCooperativeExecutorTimestamp()
127+
job.cooperativeExecutorTimestamp = deadline
128+
queue.push(UnownedJob(job))
129+
}
130+
131+
mutating func forEachReadyJob(body: (consuming ExecutorJob) -> ()) {
132+
let now = currentTime
133+
while let job = queue.pop(
134+
when: {
135+
ExecutorJob($0).cooperativeExecutorTimestamp < now
136+
}) {
137+
var theJob = ExecutorJob(job)
138+
theJob.clearCooperativeExecutorTimestamp()
139+
body(theJob)
140+
}
141+
}
142+
143+
var timeToNextJob: CooperativeExecutor.Duration? {
144+
if let job = queue.top {
145+
let deadline = ExecutorJob(job).cooperativeExecutorTimestamp
146+
let now = currentTime
147+
if deadline > now {
148+
return deadline - now
149+
} else {
150+
return CooperativeExecutor.Duration(seconds: 0, nanoseconds: 0)
151+
}
152+
}
153+
return nil
154+
}
155+
}
156+
#endif
157+
100158
/// A co-operative executor that can be used as the main executor or as a
101159
/// task executor.
102160
@available(StdlibDeploymentTarget 6.2, *)
103161
final class CooperativeExecutor: Executor, @unchecked Sendable {
104162
var runQueue: PriorityQueue<UnownedJob>
105163
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
106-
var waitQueue: PriorityQueue<UnownedJob>
164+
var suspendingWaitQueue = WaitQueue(clock: .suspending)
165+
var continuousWaitQueue = WaitQueue(clock: .continuous)
107166
#endif
108167
var shouldStop: Bool = false
109168

110169
/// Internal representation of a duration for CooperativeExecutor
111-
struct Duration {
170+
struct Duration: Comparable {
112171
var seconds: Int64
113172
var nanoseconds: Int64
114173

@@ -122,6 +181,16 @@ final class CooperativeExecutor: Executor, @unchecked Sendable {
122181
self.seconds = seconds
123182
self.nanoseconds = attoseconds / 1_000_000_000
124183
}
184+
185+
static func == (lhs: Duration, rhs: Duration) -> Bool {
186+
return lhs.seconds == rhs.seconds && lhs.nanoseconds == rhs.nanoseconds
187+
}
188+
static func < (lhs: Duration, rhs: Duration) -> Bool {
189+
return lhs.seconds < rhs.seconds || (
190+
lhs.seconds == rhs.seconds
191+
&& lhs.nanoseconds < rhs.nanoseconds
192+
)
193+
}
125194
}
126195

127196
/// Internal representation of a timestamp for CooperativeExecutor
@@ -165,14 +234,6 @@ final class CooperativeExecutor: Executor, @unchecked Sendable {
165234

166235
public init() {
167236
runQueue = PriorityQueue(compare: { $0.priority > $1.priority })
168-
169-
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
170-
waitQueue =
171-
PriorityQueue(compare: {
172-
ExecutorJob($0).cooperativeExecutorTimestamp
173-
< ExecutorJob($1).cooperativeExecutorTimestamp
174-
})
175-
#endif
176237
}
177238

178239
public func enqueue(_ job: consuming ExecutorJob) {
@@ -190,11 +251,11 @@ extension CooperativeExecutor: SchedulingExecutor {
190251
return self
191252
}
192253

193-
var currentTime: Timestamp {
254+
func currentTime(clock: _ClockID) -> Timestamp {
194255
var now: Timestamp = .zero
195256
unsafe _getTime(seconds: &now.seconds,
196257
nanoseconds: &now.nanoseconds,
197-
clock: _ClockID.suspending.rawValue)
258+
clock: clock.rawValue)
198259
return now
199260
}
200261

@@ -203,24 +264,19 @@ extension CooperativeExecutor: SchedulingExecutor {
203264
tolerance: C.Duration? = nil,
204265
clock: C) {
205266
// If it's a clock we know, get the duration to wait
206-
let duration: Duration
207267
if let _ = clock as? ContinuousClock {
208-
// We would need to add a second wait queue to support this
209-
fatalError("CooperativeExecutor currently only supports suspending waits")
268+
let continuousDuration = delay as! ContinuousClock.Duration
269+
let duration = Duration(from: continuousDuration)
270+
continuousWaitQueue.enqueue(job, after: duration)
210271
} else if let _ = clock as? SuspendingClock {
211272
let suspendingDuration = delay as! SuspendingClock.Duration
212-
duration = Duration(from: suspendingDuration)
273+
let duration = Duration(from: suspendingDuration)
274+
suspendingWaitQueue.enqueue(job, after: duration)
213275
} else {
214276
clock.enqueue(job, on: self, at: clock.now.advanced(by: delay),
215277
tolerance: tolerance)
216278
return
217279
}
218-
219-
let deadline = self.currentTime + duration
220-
221-
job.setupCooperativeExecutorTimestamp()
222-
job.cooperativeExecutorTimestamp = deadline
223-
waitQueue.push(UnownedJob(job))
224280
}
225281

226282
}
@@ -236,14 +292,12 @@ extension CooperativeExecutor: RunLoopExecutor {
236292
shouldStop = false
237293
while !shouldStop && !condition() {
238294
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
239-
// Process the timer queue
240-
let now = currentTime
241-
while let job = waitQueue.pop(when: {
242-
ExecutorJob($0).cooperativeExecutorTimestamp <= now
243-
}) {
244-
var theJob = ExecutorJob(job)
245-
theJob.clearCooperativeExecutorTimestamp()
246-
runQueue.push(job)
295+
// Process the timer queues
296+
suspendingWaitQueue.forEachReadyJob {
297+
runQueue.push(UnownedJob($0))
298+
}
299+
continuousWaitQueue.forEachReadyJob {
300+
runQueue.push(UnownedJob($0))
247301
}
248302
#endif
249303

@@ -257,14 +311,17 @@ extension CooperativeExecutor: RunLoopExecutor {
257311

258312
#if !$Embedded && !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY
259313
// Finally, wait until the next deadline
260-
if let job = waitQueue.top {
261-
let deadline = ExecutorJob(job).cooperativeExecutorTimestamp
262-
let now = self.currentTime
263-
if deadline > now {
264-
let toWait = deadline - now
265-
_sleep(seconds: toWait.seconds,
266-
nanoseconds: toWait.nanoseconds)
314+
var toWait: Duration? = suspendingWaitQueue.timeToNextJob
315+
316+
if let continuousToWait = continuousWaitQueue.timeToNextJob {
317+
if toWait == nil || continuousToWait < toWait! {
318+
toWait = continuousToWait
267319
}
320+
}
321+
322+
if let toWait {
323+
_sleep(seconds: toWait.seconds,
324+
nanoseconds: toWait.nanoseconds)
268325
} else if runQueue.isEmpty {
269326
// Stop if no more jobs are available
270327
break

0 commit comments

Comments
 (0)