From 8daf3d985862b06b389c075ecaee42585a721e61 Mon Sep 17 00:00:00 2001 From: Osip Fatkullin Date: Wed, 29 Apr 2026 13:28:35 +0200 Subject: [PATCH 1/2] Curl: Extract removeEasyHandle function --- .../curl/internal/CurlMultiApiHandler.kt | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt index b3de6627917..1532ac12edf 100644 --- a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt +++ b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt @@ -138,16 +138,13 @@ internal class CurlMultiApiHandler : Closeable { val easyHandle = websocket.easyHandle val handler = activeHandles[easyHandle] ?: return if (handler.responseWrapper.get() !== websocket) return - activeHandles.remove(easyHandle) - processCancelledEasyHandle(easyHandle, cause) - handler.responseCompletable.completeExceptionally(cause) - handler.dispose() + removeEasyHandle(easyHandle, cause) } fun perform(transfersRunning: IntVarOf) { if (activeHandles.isEmpty()) return - // Process cancelled handles before performing to prevent them from blocking curl_multi_poll. + // Process cancelled handles before performing prevent them from blocking curl_multi_poll. if (cancelledHandles.isNotEmpty()) { handleCompleted() } @@ -221,11 +218,8 @@ internal class CurlMultiApiHandler : Closeable { } private fun handleCompleted() { - for (cancellation in cancelledHandles) { - val handler = activeHandles.remove(cancellation.first) ?: continue - processCancelledEasyHandle(cancellation.first, cancellation.second) - handler.responseCompletable.completeExceptionally(cancellation.second) - handler.dispose() + for ((easyHandle, cause) in cancelledHandles) { + removeEasyHandle(easyHandle, cause) } cancelledHandles.clear() @@ -256,6 +250,16 @@ internal class CurlMultiApiHandler : Closeable { } } + private fun removeEasyHandle(easyHandle: EasyHandle, cause: Throwable) { + val handler = activeHandles.remove(easyHandle) ?: return + try { + processCancelledEasyHandle(easyHandle, cause) + } finally { + handler.responseCompletable.completeExceptionally(cause) + handler.dispose() + } + } + private fun processCancelledEasyHandle(easyHandle: EasyHandle, cause: Throwable): CurlFail = memScoped { try { val responseDataRef = alloc() From 3c5b5c300118b0dd390ed575ddc819a46958b97b Mon Sep 17 00:00:00 2001 From: Osip Fatkullin Date: Wed, 15 Apr 2026 18:22:38 +0200 Subject: [PATCH 2/2] KTOR-9545 Curl: Attach bodyChannel to correct job --- .../client/engine/curl/CurlClientEngine.kt | 3 +- .../ktor/client/engine/curl/CurlProcessor.kt | 2 +- .../curl/internal/CurlMultiApiHandler.kt | 6 ++-- .../client/engine/curl/internal/CurlRaw.kt | 13 +++++--- .../test/io/ktor/client/tests/ContentTest.kt | 32 ++++++++++++++++--- .../io/ktor/client/tests/utils/WaitUtils.kt | 19 ++++++----- 6 files changed, 53 insertions(+), 22 deletions(-) diff --git a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlClientEngine.kt b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlClientEngine.kt index bf0954f6887..e1188e76ec8 100644 --- a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlClientEngine.kt +++ b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlClientEngine.kt @@ -15,6 +15,7 @@ import io.ktor.http.* import io.ktor.http.cio.* import io.ktor.util.date.* import io.ktor.utils.io.* +import kotlinx.coroutines.job internal class CurlClientEngine( override val config: CurlClientEngineConfig @@ -30,7 +31,7 @@ internal class CurlClientEngine( val requestTime = GMTDate() - val curlRequest = data.toCurlRequest(config) + val curlRequest = data.toCurlRequest(config, callContext.job) val responseData = curlProcessor.executeRequest(curlRequest) return with(responseData) { diff --git a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.kt b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.kt index 1a3e31aee74..ec17b036cca 100644 --- a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.kt +++ b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/CurlProcessor.kt @@ -103,7 +103,7 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) { val (requestData, completionHandler) = task val requestHandler = api.scheduleRequest(requestData, completionHandler) - val requestCleaner = requestData.executionContext.invokeOnCompletion { cause -> + val requestCleaner = requestData.callContext.invokeOnCompletion { cause -> if (cause == null) return@invokeOnCompletion cancelRequest(requestHandler, cause) } diff --git a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt index 1532ac12edf..4b8a5b5e925 100644 --- a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt +++ b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlMultiApiHandler.kt @@ -65,7 +65,7 @@ internal class CurlMultiApiHandler : Closeable { wsConfig.maxFrameSize, ) } else { - CurlHttpResponseBody(request.executionContext) { + CurlHttpResponseBody(request.callContext) { unpauseEasyHandle(easyHandle) } } @@ -144,7 +144,7 @@ internal class CurlMultiApiHandler : Closeable { fun perform(transfersRunning: IntVarOf) { if (activeHandles.isEmpty()) return - // Process cancelled handles before performing prevent them from blocking curl_multi_poll. + // Process cancelled handles before performing to prevent them from blocking curl_multi_poll. if (cancelledHandles.isNotEmpty()) { handleCompleted() } @@ -204,7 +204,7 @@ internal class CurlMultiApiHandler : Closeable { private fun setupUploadContent(easyHandle: EasyHandle, request: CurlRequestData): COpaquePointer { val requestPointer = CurlRequestBodyData( body = request.content, - callContext = request.executionContext, + callContext = request.callContext, onUnpause = { unpauseEasyHandle(easyHandle) } diff --git a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlRaw.kt b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlRaw.kt index ee79f7e5b1f..0ed222b9b40 100644 --- a/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlRaw.kt +++ b/ktor-client/ktor-client-curl/desktop/src/io/ktor/client/engine/curl/internal/CurlRaw.kt @@ -11,7 +11,7 @@ import io.ktor.client.plugins.* import io.ktor.client.request.* import io.ktor.http.* import io.ktor.http.content.* -import io.ktor.util.Attributes +import io.ktor.util.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* import kotlinx.cinterop.CPointer @@ -24,7 +24,10 @@ import libcurl.curl_slist import kotlin.coroutines.coroutineContext @OptIn(ExperimentalForeignApi::class, InternalAPI::class) -internal suspend fun HttpRequestData.toCurlRequest(config: CurlClientEngineConfig): CurlRequestData = CurlRequestData( +internal suspend fun HttpRequestData.toCurlRequest( + config: CurlClientEngineConfig, + callContext: Job, +): CurlRequestData = CurlRequestData( protocol = url.protocol.name, url = url.toString(), method = method.value, @@ -33,13 +36,13 @@ internal suspend fun HttpRequestData.toCurlRequest(config: CurlClientEngineConfi content = body.toByteChannel(), contentLength = body.contentLength ?: headers[HttpHeaders.ContentLength]?.toLongOrNull() ?: -1L, connectTimeout = getCapabilityOrNull(HttpTimeoutCapability)?.connectTimeoutMillis, - executionContext = executionContext, + callContext = callContext, isUpgradeRequest = isUpgradeRequest(), forceProxyTunneling = config.forceProxyTunneling, sslVerify = config.sslVerify, caInfo = config.caInfo, caPath = config.caPath, - attributes = attributes + attributes = attributes, ) internal class CurlRequestData @OptIn(ExperimentalForeignApi::class) constructor( @@ -51,7 +54,7 @@ internal class CurlRequestData @OptIn(ExperimentalForeignApi::class) constructor val content: ByteReadChannel, val contentLength: Long, val connectTimeout: Long?, - val executionContext: Job, + val callContext: Job, val isUpgradeRequest: Boolean, val forceProxyTunneling: Boolean, val sslVerify: Boolean, diff --git a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/ContentTest.kt b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/ContentTest.kt index 5b8132f4f76..5df28a078d9 100644 --- a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/ContentTest.kt +++ b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/ContentTest.kt @@ -19,11 +19,13 @@ import io.ktor.http.content.* import io.ktor.serialization.kotlinx.json.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* -import kotlinx.coroutines.cancel -import kotlinx.coroutines.withTimeoutOrNull +import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import kotlinx.io.readByteArray import kotlin.test.* +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds private val testSize = listOf( 0, @@ -383,7 +385,7 @@ class ContentTest : ClientLoader() { @Test fun testDownloadStreamArrayWithTimeout() = clientTests { test { client -> - val result: ByteArray? = withTimeoutOrNull(100) { + val result: ByteArray? = withTimeoutOrNull(100.milliseconds) { client.get("$TEST_SERVER/content/stream").body() } @@ -391,6 +393,28 @@ class ContentTest : ClientLoader() { } } + // Flaky on Apache: KTOR-9544 + @Test + fun testBodyChannelCancelledWhenCallerScopeIsCancelled() = clientTests(except("Apache5")) { + test { client -> + val bodyDeferred = CompletableDeferred() + coroutineScope { + val job = launch { + val body = client.prepareGet("$TEST_SERVER/content/stream?delay=5000").body() + bodyDeferred.complete(body) + awaitCancellation() + } + val body = bodyDeferred.await() + + val cause = CancellationException("Test exception") + job.cancel(cause) + + waitForCondition("body to be closed", timeout = 2.seconds) { body.closedCause != null } + assertEquals(cause.message, body.closedCause!!.message) + } + } + } + /** * This is a bit of an edge case where the initial content reader fails to read the response body * before a second reader comes in. When this happens, we simply cancel the initial reader. @@ -407,7 +431,7 @@ class ContentTest : ClientLoader() { HttpResponseValidator { validateResponse { response -> val channel = response.rawContent - for (i in 0..100) { + repeat(100) { assertEquals(expected, channel.readByteArray(expected.length).decodeToString()) } } diff --git a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/utils/WaitUtils.kt b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/utils/WaitUtils.kt index ef12d5f5b16..4f72e4469e9 100644 --- a/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/utils/WaitUtils.kt +++ b/ktor-client/ktor-client-tests/common/test/io/ktor/client/tests/utils/WaitUtils.kt @@ -6,20 +6,23 @@ package io.ktor.client.tests.utils import kotlinx.coroutines.delay import kotlin.test.assertTrue +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds +import kotlin.time.TimeSource suspend fun waitForCondition( - description: String? = null, - waitIncrement: Long = 200L, - waitLimit: Long = 10_000L, + description: String, + timeout: Duration = 10.seconds, + delay: Duration = (timeout / 10).coerceAtMost(100.milliseconds), condition: () -> Boolean, ) { - var waitTime = 0L - while (waitTime < waitLimit) { + val timeMark = TimeSource.Monotonic.markNow() + timeout + while (timeMark.hasNotPassedNow()) { if (condition()) { return } - delay(waitIncrement) - waitTime += waitIncrement + delay(delay) } - assertTrue(condition(), "Timed out after ${waitLimit / 1000}s waiting for ${description ?: condition}") + assertTrue(condition(), "Timed out after $timeout waiting for $description") }