Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 146 additions & 60 deletions KMPNativeCoroutinesCombine/Publisher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import Combine
import Dispatch
import KMPNativeCoroutinesCore
import Foundation

/// Creates an `AnyPublisher` for the provided `NativeFlow`.
/// - Parameter nativeFlow: The native flow to collect.
Expand Down Expand Up @@ -43,68 +44,153 @@ internal struct NativeFlowPublisher<Output, Failure: Error, Unit>: Publisher {
}
}

internal class NativeFlowSubscription<Output, Failure, Unit, S: Subscriber>: Subscription where S.Input == Output, S.Failure == Failure {

private let semaphore = DispatchSemaphore(value: 1)
private var nativeFlow: NativeFlow<Output, Failure, Unit>?
private var nativeCancellable: NativeCancellable<Unit>? = nil
private var subscriber: S?
private var demand: Subscribers.Demand = .none
private var hasDemand: Bool { demand >= 1 }
private var next: (() -> Unit)? = nil

init(nativeFlow: @escaping NativeFlow<Output, Failure, Unit>, subscriber: S) {
self.nativeFlow = nativeFlow
self.subscriber = subscriber
internal final class NativeFlowSubscription<Output, Failure, Unit, S: Subscriber>: Subscription
where S.Input == Output, S.Failure == Failure {

private let lock = NSLock()
private var nativeFlow: NativeFlow<Output, Failure, Unit>?
private var nativeCancellable: NativeCancellable<Unit>?
private var subscriber: S?
private var demand: Subscribers.Demand = .none
private var pendingNext: (() -> Unit)?
private var finished = false
private var cancelled = false

init(nativeFlow: @escaping NativeFlow<Output, Failure, Unit>, subscriber: S) {
self.nativeFlow = nativeFlow
self.subscriber = subscriber
}

func request(_ newDemand: Subscribers.Demand) {
var toStartFlow: NativeFlow<Output, Failure, Unit>?
var toResumeNext: (() -> Unit)?

lock.lock()
guard !cancelled, !finished else {
lock.unlock()
return
}

func request(_ demand: Subscribers.Demand) {
semaphore.wait()
self.demand += demand
guard hasDemand else {
semaphore.signal()
return

demand += newDemand

if demand > .none, let nxt = pendingNext {
toResumeNext = nxt
pendingNext = nil
}

if let nf = nativeFlow {
toStartFlow = nf
nativeFlow = nil
}
lock.unlock()

// Execute callbacks outside the lock to prevent deadlocks
// when callbacks synchronously invoke completion/error handlers
if let resume = toResumeNext {
_ = resume()
}

if let flow = toStartFlow {
start(flow: flow)
}
}

private func start(flow: @escaping NativeFlow<Output, Failure, Unit>) {
nativeCancellable = flow(
{ [weak self] item, next, unit in
guard let self else { return unit }

var deliverTo: S?
var callNext: (() -> Unit)?

self.lock.lock()
if self.cancelled || self.finished || self.subscriber == nil {
self.lock.unlock()
return unit
}
guard let nativeFlow = nativeFlow else {
if let next = self.next {
_ = next()
self.next = nil
}
semaphore.signal()
return

if self.demand == .none {
self.pendingNext = next
self.lock.unlock()
return unit
}
semaphore.signal()
self.nativeFlow = nil
nativeCancellable = nativeFlow({ item, next, unit in
guard let subscriber = self.subscriber else { return unit }
let demand = subscriber.receive(item)
self.semaphore.wait()
defer { self.semaphore.signal() }
self.demand -= 1
self.demand += demand
if (self.hasDemand) {
return next()
} else {
self.next = next
return unit
}
}, { error, unit in
if let error = error {
self.subscriber?.receive(completion: .failure(error))
} else {
self.subscriber?.receive(completion: .finished)
}
return unit
}, { cancellationError, unit in
self.subscriber?.receive(completion: .failure(cancellationError))
return unit
})
}

func cancel() {
subscriber = nil
nativeFlow = nil
_ = nativeCancellable?()
nativeCancellable = nil

self.demand -= 1
deliverTo = self.subscriber
self.lock.unlock()

let more = deliverTo?.receive(item) ?? .none

self.lock.lock()
self.demand += more
if self.demand > .none {
callNext = next
} else {
self.pendingNext = next
}
self.lock.unlock()

if let callNext = callNext {
return callNext()
} else {
return unit
}
}, { [weak self] maybeError, unit in
guard let self else { return unit }

var finishWith: Subscribers.Completion<Failure>?
var toNotify: S?

self.lock.lock()
if !self.cancelled && !self.finished {
toNotify = self.subscriber
self.subscriber = nil
self.finished = true
finishWith = (maybeError != nil) ? .failure(maybeError!) : .finished
}
self.lock.unlock()

if let sub = toNotify, let completion = finishWith {
sub.receive(completion: completion)
}
return unit
}, { [weak self] cancellationError, unit in
guard let self else { return unit }

var toNotify: S?

self.lock.lock()
if !self.cancelled && !self.finished {
toNotify = self.subscriber
self.subscriber = nil
self.finished = true
}
self.lock.unlock()

if let sub = toNotify {
sub.receive(completion: .failure(cancellationError))
}
return unit
}
)
}

func cancel() {
var toCancel: NativeCancellable<Unit>?

lock.lock()
if cancelled {
lock.unlock()
return
}
cancelled = true
toCancel = nativeCancellable
nativeCancellable = nil
subscriber = nil
pendingNext = nil
nativeFlow = nil
lock.unlock()

_ = toCancel?()
}
}