diff --git a/Sources/TelemetryDeck/Signals/SignalCache.swift b/Sources/TelemetryDeck/Signals/SignalCache.swift index cc46d4c..fce4f4b 100644 --- a/Sources/TelemetryDeck/Signals/SignalCache.swift +++ b/Sources/TelemetryDeck/Signals/SignalCache.swift @@ -17,7 +17,7 @@ internal class SignalCache: @unchecked Sendable where T: Codable { /// How many Signals are cached func count() -> Int { - queue.sync(flags: .barrier) { + queue.sync { self.cachedSignals.count } } @@ -41,15 +41,12 @@ internal class SignalCache: @unchecked Sendable where T: Codable { /// You should hold on to the signals returned by this function. If the action you are trying to do with them fails /// (e.g. sending them to a server) you should reinsert them into the cache with the `push` function. func pop() -> [T] { - var poppedSignals: [T]! - - queue.sync { + queue.sync(flags: .barrier) { let sliceSize = min(maximumNumberOfSignalsToPopAtOnce, cachedSignals.count) - poppedSignals = Array(cachedSignals[.. URL { diff --git a/Sources/TelemetryDeck/Signals/SignalManager.swift b/Sources/TelemetryDeck/Signals/SignalManager.swift index 4cefd7d..5736212 100644 --- a/Sources/TelemetryDeck/Signals/SignalManager.swift +++ b/Sources/TelemetryDeck/Signals/SignalManager.swift @@ -29,7 +29,8 @@ final class SignalManager: SignalManageable, @unchecked Sendable { private var signalCache: SignalCache let configuration: TelemetryManagerConfiguration - private var sendTimer: Timer? + private var sendTimerSource: DispatchSourceTimer? + private let timerQueue = DispatchQueue(label: "com.telemetrydeck.SignalTimer", qos: .utility) init(configuration: TelemetryManagerConfiguration) { self.configuration = configuration @@ -96,14 +97,17 @@ final class SignalManager: SignalManageable, @unchecked Sendable { private func sendCachedSignalsRepeatedly() { attemptToSendNextBatchOfCachedSignals() - sendTimer?.invalidate() - sendTimer = Timer.scheduledTimer( - timeInterval: Self.minimumSecondsToPassBetweenRequests, - target: self, - selector: #selector(attemptToSendNextBatchOfCachedSignals), - userInfo: nil, - repeats: true + sendTimerSource?.cancel() + let source = DispatchSource.makeTimerSource(queue: timerQueue) + source.schedule( + deadline: .now() + Self.minimumSecondsToPassBetweenRequests, + repeating: Self.minimumSecondsToPassBetweenRequests ) + source.setEventHandler { [weak self] in + self?.attemptToSendNextBatchOfCachedSignals() + } + source.resume() + sendTimerSource = source } /// Adds a signal to the process queue @@ -149,7 +153,6 @@ final class SignalManager: SignalManageable, @unchecked Sendable { /// Sends one batch of signals from the cache if not empty. /// If signals fail to send, we put them back into the cache to try again later. - @objc @Sendable func attemptToSendNextBatchOfCachedSignals() { configuration.logHandler?.log(.debug, message: "Current signal cache count: \(signalCache.count())") @@ -232,8 +235,8 @@ extension SignalManager { @objc func didEnterBackground() { configuration.logHandler?.log(.debug, message: #function) - sendTimer?.invalidate() - sendTimer = nil + sendTimerSource?.cancel() + sendTimerSource = nil #if os(watchOS) || os(macOS) self.signalCache.backupCache() diff --git a/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift b/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift new file mode 100644 index 0000000..4b09e0e --- /dev/null +++ b/Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift @@ -0,0 +1,146 @@ +import Foundation +import Testing + +@testable import TelemetryDeck + +struct SignalCacheConcurrencyTests { + + /// Repro for https://github.com/TelemetryDeck/SwiftSDK/issues/265: + /// + /// count() with barrier blocks because it waits for ALL pending GCD operations. + /// + /// The bug: When count() uses `.barrier`, it must wait for all prior async blocks + /// to complete before executing. If those blocks do work before calling push(), + /// count() is blocked for their entire duration. + /// + /// This test queues async blocks with artificial delays to create pending work, + /// then immediately calls count() to measure blocking. + @Test + func count_barrierCausesMainThreadBlock() { + if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) { + let cache = SignalCache(logHandler: nil) + let stressQueue = DispatchQueue(label: "com.telemetrydeck.stressdaqueue", attributes: .concurrent) + + // Queue 50 operations that each take 2ms BEFORE reaching push() + // With barrier bug: count() waits for ALL of these (~100ms total) + // With fix: count() returns immediately (~0ms) + for i in 0..<50 { + stressQueue.async { + Thread.sleep(forTimeInterval: 0.002) + cache.push(Self.makeSignal(id: "\(i)")) + } + } + + // Immediately call count() - this is what the timer callback does + let start = CFAbsoluteTimeGetCurrent() + _ = cache.count() + let elapsed = CFAbsoluteTimeGetCurrent() - start + + // With barrier bug: ~100ms (50 * 2ms serialized wait) + // With fix (no barrier): < 10ms (just reads array.count) + #expect(elapsed < 0.010, "count() blocked for \(elapsed)s - barrier flag causing contention") + } else { + print("skipping test on incompatible OS") + } + } + + /// Validates thread safety of concurrent push and pop operations. + /// After fix, pop() uses barrier flag to ensure exclusive access during mutation. + @Test + func concurrentPushAndPop_maintainsDataIntegrity() async { + if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) { + let cache = SignalCache(logHandler: nil) + let pushCount = 500 + + await withTaskGroup(of: Void.self) { group in + // Concurrent pushes + for i in 0..(logHandler: nil) + let signalCount = 200 + + for i in 0.. SignalPostBody { + SignalPostBody( + receivedAt: Date(), + appID: UUID().uuidString, + clientUser: id, + sessionID: id, + type: "test", + floatValue: nil, + payload: [:], + isTestMode: "true" + ) + } +}