@@ -6,6 +6,8 @@ import com.openai.core.http.HttpResponse
66import com.openai.core.http.HttpResponse.Handler
77import com.openai.core.http.PhantomReachableClosingStreamResponse
88import com.openai.core.http.StreamResponse
9+ import com.openai.errors.OpenAIIoException
10+ import java.io.IOException
911import java.util.stream.Stream
1012import kotlin.streams.asStream
1113
@@ -14,17 +16,30 @@ internal fun <T> streamHandler(
1416 block : suspend SequenceScope <T >.(response: HttpResponse , lines: Sequence <String >) -> Unit
1517): Handler <StreamResponse <T >> =
1618 object : Handler <StreamResponse <T >> {
19+
1720 override fun handle (response : HttpResponse ): StreamResponse <T > {
1821 val reader = response.body().bufferedReader()
1922 val sequence =
2023 // Wrap in a `CloseableSequence` to avoid performing a read on the `reader`
2124 // after it has been closed, which would throw an `IOException`.
2225 CloseableSequence (
23- sequence { reader.useLines { block(response, it) } }.constrainOnce()
26+ sequence {
27+ reader.useLines { lines ->
28+ block(
29+ response,
30+ // We wrap the `lines` instead of the top-level sequence because
31+ // we only want to catch `IOException` from the reader; not from
32+ // the user's own code.
33+ IOExceptionWrappingSequence (lines),
34+ )
35+ }
36+ }
37+ .constrainOnce()
2438 )
2539
2640 return PhantomReachableClosingStreamResponse (
2741 object : StreamResponse <T > {
42+
2843 override fun stream (): Stream <T > = sequence.asStream()
2944
3045 override fun close () {
@@ -37,18 +52,44 @@ internal fun <T> streamHandler(
3752 }
3853 }
3954
55+ /* * A sequence that catches, wraps, and rethrows [IOException] as [OpenAIIoException]. */
56+ private class IOExceptionWrappingSequence <T >(private val sequence : Sequence <T >) : Sequence<T> {
57+
58+ override fun iterator (): Iterator <T > {
59+ val iterator = sequence.iterator()
60+ return object : Iterator <T > {
61+
62+ override fun next (): T =
63+ try {
64+ iterator.next()
65+ } catch (e: IOException ) {
66+ throw OpenAIIoException (" Stream failed" , e)
67+ }
68+
69+ override fun hasNext (): Boolean =
70+ try {
71+ iterator.hasNext()
72+ } catch (e: IOException ) {
73+ throw OpenAIIoException (" Stream failed" , e)
74+ }
75+ }
76+ }
77+ }
78+
4079/* *
4180 * A sequence that can be closed.
4281 *
4382 * Once [close] is called, it will not yield more elements. It will also no longer consult the
4483 * underlying [Iterator.hasNext] method.
4584 */
4685private class CloseableSequence <T >(private val sequence : Sequence <T >) : Sequence<T> {
86+
4787 private var isClosed: Boolean = false
4888
4989 override fun iterator (): Iterator <T > {
5090 val iterator = sequence.iterator()
5191 return object : Iterator <T > {
92+
5293 override fun next (): T = iterator.next()
5394
5495 override fun hasNext (): Boolean = ! isClosed && iterator.hasNext()
0 commit comments