Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import java.net.URI
import java.nio.ByteBuffer
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.nio.charset.CoderMalfunctionError
import java.nio.charset.CoderResult
import java.nio.charset.StandardCharsets
import java.time.Duration
Expand Down Expand Up @@ -81,13 +82,10 @@ interface Response {
fun toBytesFlow(buffer: Int = 1) = toFlow().buffer(buffer)
fun toJsonEvents(asyncParser: JsonParser, buffer: Int = 1) = toBytesFlow(buffer).toJsonEvents(asyncParser)
fun toTextFlow(charset: Charset = StandardCharsets.UTF_8, buffer: Int = 1): Flow<CharBuffer> = flow<CharBuffer> {
val decoder = charset.newDecoder()
var decoder = charset.newDecoder()
var remainingBytes: ByteBuffer? = null
var skip = false
var error: Exception? = null

toFlow().collect { bb ->
if (!skip) {
var cb = CharBuffer.allocate(
((bb.remaining() + (remainingBytes?.remaining() ?: 0)) * decoder.averageCharsPerByte()).roundToInt()
)
Expand Down Expand Up @@ -118,16 +116,14 @@ interface Response {
} else null
}
else -> {
error = IllegalStateException("Error decoding response : $coderResult")
skip = true
//In case of error, we emit what has been decoded so far, and we purge the buffer
emit(cb)
decoder = charset.newDecoder()
null
}
}
}
}

error?.let { throw it }

remainingBytes?.let {
if (it.hasRemaining()) {
val cb = CharBuffer.allocate((it.remaining() * decoder.averageCharsPerByte()).roundToInt())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.icure.asyncjacksonhttpclient.web

import io.icure.asyncjacksonhttpclient.net.web.Response
import io.icure.asyncjacksonhttpclient.net.web.ResponseStatus
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.ByteBuffer
import java.time.Duration
import kotlin.random.Random

val random = Random(123)
@OptIn(ExperimentalCoroutinesApi::class)
class BinaryTestResponse: Response {
override fun toFlux(): Publisher<ByteBuffer> = Flux.interval(Duration.ofMillis(1)).map { ByteBuffer.wrap(random.nextBytes(32)) }
override fun onStatus(status: Int, handler: (ResponseStatus) -> Mono<out Throwable>): BinaryTestResponse = BinaryTestResponse()
override fun onHeader(header: String, handler: (String) -> Mono<Unit>): BinaryTestResponse = BinaryTestResponse()
override fun withTiming(handler: (Long) -> Mono<Unit>): BinaryTestResponse = BinaryTestResponse()
}

val allowedChars = (' '..'\uFF0F')
@OptIn(ExperimentalCoroutinesApi::class)
class CharTestResponse: Response {
override fun toFlux(): Publisher<ByteBuffer> = Flux.interval(Duration.ofMillis(1)).map {
val s = (0..31).map { allowedChars.random() }.joinToString("")
ByteBuffer.wrap(s.toByteArray())
}
override fun onStatus(status: Int, handler: (ResponseStatus) -> Mono<out Throwable>): CharTestResponse = CharTestResponse()
override fun onHeader(header: String, handler: (String) -> Mono<Unit>): CharTestResponse = CharTestResponse()
override fun withTiming(handler: (Long) -> Mono<Unit>): CharTestResponse = CharTestResponse()
}

@OptIn(ExperimentalCoroutinesApi::class)
class TrivialTestResponse: Response {
override fun toFlux(): Publisher<ByteBuffer> = Flux.interval(Duration.ofMillis(1)).map {
ByteBuffer.wrap("abcdefghijklmnopqrstuvwxyz012345".toByteArray())
}
override fun onStatus(status: Int, handler: (ResponseStatus) -> Mono<out Throwable>): CharTestResponse = CharTestResponse()
override fun onHeader(header: String, handler: (String) -> Mono<Unit>): CharTestResponse = CharTestResponse()
override fun withTiming(handler: (Long) -> Mono<Unit>): CharTestResponse = CharTestResponse()
}

class ResponseTest {
@Test
fun testFlowOfTrivialChars() = runBlocking {
val result = TrivialTestResponse().toTextFlow().take(1024).map {
it.toString().also {
assertEquals("abcdefghijklmnopqrstuvwxyz012345", it)
println(it)
}
}.toList()
assertEquals(1024, result.size)
}

@Test
fun testFlowOfChars() = runBlocking {
val result = CharTestResponse().toTextFlow().take(1024).map {
it.toString().also {
assertEquals(32, it.length)
println(it)
}
}.toList()
assertEquals(1024, result.size)
}

@Test
fun testFlowOfBinary() = runBlocking {
val result = BinaryTestResponse().toTextFlow().take(1024).map {
it.toString().also { println(it) }
}.toList()
assertEquals(1024, result.size)
}
}