Skip to content

Commit 38c7b22

Browse files
committed
Implement interactive container attach
1 parent 946980c commit 38c7b22

File tree

6 files changed

+159
-20
lines changed

6 files changed

+159
-20
lines changed

api-client/src/main/java/de/gesellix/docker/remote/api/core/StreamCallback.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,21 @@
11
package de.gesellix.docker.remote.api.core;
22

3+
import okio.Sink;
4+
35
public interface StreamCallback<T> {
46

57
default void onStarting(Cancellable cancellable) {
68
}
79

10+
default void attachInput(Sink sink) {
11+
try {
12+
Thread.sleep(500);
13+
sink.close();
14+
} catch (Exception ignored) {
15+
}
16+
throw new IllegalStateException("Falling back to default implementation that closes the sink after 500ms. This is probably not what you want. Please provide a custom implementation of this method to handle the input stream.");
17+
}
18+
819
void onNext(T element);
920

1021
default void onFailed(Exception e) {

api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ContainerApi.kt

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import de.gesellix.docker.remote.api.core.ServerError
4141
import de.gesellix.docker.remote.api.core.ServerException
4242
import de.gesellix.docker.remote.api.core.StreamCallback
4343
import de.gesellix.docker.remote.api.core.Success
44+
import de.gesellix.docker.remote.api.core.SuccessBidirectionalStream
4445
import de.gesellix.docker.remote.api.core.SuccessStream
4546
import kotlinx.coroutines.cancel
4647
import kotlinx.coroutines.launch
@@ -216,14 +217,29 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
216217
when (localVarResponse.responseType) {
217218
ResponseType.Success,
218219
ResponseType.Informational -> {
219-
runBlocking {
220-
launch {
221-
withTimeout(timeoutMillis) {
222-
callback.onStarting(this@launch::cancel)
223-
(localVarResponse as SuccessStream<Frame>).data.collect { callback.onNext(it) }
224-
callback.onFinished()
220+
when (localVarResponse) {
221+
is SuccessBidirectionalStream ->
222+
runBlocking {
223+
launch {
224+
withTimeout(timeoutMillis) {
225+
callback.onStarting(this@launch::cancel)
226+
callback.attachInput(localVarResponse.socket.sink)
227+
localVarResponse.data.collect { callback.onNext(it) }
228+
callback.onFinished()
229+
}
230+
}
231+
}
232+
233+
else ->
234+
runBlocking {
235+
launch {
236+
withTimeout(timeoutMillis) {
237+
callback.onStarting(this@launch::cancel)
238+
(localVarResponse as SuccessStream<Frame>).data.collect { callback.onNext(it) }
239+
callback.onFinished()
240+
}
241+
}
225242
}
226-
}
227243
}
228244
}
229245
ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.")
@@ -282,12 +298,17 @@ class ContainerApi(dockerClientConfig: DockerClientConfig = defaultClientConfig,
282298
}
283299
}
284300
val localVariableHeaders: MutableMap<String, String> = mutableMapOf()
285-
val requiresConnectionUpgrade = stdin != null
301+
val requiresConnectionUpgrade = stdin != null && stdin
286302
if (requiresConnectionUpgrade)
287303
localVariableHeaders.apply {
288-
put("Upgrade", "tcp")
289304
put("Connection", "Upgrade")
305+
put("Upgrade", "tcp")
290306
}
307+
// else
308+
// localVariableHeaders.apply {
309+
// put("Connection", "Upgrade")
310+
// put("Upgrade", "tcp")
311+
// }
291312

292313
return RequestConfig(
293314
method = POST,

api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiClient.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,8 +319,9 @@ open class ApiClient(
319319
response.code,
320320
response.headers.toMultimap()
321321
)
322-
response.code == 101 && request.isTcpUpgrade() && response.isTcpUpgrade() -> return SuccessStream(
322+
response.code == 101 && request.isTcpUpgrade() && response.isTcpUpgrade() -> return SuccessBidirectionalStream(
323323
response.socket.consumeFrames(mediaType),
324+
response.socket!!,
324325
response.code,
325326
response.headers.toMultimap()
326327
)

api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ApiInfrastructureResponse.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package de.gesellix.docker.remote.api.core
22

33
import kotlinx.coroutines.flow.Flow
4+
import okio.Socket
45

56
enum class ResponseType {
67
Success, Informational, Redirection, ClientError, ServerError
@@ -14,8 +15,15 @@ abstract class ApiInfrastructureResponse<T>(val responseType: ResponseType) : Re
1415
abstract val headers: Map<String, List<String>>
1516
}
1617

17-
class SuccessStream<T>(
18-
val data: Flow<T>,
18+
class SuccessBidirectionalStream<T>(
19+
override val data: Flow<T>,
20+
val socket: Socket,
21+
override val statusCode: Int = -1,
22+
override val headers: Map<String, List<String>> = mapOf()
23+
) : SuccessStream<T>(data, statusCode, headers)
24+
25+
open class SuccessStream<T>(
26+
open val data: Flow<T>,
1927
override val statusCode: Int = -1,
2028
override val headers: Map<String, List<String>> = mapOf()
2129
) : ApiInfrastructureResponse<T>(ResponseType.Success)

api-client/src/main/kotlin/de/gesellix/docker/remote/api/core/ResponseConsumer.kt

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,7 @@ fun Socket?.consumeFrames(mediaType: String?): Flow<Frame> {
8181
return emptyFlow()
8282
}
8383
when (mediaType) {
84-
// Requires api v1.42
85-
// multiplexed-stream: without attached Tty
8684
ApiClient.Companion.DockerMultiplexedStreamMediaType,
87-
// Requires api v1.42
88-
// raw-stream: with attached Tty
8985
ApiClient.Companion.DockerRawStreamMediaType -> {
9086
val reader = FrameReader(source, mediaType)
9187
val events = flow {
@@ -109,11 +105,7 @@ fun ResponseBody?.consumeFrames(mediaType: String?): Flow<Frame> {
109105
return emptyFlow()
110106
}
111107
when (mediaType) {
112-
// Requires api v1.42
113-
// multiplexed-stream: without attached Tty
114108
ApiClient.Companion.DockerMultiplexedStreamMediaType,
115-
// Requires api v1.42
116-
// raw-stream: with attached Tty
117109
ApiClient.Companion.DockerRawStreamMediaType -> {
118110
val reader = FrameReader(source(), mediaType)
119111
val events = flow {

api-client/src/test/java/de/gesellix/docker/remote/api/client/ContainerApiIntegrationTest.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import de.gesellix.docker.remote.api.testutil.InjectDockerClient;
2727
import de.gesellix.docker.remote.api.testutil.TarUtil;
2828
import de.gesellix.docker.remote.api.testutil.TestImage;
29+
import okio.BufferedSink;
2930
import okio.Okio;
31+
import okio.Sink;
32+
3033
import org.junit.jupiter.api.BeforeEach;
3134
import org.junit.jupiter.api.Test;
3235
import org.slf4j.Logger;
@@ -759,6 +762,8 @@ public void containerStatsOnce() {
759762

760763
@Test
761764
public void containerAttachNonInteractive() {
765+
removeContainer(engineApiClient, "container-attach-non-interactive-test");
766+
762767
imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null);
763768

764769
ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest(
@@ -816,6 +821,107 @@ public void run() {
816821
removeContainer(engineApiClient, "container-attach-non-interactive-test");
817822
}
818823

824+
@Test
825+
public void containerAttachInteractive() {
826+
removeContainer(engineApiClient, "container-attach-interactive-test");
827+
828+
imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null);
829+
830+
ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest(
831+
null, null, null,
832+
true, true, true,
833+
null,
834+
true, true, null,
835+
null,
836+
null,
837+
null,
838+
null,
839+
testImage.getImageWithTag(),
840+
null, null, singletonList("/cat"),
841+
null, null,
842+
null,
843+
singletonMap(LABEL_KEY, LABEL_VALUE),
844+
null, null,
845+
null,
846+
null,
847+
null
848+
);
849+
containerApi.containerCreate(containerCreateRequest, "container-attach-interactive-test");
850+
containerApi.containerStart("container-attach-interactive-test", null);
851+
852+
Duration timeout = Duration.of(5, SECONDS);
853+
LogFrameAndAttachStreamCallback callback = new LogFrameAndAttachStreamCallback() {
854+
@Override
855+
public void attachInput(Sink sink) {
856+
System.out.println("attachInput, sending data...");
857+
new Thread(() -> {
858+
BufferedSink buffer = Okio.buffer(sink);
859+
try {
860+
buffer.writeUtf8("hello echo\n");
861+
buffer.flush();
862+
System.out.println("... data sent");
863+
} catch (IOException e) {
864+
e.printStackTrace();
865+
System.err.println("Failed to write to stdin: " + e.getMessage());
866+
} finally {
867+
try {
868+
Thread.sleep(100);
869+
sink.close();
870+
} catch (Exception ignored) {
871+
// ignore
872+
}
873+
}
874+
}).start();
875+
}
876+
};
877+
878+
new Thread(() -> containerApi.containerAttach(
879+
"container-attach-interactive-test",
880+
null, true, true, true, true, true,
881+
callback, timeout.toMillis())).start();
882+
883+
CountDownLatch wait = new CountDownLatch(1);
884+
new Timer().schedule(new TimerTask() {
885+
@Override
886+
public void run() {
887+
if (callback.job != null) {
888+
callback.job.cancel();
889+
}
890+
wait.countDown();
891+
}
892+
}, 5000);
893+
894+
try {
895+
wait.await();
896+
}
897+
catch (InterruptedException e) {
898+
e.printStackTrace();
899+
}
900+
assertSame(Frame.StreamType.RAW, callback.frames.stream().findAny().get().getStreamType());
901+
assertEquals(
902+
"hello echo\nhello echo".replaceAll("[\\n\\r]", ""),
903+
callback.frames.stream().map(Frame::getPayloadAsString).collect(Collectors.joining()).replaceAll("[\\n\\r]", ""));
904+
905+
removeContainer(engineApiClient, "container-attach-interactive-test");
906+
}
907+
908+
static class LogFrameAndAttachStreamCallback implements StreamCallback<Frame> {
909+
910+
List<Frame> frames = new ArrayList<>();
911+
Cancellable job = null;
912+
913+
@Override
914+
public void onStarting(Cancellable cancellable) {
915+
job = cancellable;
916+
}
917+
918+
@Override
919+
public void onNext(Frame frame) {
920+
frames.add(frame);
921+
log.info("next: {}", frame);
922+
}
923+
}
924+
819925
static class LogFrameStreamCallback implements StreamCallback<Frame> {
820926

821927
List<Frame> frames = new ArrayList<>();

0 commit comments

Comments
 (0)