KTOR-9544 Apache: Attach response body channel to callContext#5566
KTOR-9544 Apache: Attach response body channel to callContext#5566
Conversation
…card data after cancellation Attach the response body channel to `callContext`'s job directly (matching CIO/OkHttp/Apache pattern) so caller-scope cancellation propagates the exact cause to `closedCause`. Previously the channel was attached to an intermediate `consumerJob`, causing cause wrapping that made `closedCause` unreachable. Also silently discard data in `consume()` when the channel is closed instead of throwing. Throwing (even `IOException` per the interface contract) triggers Apache's error-recovery path mid-body-stream, which either corrupts connection pool state or causes a `RejectedExecutionException` on an already-shutdown scheduler. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
📝 WalkthroughWalkthroughAdjustments to Apache5 client's coroutine job lifecycle and channel closure handling: internal job wiring now anchors to Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 7/8 reviews remaining, refill in 7 minutes and 30 seconds.Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@ktor-client/ktor-client-apache5/jvm/src/io/ktor/client/engine/apache5/ApacheResponseConsumer.kt`:
- Around line 104-107: The cancellation currently calls
responseChannel.cancel(cause) in parentContext.job.invokeOnCompletion which can
let already-queued payloads reach the writer and have writeFully throw, aborting
the queue coroutine before a CloseChannel is processed; to fix, change the
cancellation handler to close the channel (responseChannel.close(cause)) or
otherwise atomically mark it closed so queued items are protected, and harden
the queue/writer loop that calls writeFully to catch exceptions (including from
writes after cancellation), swallow or translate them into a terminal state, and
always enqueue/dispatch the CloseChannel sentinel; update the code paths around
parentContext.job.invokeOnCompletion, responseChannel usage, the writeFully
calls, and the CloseChannel enqueue logic so queued chunks cannot cause the
queue coroutine to abort before CloseChannel is consumed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a5467358-b0c2-4c9e-a2e1-e38a8ee02caf
📒 Files selected for processing (2)
ktor-client/ktor-client-apache5/jvm/src/io/ktor/client/engine/apache5/ApacheResponseConsumer.ktktor-client/ktor-client-tests/common/test/io/ktor/client/tests/ContentTest.kt
| parentContext.job.invokeOnCompletion(onCancelling = true) { cause -> | ||
| if (cause != null) { | ||
| responseChannel.cancel(cause) | ||
| } |
There was a problem hiding this comment.
Protect queued writes after cancellation to avoid coroutine abort before close callback.
Line [147] drops only new chunks. Chunks queued before cancellation can still hit Line [117] after Line [104] cancels the channel, where writeFully may throw and terminate the queue coroutine before CloseChannel is consumed.
💡 Proposed fix
is ByteBuffer -> {
+ if (channel.isClosedForWrite) continue
val written = message.remaining()
- channel.writeFully(message)
- channel.flush()
+ try {
+ channel.writeFully(message)
+ channel.flush()
+ } catch (cause: Throwable) {
+ if (channel.isClosedForWrite || cause is CancellationException) continue
+ throw cause
+ }
when (val channel = capacityChannel) {
null -> capacity.addAndGet(written)
else -> channel.update(written)
}
}Also applies to: 115-118, 143-149
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@ktor-client/ktor-client-apache5/jvm/src/io/ktor/client/engine/apache5/ApacheResponseConsumer.kt`
around lines 104 - 107, The cancellation currently calls
responseChannel.cancel(cause) in parentContext.job.invokeOnCompletion which can
let already-queued payloads reach the writer and have writeFully throw, aborting
the queue coroutine before a CloseChannel is processed; to fix, change the
cancellation handler to close the channel (responseChannel.close(cause)) or
otherwise atomically mark it closed so queued items are protected, and harden
the queue/writer loop that calls writeFully to catch exceptions (including from
writes after cancellation), swallow or translate them into a terminal state, and
always enqueue/dispatch the CloseChannel sentinel; update the code paths around
parentContext.job.invokeOnCompletion, responseChannel usage, the writeFully
calls, and the CloseChannel enqueue logic so queued chunks cannot cause the
queue coroutine to abort before CloseChannel is consumed.
Subsystem
ktor-client-apache5
Motivation
KTOR-9544 Apache: body channel not cancelled when caller scope is cancelled
Solution
Attach the response body channel to
callContext's job directly (matching CIO/OkHttp/Apache pattern) so caller-scope cancellation propagates the exact cause toclosedCause. Previously the channel was attached to an intermediateconsumerJob, causing cause wrapping that madeclosedCauseunreachable.Also silently discard data in
consume()when the channel is closed instead of throwing. Throwing (evenIOExceptionper the interface contract) triggers Apache's error-recovery path mid-body-stream, which either corrupts connection pool state or causes aRejectedExecutionExceptionon an already-shutdown scheduler.