From a518df743b76ca99c6fd5650c84c217d561f059b Mon Sep 17 00:00:00 2001 From: Konstantin Kostov Date: Fri, 12 Dec 2025 10:59:54 +0100 Subject: [PATCH 1/3] feat: improve signal cache concurrency --- .../TelemetryDeck/Signals/SignalCache.swift | 11 +- .../TelemetryDeck/Signals/SignalManager.swift | 25 +-- .../SignalCacheConcurrencyTests.swift | 179 ++++++++++++++++++ 3 files changed, 197 insertions(+), 18 deletions(-) create mode 100644 Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift diff --git a/Sources/TelemetryDeck/Signals/SignalCache.swift b/Sources/TelemetryDeck/Signals/SignalCache.swift index cc46d4c..fce4f4b 100644 --- a/Sources/TelemetryDeck/Signals/SignalCache.swift +++ b/Sources/TelemetryDeck/Signals/SignalCache.swift @@ -17,7 +17,7 @@ internal class SignalCache: @unchecked Sendable where T: Codable { /// How many Signals are cached func count() -> Int { - queue.sync(flags: .barrier) { + queue.sync { self.cachedSignals.count } } @@ -41,15 +41,12 @@ internal class SignalCache: @unchecked Sendable where T: Codable { /// You should hold on to the signals returned by this function. If the action you are trying to do with them fails /// (e.g. sending them to a server) you should reinsert them into the cache with the `push` function. func pop() -> [T] { - var poppedSignals: [T]! - - queue.sync { + queue.sync(flags: .barrier) { let sliceSize = min(maximumNumberOfSignalsToPopAtOnce, cachedSignals.count) - poppedSignals = Array(cachedSignals[.. URL { diff --git a/Sources/TelemetryDeck/Signals/SignalManager.swift b/Sources/TelemetryDeck/Signals/SignalManager.swift index 4cefd7d..5736212 100644 --- a/Sources/TelemetryDeck/Signals/SignalManager.swift +++ b/Sources/TelemetryDeck/Signals/SignalManager.swift @@ -29,7 +29,8 @@ final class SignalManager: SignalManageable, @unchecked Sendable { private var signalCache: SignalCache let configuration: TelemetryManagerConfiguration - private var sendTimer: Timer? + private var sendTimerSource: DispatchSourceTimer? 
+ private let timerQueue = DispatchQueue(label: "com.telemetrydeck.SignalTimer", qos: .utility) init(configuration: TelemetryManagerConfiguration) { self.configuration = configuration @@ -96,14 +97,17 @@ final class SignalManager: SignalManageable, @unchecked Sendable { private func sendCachedSignalsRepeatedly() { attemptToSendNextBatchOfCachedSignals() - sendTimer?.invalidate() - sendTimer = Timer.scheduledTimer( - timeInterval: Self.minimumSecondsToPassBetweenRequests, - target: self, - selector: #selector(attemptToSendNextBatchOfCachedSignals), - userInfo: nil, - repeats: true + sendTimerSource?.cancel() + let source = DispatchSource.makeTimerSource(queue: timerQueue) + source.schedule( + deadline: .now() + Self.minimumSecondsToPassBetweenRequests, + repeating: Self.minimumSecondsToPassBetweenRequests ) + source.setEventHandler { [weak self] in + self?.attemptToSendNextBatchOfCachedSignals() + } + source.resume() + sendTimerSource = source } /// Adds a signal to the process queue @@ -149,7 +153,6 @@ final class SignalManager: SignalManageable, @unchecked Sendable { /// Sends one batch of signals from the cache if not empty. /// If signals fail to send, we put them back into the cache to try again later. 
- @objc @Sendable func attemptToSendNextBatchOfCachedSignals() { configuration.logHandler?.log(.debug, message: "Current signal cache count: \(signalCache.count())") @@ -232,8 +235,8 @@ extension SignalManager { @objc func didEnterBackground() { configuration.logHandler?.log(.debug, message: #function) - sendTimer?.invalidate() - sendTimer = nil + sendTimerSource?.cancel() + sendTimerSource = nil #if os(watchOS) || os(macOS) self.signalCache.backupCache() diff --git a/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift b/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift new file mode 100644 index 0000000..7f94923 --- /dev/null +++ b/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift @@ -0,0 +1,179 @@ +import Foundation +import Testing + +@testable import TelemetryDeck + +struct SignalCacheConcurrencyTests { + + /// Repro for https://github.com/TelemetryDeck/SwiftSDK/issues/265: + /// + /// count() with barrier blocks because it waits for ALL pending GCD operations. + /// + /// The bug: When count() uses `.barrier`, it must wait for all prior async blocks + /// to complete before executing. If those blocks do work before calling push(), + /// count() is blocked for their entire duration. + /// + /// This test queues async blocks with artificial delays to create pending work, + /// then immediately calls count() to measure blocking. 
+ @Test + func count_barrierCausesMainThreadBlock() { + if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) { + let cache = SignalCache(logHandler: nil) + let stressQueue = DispatchQueue(label: "com.telemetrydeck.stressdaqueue", attributes: .concurrent) + + // Queue 50 operations that each take 2ms BEFORE reaching push() + // With barrier bug: count() waits for ALL of these (~100ms total) + // With fix: count() returns immediately (~0ms) + for i in 0..<50 { + stressQueue.async { + Thread.sleep(forTimeInterval: 0.002) + cache.push(Self.makeSignal(id: "\(i)")) + } + } + + // Immediately call count() - this is what the timer callback does + let start = CFAbsoluteTimeGetCurrent() + _ = cache.count() + let elapsed = CFAbsoluteTimeGetCurrent() - start + + // With barrier bug: ~100ms (50 * 2ms serialized wait) + // With fix (no barrier): < 10ms (just reads array.count) + #expect(elapsed < 0.010, "count() blocked for \(elapsed)s - barrier flag causing contention") + } else { + print("skipping test on incompatible OS") + } + } + + /// Validates thread safety of concurrent push and pop operations. + /// After fix, pop() uses barrier flag to ensure exclusive access during mutation. 
+ @Test + func concurrentPushAndPop_maintainsDataIntegrity() async { + if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) { + let cache = SignalCache(logHandler: nil) + let pushCount = 500 + + await withTaskGroup(of: Void.self) { group in + // Concurrent pushes + for i in 0..(logHandler: nil) + + // Pre-populate cache + for i in 0..<100 { + cache.push(Self.makeSignal(id: "\(i)")) + } + + let startTime = ContinuousClock.now + + await withTaskGroup(of: Void.self) { group in + // Many concurrent count() calls - should NOT serialize + for _ in 0..<1000 { + group.addTask { + _ = cache.count() + } + } + await group.waitForAll() + } + + let elapsed = ContinuousClock.now - startTime + + // 1000 concurrent reads should complete quickly (< 1 second) + // With barrier bug, this would take much longer due to serialization + #expect(elapsed < .seconds(1), "Concurrent count() calls should complete quickly") + } else { + print("skipping test on incompatible OS") + } + } + + /// Validates pop() correctly handles concurrent access without data races. + /// Without barrier on pop(), concurrent calls can corrupt the array. + /// Run multiple iterations to increase probability of catching race condition. + @Test + func pop_isThreadSafe() async { + if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) { + for iteration in 0..<10 { + let cache = SignalCache(logHandler: nil) + let signalCount = 200 + + for i in 0.. 
SignalPostBody { + SignalPostBody( + receivedAt: Date(), + appID: UUID().uuidString, + clientUser: id, + sessionID: id, + type: "test", + floatValue: nil, + payload: [:], + isTestMode: "true" + ) + } +} From 0ec49446e0be2758e287c432367357703e41be6a Mon Sep 17 00:00:00 2001 From: Konstantin Kostov Date: Fri, 12 Dec 2025 12:50:16 +0100 Subject: [PATCH 2/3] fix: give the flaky tests a bit more time --- Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift b/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift index 7f94923..12804e8 100644 --- a/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift +++ b/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift @@ -88,6 +88,7 @@ struct SignalCacheConcurrencyTests { /// Validates that high contention on count() completes in reasonable time. /// Pre-fix: barrier on count() causes blocking. Post-fix: reads are concurrent. 
+ /// This is probably a "flaky" test since we rely on timing @Test func count_performsUnderHighContention() async { if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) { @@ -114,7 +115,7 @@ struct SignalCacheConcurrencyTests { // 1000 concurrent reads should complete quickly (< 1 second) // With barrier bug, this would take much longer due to serialization - #expect(elapsed < .seconds(1), "Concurrent count() calls should complete quickly") + #expect(elapsed < .seconds(5), "Concurrent count() calls should complete quickly") } else { print("skipping test on incompatible OS") } From a12f4b9a3d18677817653284122050d5832550de Mon Sep 17 00:00:00 2001 From: Konstantin Kostov Date: Fri, 12 Dec 2025 13:44:02 +0100 Subject: [PATCH 3/3] fix: remove flaky test --- .../SignalCacheConcurrencyTests.swift | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift b/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift index 12804e8..4b09e0e 100644 --- a/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift +++ b/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift @@ -86,40 +86,6 @@ struct SignalCacheConcurrencyTests { } } - /// Validates that high contention on count() completes in reasonable time. - /// Pre-fix: barrier on count() causes blocking. Post-fix: reads are concurrent. 
- /// This is probably a "flaky" test since we rely on timing - @Test - func count_performsUnderHighContention() async { - if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) { - let cache = SignalCache(logHandler: nil) - - // Pre-populate cache - for i in 0..<100 { - cache.push(Self.makeSignal(id: "\(i)")) - } - - let startTime = ContinuousClock.now - - await withTaskGroup(of: Void.self) { group in - // Many concurrent count() calls - should NOT serialize - for _ in 0..<1000 { - group.addTask { - _ = cache.count() - } - } - await group.waitForAll() - } - - let elapsed = ContinuousClock.now - startTime - - // 1000 concurrent reads should complete quickly (< 1 second) - // With barrier bug, this would take much longer due to serialization - #expect(elapsed < .seconds(5), "Concurrent count() calls should complete quickly") - } else { - print("skipping test on incompatible OS") - } - } /// Validates pop() correctly handles concurrent access without data races. /// Without barrier on pop(), concurrent calls can corrupt the array.