Skip to content

Conversation

doozMen
Copy link

@doozMen doozMen commented Feb 27, 2025

I had a look at your super library and it solved many issues compared to Skie. I had some trouble understanding the asyncSequence as I did not understand why AsyncThrowingStream could not be used directly. So I tried and came up with this solution that at least to me is simpler. Would you consider merging it?

It removes the need for the custom type NativeFlowAsyncSequence in favor or the AsyncThrowingStream so this would be breaking. But it keeps the use in a for try await in ... loop so I did not have to alter the tests for them to work.

I added a lock although I think you can debate as the closure of AsyncThrowingStream is in Swifts structured concurrency that it is safe to remove it. It does not harm the structured concurrency so I kept it as I do not know if it is needed for the Kotlin part.

@rickclephas
Copy link
Owner

I had a look at your super library and it solved many issues compared to Skie.

Thanks! If you have some time I would love to better understand what issues you were facing and what about KMP-NativeCoroutines is solving these issues better than Skie.

I had some trouble understanding the asyncSequence as I did not understand why AsyncThrowingStream could not be used directly. So I tried and came up with this solution that at least to me is simpler. Would you consider merging it?

I am all for a simpler approach/implementation. But in this case the custom implementation does have a purpose.
Specifically the following test case (it's an integration test in the sample project):

func testValueBackPressure() async {

As described in the docs AsyncThrowingStream keeps a buffer :

An arbitrary source of elements can produce elements faster than they are consumed by a caller iterating over them. Because of this, AsyncThrowingStream defines a buffering behavior, allowing the stream to buffer a specific number of oldest or newest elements. By default, the buffer limit is Int.max, which means it’s unbounded.

This behaviour doesn't match with the expected behaviour in Kotlin.
In Kotlin if you emit a value to a Flow the function call is expected to suspend until the value is collected.

Basically the call to next() should only occur once the AsyncSequence requests another value/element.
Once you call next() the Kotlin code resumes and will produce another value/element (if there are any).

It removes the need for the custom type NativeFlowAsyncSequence in favor or the AsyncThrowingStream so this would be breaking. But it keeps the use in a for try await in ... loop so I did not have to alter the tests for them to work.

Yeah, not ideal. But not too big of a problem since NativeFlowAsyncSequence doesn't really have its own public API.

I added a lock although I think you can debate as the closure of AsyncThrowingStream is in Swifts structured concurrency that it is safe to remove it. It does not harm the structured concurrency so I kept it as I do not know if it is needed for the Kotlin part.

The locks are indeed to make sure that the callbacks from Kotlin will never run concurrently.
In the current implementation the Kotlin and Swift code can run concurrently, so in that case they are definitely needed.
But IIRC during cancellation the callbacks could be invoked concurrently as well.
So indeed best to have some kind of locking.

@doozMen
Copy link
Author

doozMen commented Feb 28, 2025

Thanks for the detailed response!

Thanks! If you have some time I would love to better understand what issues you were facing and what about KMP-NativeCoroutines is solving these issues better than Skie.

I have trouble with code that could, even if it is theoretical contains a fatal error. So when our Android developer proposed this library I could not understand why the fatal error was needed. This intrigued me and I explored if AsyncThrowingStream could not be used directly. It could and make me more confident using the code like this although that the code with the custom NativeFlowAsyncSequence worked out of the box I have bad experience with async libraries so I reviewed.

Kotlin does not have backpressure so does not hold values if they are not consumed

I added docs explaining this and added the buffer policy as a parameter so it could be chosen. If Kotlin does not have a way to continue besides throwing away results then this implementation adds that. continuation.yield(item) would tap into that system no? Then it is up to the consumer to opt into this if wanted with the added bufferPolicy parameter now.

KMP-NativeCoroutines/sample/Async/AsyncSequenceIntegrationTests.swift
I would like to run this test but it is not part of any test target so I'm not sure how to run it?

@rickclephas
Copy link
Owner

Kotlin does not have backpressure so does not hold values if they are not consumed

I added docs explaining this and added the buffer policy as a parameter so it could be chosen. If Kotlin does not have a way to continue besides throwing away results then this implementation adds that. continuation.yield(item) would tap into that system no? Then it is up to the consumer to opt into this if wanted with the added bufferPolicy parameter now.

The thing is that Kotlin doesn't throw away (of buffer) values unless a consumer decides to do so.
If you just collect a Flow your emit calls will suspend until the value has been collected.
See the following sample: https://pl.kotl.in/oDEVW_VZ5.

With AsyncThrowingStream that isn't the case. Values are either buffered or dropped depending on the buffer policy.
We don't want that, we need to make sure that the Kotlin emit calls suspend until Swift is actually collecting the values.

KMP-NativeCoroutines/sample/Async/AsyncSequenceIntegrationTests.swift
I would like to run this test but it is not part of any test target so I'm not sure how to run it?

If you open the sample project in Xcode you can run the integrations tests from there, or using the following command:

xcodebuild test -project KMPNativeCoroutinesSample.xcodeproj -scheme "macOS App" -destination "platform=OS X"

@doozMen
Copy link
Author

doozMen commented Mar 3, 2025

Let me try grasping what you are saying and understand the test more.

@doozMen
Copy link
Author

doozMen commented Mar 3, 2025

I adjusted the test KMP-NativeCoroutines/sample/Async/AsyncSequenceIntegrationTests.swift to prove that that AsyncThrowingStream does buffer by default and output all the values. Could you verify that with the tests?

In my opinion these changes align with Swift’s AsyncStream behaviour while preserving the expected Kotlin Flow semantics. Would love to hear your thoughts on this, and @mattmassicotte’s perspective as well.

I discussed this here https://bsky.app/profile/doozmen.bsky.social/post/3lj5gdxsuxs27 and to my knowledge there are no downsides to this simplified code. Could you let me know what I should do to be able to merge it?

Thanks for the thorough and very interesting feedback sofar!

@rickclephas
Copy link
Owner

I adjusted the test to prove that that AsyncThrowingStream does buffer by default and output all the values. Could you verify that with the tests?

Sorry about the confusion, but we don't need to prove the buffering behaviour.
We actually need to prevent the buffering (or dropping) behaviour from occurring.

In my opinion these changes align with Swift’s AsyncStream behaviour while preserving the expected Kotlin Flow semantics. Would love to hear your thoughts on this, and @mattmassicotte’s perspective as well.

The challenge here is in preserving the Kotlin behaviour. In Kotlin you emit values to a Flow.
This is a suspend function that will wait for the consumer to collect the value in question.

Since the yield from AsyncThrowingStream is buffering the values it doesn't provide any back pressure.
Let's say we have the following Kotlin Flow:

val flow = flow {
    var i = 0
    while (true) {
        emit(i++)
    }
}

If we would collect that Flow in Kotlin like this:

flow.collect {
    delay(1.seconds)
    println("Collected $it")
}

We are always emitting a single value every second (even if we were to switch the Dispatcher).

To bridge from Kotlin to Swift the library starts a Job in Kotlin that collects the Flow and invokes the callbacks/closures.
We somehow need to wait for Swift to fully consume the value before resuming the Kotlin code.

Since yield will buffer (or drop depending on the policy) it isn't able to tell Kotlin when to resume.
Meaning that in the above example Kotlin would emit a whole lot of values that Swift is going to buffer.

Right now this is solved by waiting for the call to next on the Iterator.
Once that is invoked the value is being consumed and we tell Kotlin to continue.
While it doesn't match 100% (technically we still buffer at most 1 value) it gets really close to the Kotlin behaviour.

I discussed this here https://bsky.app/profile/doozmen.bsky.social/post/3lj5gdxsuxs27 and to my knowledge there are no downsides to this simplified code. Could you let me know what I should do to be able to merge it?

This line is the important part of the test.
It verifies that Kotlin doesn't emit more values than Swift can handle.

XCTAssert(emittedCount == receivedValueCount || emittedCount == receivedValueCount + 1, "Back pressure isn't applied")

I am happy to accept simplifications, but they need to preserve this "back pressure" behaviour.
So far I haven't been able to implement that with AsyncThrowingStream hence the custom AsyncSequence.

@rickclephas
Copy link
Owner

While it doesn't match 100% (technically we still buffer at most 1 value) it gets really close to the Kotlin behaviour.

Writing this made me realise that we could actually make it match 100%.
With a small change (#208) we already have 99.99%.

@doozMen
Copy link
Author

doozMen commented Mar 18, 2025

Sorry about my delayed response and question. It is a topic with very much knowledge needed from behind the scenes. I can see you put a lot of thinking in it and my thanks for that.

On the Swift side of things I am worried about this behaviour and disabling. I cannot yet quite grasp why it is needed. I will ask @ktoso to have a look if he has time as this is a bit to complex for me. Maybe I wrongfully trust the normal AsyncThrowingStream more and worry a bit about the custom implementation.

@doozMen
Copy link
Author

doozMen commented Mar 24, 2025

I'm not sure how to continue. I made this pull request as I wanted to have the expected behaviour on the swift side. All the tests pass if I test for the output. The output of the stream. I fail to understand why there is a need to mimic the kotlin behaviour in the swift interface. It should at least in my opinion react as swift expect.

I showed that even if the consuming thread blocks that kotlin can continue yielding values into the AsyncThrowingStream if there is a buffer, if not then the buffer correctly does nothing and values are lost.

To do this I was able to remove fatalErrors from the code and have a simplified implementation. We are also using this implementation in our production code.

I however do understand that I lack some domain knowledge about the Kotlin part and you explained it very well. But I'm just unsure why the mimic of Kotlin behaviour in a swift interface is needed? I challenge even the need for this indeed. I made the PR as the current implementation made me doubt using the library as I failed to understand why a simple AsyncThrowingStream would not work. I find it easier to debug and would like to ask if it would be possible to approve this?

@rickclephas
Copy link
Owner

I made this pull request as I wanted to have the expected behaviour on the swift side.

What exactly is it that isn't working as expected?

All the tests pass if I test for the output. The output of the stream.

It's not just about the output of the stream.

I fail to understand why there is a need to mimic the kotlin behaviour in the swift interface.

The whole point of the library is to be able to use a Kotlin Flow in your Swift code.
So basically the whole AsyncSequence is just a Swift friendly way to call flow.collect { }.
Therefore we need to respect the contract of the Flow, specifically how it suspends until a value has been collected.

It should at least in my opinion react as swift expect.

Please provide details on what currently doesn't react the way Swift expects.

I showed that even if the consuming thread blocks that kotlin can continue yielding values into the AsyncThrowingStream if there is a buffer, if not then the buffer correctly does nothing and values are lost.

This is the whole point. That's not how Flows are expected to behave.
They don't buffer, nor do they drop any values. They simply suspend until the value is collected.

To do this I was able to remove fatalErrors from the code and have a simplified implementation. We are also using this implementation in our production code.

I am not saying you can't use it like that. I am just saying that by doing this you are possibly creating a huge buffer, or dropping values, which isn't expected behaviour.

But I'm just unsure why the mimic of Kotlin behaviour in a swift interface is needed?

Because you are effectively calling flow.collect { }, so we need to respect how that function is supposed to behave.

I find it easier to debug and would like to ask if it would be possible to approve this?

I am afraid we won't be able to merge this, unless we can preserve the current behaviour.

@doozMen
Copy link
Author

doozMen commented Apr 14, 2025

Ok so we say the same thing but in text we do not understand each other. If possible we could have a call about this? We need this in our production app that serves al lot of clients of Mediahuis.

I showed that even if the consuming thread blocks that kotlin can continue yielding values into the AsyncThrowingStream if there is a buffer, if not then the buffer correctly does nothing and values are lost.

You respond:

This is the whole point. That's not how Flows are expected to behave.
They don't buffer, nor do they drop any values. They simply suspend until the value is collected.

As you can see the default behaviour of async streams is already to solve the issue you are coding agains in a too complicated way with fatal errors including. The buffer policy is exactly to cover the use case you describe with the flow collect to follow.

So I will need this behaviour to be consistent. I do not like libraries with fatal errors trying to avoid a problem that do not exist. You are off course fee to close this pr. Please do as then I stop hoping and we can make the discussion to move away from this otherwise beautiful library.

Thanks again for the time and effort but if needed I think we should agree to disagree unfortunately.

@doozMen
Copy link
Author

doozMen commented Apr 14, 2025

I am not saying you can't use it like that. I am just saying that by doing this you are possibly creating a huge buffer, or dropping values, which isn't expected behaviour.

The test proves that you never drop values. The huge buffer is normally not a problem and can be tuned via a setting.

@doozMen
Copy link
Author

doozMen commented Apr 14, 2025

I am afraid we won't be able to merge this, unless we can preserve the current behaviour.

I made this merge request as I can prove that the current behaviour is wrong and does not work well with a large scale app that relies on streams to be piped together. The buffer is needed and has to work as expected. The current implementation tries to forcefully move away from the standard buffer behaviour causing our app to not function. So I'm trying to tell you I really think it is wrong to bypass the buffer in order to preserve a behaviour that is kotlin related and should not be preserved.

Sorry again I really know these reviews take time and effort but this can be closed then as you suggest. But we cannot use your library then. I did not close it my self as I still hope we can resolve this?

@rickclephas
Copy link
Owner

As you can see the default behaviour of async streams is already to solve the issue you are coding agains in a too complicated way with fatal errors including. The buffer policy is exactly to cover the use case you describe with the flow collect to follow.

It isn't. You have removed the test logic that is supposed to test this behaviour.
We don't want a buffer nor do we want do drop values. The emit call should suspend until the value has been consumed.

The huge buffer is normally not a problem and can be tuned via a setting.

It's a problem because that's not how a Flow should behave.
If you need buffering you are supposed to add a buffer (or a Swift equivalent if you just need it in Swift).

I made this merge request as I can prove that the current behaviour is wrong and does not work well with a large scale app that relies on streams to be piped together. The buffer is needed and has to work as expected. The current implementation tries to forcefully move away from the standard buffer behaviour causing our app to not function.

You didn't provide any errors, samples or the like that prove the current behaviour is wrong.
There is also no mention of why the current behaviour doesn't work well in a large scale app.

I am going to close this now as I don't believe what you are trying to achieve is possible while preserving the expected behaviour. Feel free to open an issue if you have concrete details about a use case that currently doesn't work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants