Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.ktor.http.*
import io.ktor.http.cio.*
import io.ktor.util.date.*
import io.ktor.utils.io.*
import kotlinx.coroutines.job

internal class CurlClientEngine(
override val config: CurlClientEngineConfig
Expand All @@ -30,7 +31,7 @@ internal class CurlClientEngine(

val requestTime = GMTDate()

val curlRequest = data.toCurlRequest(config)
val curlRequest = data.toCurlRequest(config, callContext.job)
val responseData = curlProcessor.executeRequest(curlRequest)

return with(responseData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ internal class CurlProcessor(coroutineContext: CoroutineContext) {
val (requestData, completionHandler) = task
val requestHandler = api.scheduleRequest(requestData, completionHandler)

val requestCleaner = requestData.executionContext.invokeOnCompletion { cause ->
val requestCleaner = requestData.callContext.invokeOnCompletion { cause ->
if (cause == null) return@invokeOnCompletion
cancelRequest(requestHandler, cause)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ internal class CurlMultiApiHandler : Closeable {
wsConfig.maxFrameSize,
)
} else {
CurlHttpResponseBody(request.executionContext) {
CurlHttpResponseBody(request.callContext) {
unpauseEasyHandle(easyHandle)
}
}
Expand Down Expand Up @@ -138,10 +138,7 @@ internal class CurlMultiApiHandler : Closeable {
val easyHandle = websocket.easyHandle
val handler = activeHandles[easyHandle] ?: return
if (handler.responseWrapper.get() !== websocket) return
activeHandles.remove(easyHandle)
processCancelledEasyHandle(easyHandle, cause)
handler.responseCompletable.completeExceptionally(cause)
handler.dispose()
removeEasyHandle(easyHandle, cause)
}

fun perform(transfersRunning: IntVarOf<Int>) {
Expand Down Expand Up @@ -207,7 +204,7 @@ internal class CurlMultiApiHandler : Closeable {
private fun setupUploadContent(easyHandle: EasyHandle, request: CurlRequestData): COpaquePointer {
val requestPointer = CurlRequestBodyData(
body = request.content,
callContext = request.executionContext,
callContext = request.callContext,
onUnpause = {
unpauseEasyHandle(easyHandle)
}
Expand All @@ -221,11 +218,8 @@ internal class CurlMultiApiHandler : Closeable {
}

private fun handleCompleted() {
for (cancellation in cancelledHandles) {
val handler = activeHandles.remove(cancellation.first) ?: continue
processCancelledEasyHandle(cancellation.first, cancellation.second)
handler.responseCompletable.completeExceptionally(cancellation.second)
handler.dispose()
for ((easyHandle, cause) in cancelledHandles) {
removeEasyHandle(easyHandle, cause)
}
cancelledHandles.clear()

Expand Down Expand Up @@ -256,6 +250,16 @@ internal class CurlMultiApiHandler : Closeable {
}
}

private fun removeEasyHandle(easyHandle: EasyHandle, cause: Throwable) {
val handler = activeHandles.remove(easyHandle) ?: return
try {
processCancelledEasyHandle(easyHandle, cause)
} finally {
handler.responseCompletable.completeExceptionally(cause)
handler.dispose()
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

private fun processCancelledEasyHandle(easyHandle: EasyHandle, cause: Throwable): CurlFail = memScoped {
try {
val responseDataRef = alloc<COpaquePointerVar>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.http.content.*
import io.ktor.util.Attributes
import io.ktor.util.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.cinterop.CPointer
Expand All @@ -24,7 +24,10 @@ import libcurl.curl_slist
import kotlin.coroutines.coroutineContext

@OptIn(ExperimentalForeignApi::class, InternalAPI::class)
internal suspend fun HttpRequestData.toCurlRequest(config: CurlClientEngineConfig): CurlRequestData = CurlRequestData(
internal suspend fun HttpRequestData.toCurlRequest(
config: CurlClientEngineConfig,
callContext: Job,
): CurlRequestData = CurlRequestData(
protocol = url.protocol.name,
url = url.toString(),
method = method.value,
Expand All @@ -33,13 +36,13 @@ internal suspend fun HttpRequestData.toCurlRequest(config: CurlClientEngineConfi
content = body.toByteChannel(),
contentLength = body.contentLength ?: headers[HttpHeaders.ContentLength]?.toLongOrNull() ?: -1L,
connectTimeout = getCapabilityOrNull(HttpTimeoutCapability)?.connectTimeoutMillis,
executionContext = executionContext,
callContext = callContext,
isUpgradeRequest = isUpgradeRequest(),
forceProxyTunneling = config.forceProxyTunneling,
sslVerify = config.sslVerify,
caInfo = config.caInfo,
caPath = config.caPath,
attributes = attributes
attributes = attributes,
)

internal class CurlRequestData @OptIn(ExperimentalForeignApi::class) constructor(
Expand All @@ -51,7 +54,7 @@ internal class CurlRequestData @OptIn(ExperimentalForeignApi::class) constructor
val content: ByteReadChannel,
val contentLength: Long,
val connectTimeout: Long?,
val executionContext: Job,
val callContext: Job,
val isUpgradeRequest: Boolean,
val forceProxyTunneling: Boolean,
val sslVerify: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import io.ktor.http.content.*
import io.ktor.serialization.kotlinx.json.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.cancel
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.io.readByteArray
import kotlin.test.*
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

private val testSize = listOf(
0,
Expand Down Expand Up @@ -383,14 +385,36 @@ class ContentTest : ClientLoader() {
@Test
fun testDownloadStreamArrayWithTimeout() = clientTests {
test { client ->
val result: ByteArray? = withTimeoutOrNull(100) {
val result: ByteArray? = withTimeoutOrNull(100.milliseconds) {
client.get("$TEST_SERVER/content/stream").body<ByteArray>()
}

assertNull(result)
}
}

// Flaky on Apache: KTOR-9544
@Test
fun testBodyChannelCancelledWhenCallerScopeIsCancelled() = clientTests(except("Apache5")) {
test { client ->
val bodyDeferred = CompletableDeferred<ByteReadChannel>()
coroutineScope {
val job = launch {
val body = client.prepareGet("$TEST_SERVER/content/stream?delay=5000").body<ByteReadChannel>()
bodyDeferred.complete(body)
awaitCancellation()
}
val body = bodyDeferred.await()

val cause = CancellationException("Test exception")
job.cancel(cause)

waitForCondition("body to be closed", timeout = 2.seconds) { body.closedCause != null }
assertEquals(cause.message, body.closedCause!!.message)
}
}
}

/**
* This is a bit of an edge case where the initial content reader fails to read the response body
* before a second reader comes in. When this happens, we simply cancel the initial reader.
Expand All @@ -407,7 +431,7 @@ class ContentTest : ClientLoader() {
HttpResponseValidator {
validateResponse { response ->
val channel = response.rawContent
for (i in 0..100) {
repeat(100) {
assertEquals(expected, channel.readByteArray(expected.length).decodeToString())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@ package io.ktor.client.tests.utils

import kotlinx.coroutines.delay
import kotlin.test.assertTrue
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource

suspend fun waitForCondition(
description: String? = null,
waitIncrement: Long = 200L,
waitLimit: Long = 10_000L,
description: String,
timeout: Duration = 10.seconds,
delay: Duration = (timeout / 10).coerceAtMost(100.milliseconds),
condition: () -> Boolean,
) {
var waitTime = 0L
while (waitTime < waitLimit) {
val timeMark = TimeSource.Monotonic.markNow() + timeout
while (timeMark.hasNotPassedNow()) {
if (condition()) {
return
}
delay(waitIncrement)
waitTime += waitIncrement
delay(delay)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
assertTrue(condition(), "Timed out after ${waitLimit / 1000}s waiting for ${description ?: condition}")
assertTrue(condition(), "Timed out after $timeout waiting for $description")
}
Loading