From 5be106909a3f3cc3f082898d46c686ecdfb08c95 Mon Sep 17 00:00:00 2001 From: Luca Tremamunno Date: Thu, 31 Jul 2025 09:31:18 +0200 Subject: [PATCH 1/2] Add toMono method top Response --- .../net/web/WebClient.kt | 8 +++ .../netty/NettyWebClient.kt | 49 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt b/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt index dedbf28..7d1d58d 100644 --- a/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt +++ b/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flow import kotlinx.coroutines.reactive.asFlow import org.reactivestreams.Publisher +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.net.URI import java.nio.ByteBuffer @@ -67,6 +68,13 @@ interface Request { @ExperimentalCoroutinesApi interface Response { + fun toMono( + handler: ( + body: Flux, + statusCode: Int, + headers: Map> + ) -> Mono, + ): Mono fun toFlux(): Publisher fun toFlow() = toFlux().asFlow() fun onStatus(status: Int, handler: (ResponseStatus) -> Mono): Response diff --git a/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt b/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt index e39b8c5..8e97d77 100644 --- a/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt +++ b/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt @@ -86,6 +86,55 @@ class NettyResponse( private val headerHandler: Map Mono> = mapOf(), private val timingHandler: ((Long) -> Mono)? = null, ) : Response { + override fun toMono(handler: (Flux, Int, Map>) -> Mono): Mono { + val start = System.currentTimeMillis() + return Mono.deferContextual { ctx -> + responseReceiver.response { clientResponse, flux -> + val code = clientResponse.status().code() + + val headerHandlers = (if (headerHandler.isNotEmpty()) { + clientResponse.responseHeaders().fold(Mono.empty()) { m: Mono<*>, (k, v) -> m.then(headerHandler[k]?.let { it(v) } ?: Mono.empty()) } + } else Mono.empty()) + + val tmpHeaders: Map> = emptyMap() + + headerHandlers.then( + (statusHandlers[code] ?: statusHandlers[code - (code % 100)])?.let { handler -> + val agg = flux.aggregate().asByteArray() + agg.flatMap { bytes -> + val res = handler(object : ResponseStatus(code, clientResponse.responseHeaders().entries()) { + override fun responseBodyAsString() = bytes.toString(Charsets.UTF_8) + }) + if (res == Mono.empty()) { + handler(Flux.just(ByteBuffer.wrap(bytes)), code, tmpHeaders) + } else { + res.flatMap { Mono.error(it) } + } + }.switchIfEmpty(handler(object : ResponseStatus(code, clientResponse.responseHeaders().entries()) { + override fun responseBodyAsString() = "" + }).let { res -> + if (res == Mono.empty()) { + handler(Flux.just(ByteBuffer.wrap(ByteArray(0))), code, tmpHeaders) + } else { + res.flatMap { Mono.error(it) } + } + }) + } ?: handler( + flux.map { + val ba = ByteArray(it.readableBytes()) + it.readBytes(ba) //Bytes need to be read now, before they become unavailable. If we just return the nioBuffer(), we have no guarantee that the bytes will be the same when the ByteBuffer will be processed down the flux + ByteBuffer.wrap(ba) + }, + code, + tmpHeaders + ) + ) + }.doOnTerminate { + timingHandler?.let { it(System.currentTimeMillis() - start).contextWrite(ctx).subscribe() } + }.single() + } + } + override fun toFlux(): Flux { val start = System.currentTimeMillis() return Flux.deferContextual { ctx -> responseReceiver.response { clientResponse, flux -> From 5b5a71bda8bcb5cbb59a73099c31b01a2fcc95d3 Mon Sep 17 00:00:00 2001 From: Luca Tremamunno Date: Thu, 31 Jul 2025 09:47:45 +0200 Subject: [PATCH 2/2] Add proper headers to handler call --- .../asyncjacksonhttpclient/netty/NettyWebClient.kt | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt b/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt index 8e97d77..c5b9f19 100644 --- a/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt +++ b/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt @@ -91,12 +91,11 @@ class NettyResponse( return Mono.deferContextual { ctx -> responseReceiver.response { clientResponse, flux -> val code = clientResponse.status().code() - + val responseHeaders = clientResponse.responseHeaders() val headerHandlers = (if (headerHandler.isNotEmpty()) { - clientResponse.responseHeaders().fold(Mono.empty()) { m: Mono<*>, (k, v) -> m.then(headerHandler[k]?.let { it(v) } ?: Mono.empty()) } + responseHeaders.fold(Mono.empty()) { m: Mono<*>, (k, v) -> m.then(headerHandler[k]?.let { it(v) } ?: Mono.empty()) } } else Mono.empty()) - - val tmpHeaders: Map> = emptyMap() + val headers: Map> = responseHeaders.groupBy({ it.key }, { it.value }) headerHandlers.then( (statusHandlers[code] ?: statusHandlers[code - (code % 100)])?.let { handler -> @@ -106,7 +105,7 @@ class NettyResponse( override fun responseBodyAsString() = bytes.toString(Charsets.UTF_8) }) if (res == Mono.empty()) { - handler(Flux.just(ByteBuffer.wrap(bytes)), code, tmpHeaders) + handler(Flux.just(ByteBuffer.wrap(bytes)), code, headers) } else { res.flatMap { Mono.error(it) } } @@ -114,7 +113,7 @@ class NettyResponse( override fun responseBodyAsString() = "" }).let { res -> if (res == Mono.empty()) { - handler(Flux.just(ByteBuffer.wrap(ByteArray(0))), code, tmpHeaders) + handler(Flux.just(ByteBuffer.wrap(ByteArray(0))), code, headers) } else { res.flatMap { Mono.error(it) } } @@ -126,7 +125,7 @@ class NettyResponse( ByteBuffer.wrap(ba) }, code, - tmpHeaders + headers ) ) }.doOnTerminate {