diff --git a/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt b/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt index dedbf28..7d1d58d 100644 --- a/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt +++ b/src/main/kotlin/io/icure/asyncjacksonhttpclient/net/web/WebClient.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flow import kotlinx.coroutines.reactive.asFlow import org.reactivestreams.Publisher +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.net.URI import java.nio.ByteBuffer @@ -67,6 +68,13 @@ interface Request { @ExperimentalCoroutinesApi interface Response { + fun toMono( + handler: ( + body: Flux, + statusCode: Int, + headers: Map> + ) -> Mono, + ): Mono fun toFlux(): Publisher fun toFlow() = toFlux().asFlow() fun onStatus(status: Int, handler: (ResponseStatus) -> Mono): Response diff --git a/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt b/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt index e39b8c5..c5b9f19 100644 --- a/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt +++ b/src/main/kotlin/io/icure/asyncjacksonhttpclient/netty/NettyWebClient.kt @@ -86,6 +86,54 @@ class NettyResponse( private val headerHandler: Map Mono> = mapOf(), private val timingHandler: ((Long) -> Mono)? = null, ) : Response { + override fun toMono(handler: (Flux, Int, Map>) -> Mono): Mono { + val start = System.currentTimeMillis() + return Mono.deferContextual { ctx -> + responseReceiver.response { clientResponse, flux -> + val code = clientResponse.status().code() + val responseHeaders = clientResponse.responseHeaders() + val headerHandlers = (if (headerHandler.isNotEmpty()) { + responseHeaders.fold(Mono.empty()) { m: Mono<*>, (k, v) -> m.then(headerHandler[k]?.let { it(v) } ?: Mono.empty()) } + } else Mono.empty()) + val headers: Map> = responseHeaders.groupBy({ it.key }, { it.value }) + + headerHandlers.then( + (statusHandlers[code] ?: statusHandlers[code - (code % 100)])?.let { handler -> + val agg = flux.aggregate().asByteArray() + agg.flatMap { bytes -> + val res = handler(object : ResponseStatus(code, clientResponse.responseHeaders().entries()) { + override fun responseBodyAsString() = bytes.toString(Charsets.UTF_8) + }) + if (res == Mono.empty()) { + handler(Flux.just(ByteBuffer.wrap(bytes)), code, headers) + } else { + res.flatMap { Mono.error(it) } + } + }.switchIfEmpty(handler(object : ResponseStatus(code, clientResponse.responseHeaders().entries()) { + override fun responseBodyAsString() = "" + }).let { res -> + if (res == Mono.empty()) { + handler(Flux.just(ByteBuffer.wrap(ByteArray(0))), code, headers) + } else { + res.flatMap { Mono.error(it) } + } + }) + } ?: handler( + flux.map { + val ba = ByteArray(it.readableBytes()) + it.readBytes(ba) //Bytes need to be read now, before they become unavailable. If we just return the nioBuffer(), we have no guarantee that the bytes will be the same when the ByteBuffer will be processed down the flux + ByteBuffer.wrap(ba) + }, + code, + headers + ) + ) + }.doOnTerminate { + timingHandler?.let { it(System.currentTimeMillis() - start).contextWrite(ctx).subscribe() } + }.single() + } + } + override fun toFlux(): Flux { val start = System.currentTimeMillis() return Flux.deferContextual { ctx -> responseReceiver.response { clientResponse, flux ->