Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions Sources/TelemetryDeck/Signals/SignalCache.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal class SignalCache<T>: @unchecked Sendable where T: Codable {

/// How many Signals are cached
func count() -> Int {
queue.sync(flags: .barrier) {
queue.sync {
self.cachedSignals.count
}
}
Expand All @@ -41,15 +41,12 @@ internal class SignalCache<T>: @unchecked Sendable where T: Codable {
/// You should hold on to the signals returned by this function. If the action you are trying to do with them fails
/// (e.g. sending them to a server) you should reinsert them into the cache with the `push` function.
func pop() -> [T] {
var poppedSignals: [T]!

queue.sync {
queue.sync(flags: .barrier) {
let sliceSize = min(maximumNumberOfSignalsToPopAtOnce, cachedSignals.count)
poppedSignals = Array(cachedSignals[..<sliceSize])
let poppedSignals = Array(cachedSignals[..<sliceSize])
cachedSignals.removeFirst(sliceSize)
return poppedSignals
}

return poppedSignals
}

private func fileURL() -> URL {
Expand Down
25 changes: 14 additions & 11 deletions Sources/TelemetryDeck/Signals/SignalManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ final class SignalManager: SignalManageable, @unchecked Sendable {
private var signalCache: SignalCache<SignalPostBody>
let configuration: TelemetryManagerConfiguration

private var sendTimer: Timer?
private var sendTimerSource: DispatchSourceTimer?
private let timerQueue = DispatchQueue(label: "com.telemetrydeck.SignalTimer", qos: .utility)

init(configuration: TelemetryManagerConfiguration) {
self.configuration = configuration
Expand Down Expand Up @@ -96,14 +97,17 @@ final class SignalManager: SignalManageable, @unchecked Sendable {
private func sendCachedSignalsRepeatedly() {
attemptToSendNextBatchOfCachedSignals()

sendTimer?.invalidate()
sendTimer = Timer.scheduledTimer(
timeInterval: Self.minimumSecondsToPassBetweenRequests,
target: self,
selector: #selector(attemptToSendNextBatchOfCachedSignals),
userInfo: nil,
repeats: true
sendTimerSource?.cancel()
let source = DispatchSource.makeTimerSource(queue: timerQueue)
source.schedule(
deadline: .now() + Self.minimumSecondsToPassBetweenRequests,
repeating: Self.minimumSecondsToPassBetweenRequests
)
source.setEventHandler { [weak self] in
self?.attemptToSendNextBatchOfCachedSignals()
}
source.resume()
sendTimerSource = source
}

/// Adds a signal to the process queue
Expand Down Expand Up @@ -149,7 +153,6 @@ final class SignalManager: SignalManageable, @unchecked Sendable {

/// Sends one batch of signals from the cache if not empty.
/// If signals fail to send, we put them back into the cache to try again later.
@objc
@Sendable
func attemptToSendNextBatchOfCachedSignals() {
configuration.logHandler?.log(.debug, message: "Current signal cache count: \(signalCache.count())")
Expand Down Expand Up @@ -232,8 +235,8 @@ extension SignalManager {
@objc func didEnterBackground() {
configuration.logHandler?.log(.debug, message: #function)

sendTimer?.invalidate()
sendTimer = nil
sendTimerSource?.cancel()
sendTimerSource = nil

#if os(watchOS) || os(macOS)
self.signalCache.backupCache()
Expand Down
146 changes: 146 additions & 0 deletions Tests/TelemetryDeckTests/SignalCacheConcurrencyTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import Foundation
import Testing

@testable import TelemetryDeck

struct SignalCacheConcurrencyTests {

/// Repro for https://github.com/TelemetryDeck/SwiftSDK/issues/265:
///
/// count() with barrier blocks because it waits for ALL pending GCD operations.
///
/// The bug: When count() uses `.barrier`, it must wait for all prior async blocks
/// to complete before executing. If those blocks do work before calling push(),
/// count() is blocked for their entire duration.
///
/// This test queues async blocks with artificial delays to create pending work,
/// then immediately calls count() to measure blocking.
@Test
func count_barrierCausesMainThreadBlock() {
if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) {
let cache = SignalCache<SignalPostBody>(logHandler: nil)
let stressQueue = DispatchQueue(label: "com.telemetrydeck.stressdaqueue", attributes: .concurrent)

// Queue 50 operations that each take 2ms BEFORE reaching push()
// With barrier bug: count() waits for ALL of these (~100ms total)
// With fix: count() returns immediately (~0ms)
for i in 0..<50 {
stressQueue.async {
Thread.sleep(forTimeInterval: 0.002)
cache.push(Self.makeSignal(id: "\(i)"))
}
}

// Immediately call count() - this is what the timer callback does
let start = CFAbsoluteTimeGetCurrent()
_ = cache.count()
let elapsed = CFAbsoluteTimeGetCurrent() - start

// With barrier bug: ~100ms (50 * 2ms serialized wait)
// With fix (no barrier): < 10ms (just reads array.count)
#expect(elapsed < 0.010, "count() blocked for \(elapsed)s - barrier flag causing contention")
} else {
print("skipping test on incompatible OS")
}
}

/// Validates thread safety of concurrent push and pop operations.
/// After fix, pop() uses barrier flag to ensure exclusive access during mutation.
@Test
func concurrentPushAndPop_maintainsDataIntegrity() async {
if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) {
let cache = SignalCache<SignalPostBody>(logHandler: nil)
let pushCount = 500

await withTaskGroup(of: Void.self) { group in
// Concurrent pushes
for i in 0..<pushCount {
group.addTask {
cache.push(Self.makeSignal(id: "\(i)"))
}
}

// Concurrent pops (some will return empty arrays, that's fine)
for _ in 0..<50 {
group.addTask {
_ = cache.pop()
}
}

await group.waitForAll()
}

// Drain remaining signals
var totalPopped = 0
var batch = cache.pop()
while !batch.isEmpty {
totalPopped += batch.count
batch = cache.pop()
}

// We should have popped some signals (exact count varies due to concurrency)
// The key assertion is that we don't crash or corrupt data
#expect(cache.count() == 0, "Cache should be empty after draining")
} else {
print("skipping test on incompatible OS")
}
}


/// Validates pop() correctly handles concurrent access without data races.
/// Without barrier on pop(), concurrent calls can corrupt the array.
/// Run multiple iterations to increase probability of catching race condition.
@Test
func pop_isThreadSafe() async {
if #available(iOS 16, macOS 13, tvOS 16, visionOS 1, watchOS 9, *) {
for iteration in 0..<10 {
let cache = SignalCache<SignalPostBody>(logHandler: nil)
let signalCount = 200

for i in 0..<signalCount {
cache.push(Self.makeSignal(id: "\(iteration)_\(i)"))
}

let allPopped = await withTaskGroup(of: [SignalPostBody].self, returning: [[SignalPostBody]].self) { group in
for _ in 0..<20 {
group.addTask {
cache.pop()
}
}

var collected: [[SignalPostBody]] = []
for await batch in group {
collected.append(batch)
}
return collected
}

let totalPopped = allPopped.flatMap { $0 }.count
let remaining = cache.count()

#expect(
totalPopped + remaining == signalCount,
"Iteration \(iteration): Total signals (popped + remaining) should equal original count"
)
}
} else {
print("skipping test on incompatible OS")
}

}

// MARK: - Helpers

private static func makeSignal(id: String) -> SignalPostBody {
SignalPostBody(
receivedAt: Date(),
appID: UUID().uuidString,
clientUser: id,
sessionID: id,
type: "test",
floatValue: nil,
payload: [:],
isTestMode: "true"
)
}
}
Loading