diff --git a/KMPNativeCoroutinesCombine/Publisher.swift b/KMPNativeCoroutinesCombine/Publisher.swift index 6051623a..464b3eb2 100644 --- a/KMPNativeCoroutinesCombine/Publisher.swift +++ b/KMPNativeCoroutinesCombine/Publisher.swift @@ -8,6 +8,7 @@ import Combine import Dispatch import KMPNativeCoroutinesCore +import Foundation /// Creates an `AnyPublisher` for the provided `NativeFlow`. /// - Parameter nativeFlow: The native flow to collect. @@ -43,68 +44,153 @@ internal struct NativeFlowPublisher: Publisher { } } -internal class NativeFlowSubscription: Subscription where S.Input == Output, S.Failure == Failure { - - private let semaphore = DispatchSemaphore(value: 1) - private var nativeFlow: NativeFlow? - private var nativeCancellable: NativeCancellable? = nil - private var subscriber: S? - private var demand: Subscribers.Demand = .none - private var hasDemand: Bool { demand >= 1 } - private var next: (() -> Unit)? = nil - - init(nativeFlow: @escaping NativeFlow, subscriber: S) { - self.nativeFlow = nativeFlow - self.subscriber = subscriber +internal final class NativeFlowSubscription: Subscription +where S.Input == Output, S.Failure == Failure { + + private let lock = NSLock() + private var nativeFlow: NativeFlow? + private var nativeCancellable: NativeCancellable? + private var subscriber: S? + private var demand: Subscribers.Demand = .none + private var pendingNext: (() -> Unit)? + private var finished = false + private var cancelled = false + + init(nativeFlow: @escaping NativeFlow, subscriber: S) { + self.nativeFlow = nativeFlow + self.subscriber = subscriber + } + + func request(_ newDemand: Subscribers.Demand) { + var toStartFlow: NativeFlow? + var toResumeNext: (() -> Unit)? + + lock.lock() + guard !cancelled, !finished else { + lock.unlock() + return } - - func request(_ demand: Subscribers.Demand) { - semaphore.wait() - self.demand += demand - guard hasDemand else { - semaphore.signal() - return + + demand += newDemand + + if demand > .none, let nxt = pendingNext { + toResumeNext = nxt + pendingNext = nil + } + + if let nf = nativeFlow { + toStartFlow = nf + nativeFlow = nil + } + lock.unlock() + + // Execute callbacks outside the lock to prevent deadlocks + // when callbacks synchronously invoke completion/error handlers + if let resume = toResumeNext { + _ = resume() + } + + if let flow = toStartFlow { + start(flow: flow) + } + } + + private func start(flow: @escaping NativeFlow) { + nativeCancellable = flow( + { [weak self] item, next, unit in + guard let self else { return unit } + + var deliverTo: S? + var callNext: (() -> Unit)? + + self.lock.lock() + if self.cancelled || self.finished || self.subscriber == nil { + self.lock.unlock() + return unit } - guard let nativeFlow = nativeFlow else { - if let next = self.next { - _ = next() - self.next = nil - } - semaphore.signal() - return + + if self.demand == .none { + self.pendingNext = next + self.lock.unlock() + return unit } - semaphore.signal() - self.nativeFlow = nil - nativeCancellable = nativeFlow({ item, next, unit in - guard let subscriber = self.subscriber else { return unit } - let demand = subscriber.receive(item) - self.semaphore.wait() - defer { self.semaphore.signal() } - self.demand -= 1 - self.demand += demand - if (self.hasDemand) { - return next() - } else { - self.next = next - return unit - } - }, { error, unit in - if let error = error { - self.subscriber?.receive(completion: .failure(error)) - } else { - self.subscriber?.receive(completion: .finished) - } - return unit - }, { cancellationError, unit in - self.subscriber?.receive(completion: .failure(cancellationError)) - return unit - }) - } - - func cancel() { - subscriber = nil - nativeFlow = nil - _ = nativeCancellable?() - nativeCancellable = nil + + self.demand -= 1 + deliverTo = self.subscriber + self.lock.unlock() + + let more = deliverTo?.receive(item) ?? .none + + self.lock.lock() + self.demand += more + if self.demand > .none { + callNext = next + } else { + self.pendingNext = next + } + self.lock.unlock() + + if let callNext = callNext { + return callNext() + } else { + return unit + } + }, { [weak self] maybeError, unit in + guard let self else { return unit } + + var finishWith: Subscribers.Completion? + var toNotify: S? + + self.lock.lock() + if !self.cancelled && !self.finished { + toNotify = self.subscriber + self.subscriber = nil + self.finished = true + finishWith = (maybeError != nil) ? .failure(maybeError!) : .finished + } + self.lock.unlock() + + if let sub = toNotify, let completion = finishWith { + sub.receive(completion: completion) + } + return unit + }, { [weak self] cancellationError, unit in + guard let self else { return unit } + + var toNotify: S? + + self.lock.lock() + if !self.cancelled && !self.finished { + toNotify = self.subscriber + self.subscriber = nil + self.finished = true + } + self.lock.unlock() + + if let sub = toNotify { + sub.receive(completion: .failure(cancellationError)) + } + return unit + } + ) + } + + func cancel() { + var toCancel: NativeCancellable? + + lock.lock() + if cancelled { + lock.unlock() + return } + cancelled = true + toCancel = nativeCancellable + nativeCancellable = nil + subscriber = nil + pendingNext = nil + nativeFlow = nil + lock.unlock() + + _ = toCancel?() + } }