From 4168eb90fda25f1b2e68ea22b1288f0a50504460 Mon Sep 17 00:00:00 2001 From: aduchate Date: Tue, 1 Aug 2023 11:46:43 +0200 Subject: [PATCH] Add tests and fix the toTextFlow() in case garbage is sent to us --- .../net/web/WebClient.kt | 14 ++-- .../web/ResponseTest.kt | 80 +++++++++++++++++++ 2 files changed, 85 insertions(+), 9 deletions(-) create mode 100644 src/test/kotlin/io/icure/asyncjacksonhttpclient/web/ResponseTest.kt diff --git a/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt b/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt index 7dfaa6d..07dbe9f 100644 --- a/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt +++ b/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt @@ -32,6 +32,7 @@ import java.net.URI import java.nio.ByteBuffer import java.nio.CharBuffer import java.nio.charset.Charset +import java.nio.charset.CoderMalfunctionError import java.nio.charset.CoderResult import java.nio.charset.StandardCharsets import java.time.Duration @@ -81,13 +82,10 @@ interface Response { fun toBytesFlow(buffer: Int = 1) = toFlow().buffer(buffer) fun toJsonEvents(asyncParser: JsonParser, buffer: Int = 1) = toBytesFlow(buffer).toJsonEvents(asyncParser) fun toTextFlow(charset: Charset = StandardCharsets.UTF_8, buffer: Int = 1): Flow = flow { - val decoder = charset.newDecoder() + var decoder = charset.newDecoder() var remainingBytes: ByteBuffer? = null - var skip = false - var error: Exception? = null toFlow().collect { bb -> - if (!skip) { var cb = CharBuffer.allocate( ((bb.remaining() + (remainingBytes?.remaining() ?: 0)) * decoder.averageCharsPerByte()).roundToInt() ) @@ -118,16 +116,14 @@ interface Response { } else null } else -> { - error = IllegalStateException("Error decoding response : $coderResult") - skip = true + //In case of error, we emit what has been decoded so far, and we purge the buffer + emit(cb) + decoder = charset.newDecoder() null } } - } } - error?.let { throw it } - remainingBytes?.let { if (it.hasRemaining()) { val cb = CharBuffer.allocate((it.remaining() * decoder.averageCharsPerByte()).roundToInt()) diff --git a/src/test/kotlin/io/icure/asyncjacksonhttpclient/web/ResponseTest.kt b/src/test/kotlin/io/icure/asyncjacksonhttpclient/web/ResponseTest.kt new file mode 100644 index 0000000..01b7bc2 --- /dev/null +++ b/src/test/kotlin/io/icure/asyncjacksonhttpclient/web/ResponseTest.kt @@ -0,0 +1,80 @@ +package io.icure.asyncjacksonhttpclient.web + +import io.icure.asyncjacksonhttpclient.net.web.Response +import io.icure.asyncjacksonhttpclient.net.web.ResponseStatus +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer +import java.time.Duration +import kotlin.random.Random + +val random = Random(123) +@OptIn(ExperimentalCoroutinesApi::class) +class BinaryTestResponse: Response { + override fun toFlux(): Publisher = Flux.interval(Duration.ofMillis(1)).map { ByteBuffer.wrap(random.nextBytes(32)) } + override fun onStatus(status: Int, handler: (ResponseStatus) -> Mono): BinaryTestResponse = BinaryTestResponse() + override fun onHeader(header: String, handler: (String) -> Mono): BinaryTestResponse = BinaryTestResponse() + override fun withTiming(handler: (Long) -> Mono): BinaryTestResponse = BinaryTestResponse() +} + +val allowedChars = (' '..'\uFF0F') +@OptIn(ExperimentalCoroutinesApi::class) +class CharTestResponse: Response { + override fun toFlux(): Publisher = Flux.interval(Duration.ofMillis(1)).map { + val s = (0..31).map { allowedChars.random() }.joinToString("") + ByteBuffer.wrap(s.toByteArray()) + } + override fun onStatus(status: Int, handler: (ResponseStatus) -> Mono): CharTestResponse = CharTestResponse() + override fun onHeader(header: String, handler: (String) -> Mono): CharTestResponse = CharTestResponse() + override fun withTiming(handler: (Long) -> Mono): CharTestResponse = CharTestResponse() +} + +@OptIn(ExperimentalCoroutinesApi::class) +class TrivialTestResponse: Response { + override fun toFlux(): Publisher = Flux.interval(Duration.ofMillis(1)).map { + ByteBuffer.wrap("abcdefghijklmnopqrstuvwxyz012345".toByteArray()) + } + override fun onStatus(status: Int, handler: (ResponseStatus) -> Mono): CharTestResponse = CharTestResponse() + override fun onHeader(header: String, handler: (String) -> Mono): CharTestResponse = CharTestResponse() + override fun withTiming(handler: (Long) -> Mono): CharTestResponse = CharTestResponse() +} + +class ResponseTest { + @Test + fun testFlowOfTrivialChars() = runBlocking { + val result = TrivialTestResponse().toTextFlow().take(1024).map { + it.toString().also { + assertEquals("abcdefghijklmnopqrstuvwxyz012345", it) + println(it) + } + }.toList() + assertEquals(1024, result.size) + } + + @Test + fun testFlowOfChars() = runBlocking { + val result = CharTestResponse().toTextFlow().take(1024).map { + it.toString().also { + assertEquals(32, it.length) + println(it) + } + }.toList() + assertEquals(1024, result.size) + } + + @Test + fun testFlowOfBinary() = runBlocking { + val result = BinaryTestResponse().toTextFlow().take(1024).map { + it.toString().also { println(it) } + }.toList() + assertEquals(1024, result.size) + } +}