Skip to content

Commit 0e4d35e

Browse files
authored
Implement interactive/attached exec (#663)
1 parent e73d1f7 commit 0e4d35e

File tree

3 files changed

+158
-15
lines changed

3 files changed

+158
-15
lines changed

api-client/src/main/kotlin/de/gesellix/docker/remote/api/client/ExecApi.kt

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import de.gesellix.docker.remote.api.core.ServerError
3131
import de.gesellix.docker.remote.api.core.ServerException
3232
import de.gesellix.docker.remote.api.core.StreamCallback
3333
import de.gesellix.docker.remote.api.core.Success
34+
import de.gesellix.docker.remote.api.core.SuccessBidirectionalStream
3435
import de.gesellix.docker.remote.api.core.SuccessStream
3536
import kotlinx.coroutines.cancel
3637
import kotlinx.coroutines.launch
@@ -239,11 +240,6 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
239240
) {
240241
val localVariableConfig = execStartRequestConfig(id = id, execStartConfig = execStartConfig)
241242

242-
val expectMultiplexedResponse: Boolean = if (execStartConfig?.tty != null) {
243-
!(execStartConfig.tty ?: false)
244-
} else {
245-
!(execInspect(id).processConfig?.tty ?: false)
246-
}
247243
val localVarResponse = requestFrames(
248244
localVariableConfig
249245
)
@@ -256,18 +252,32 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
256252
val actualCallback = callback ?: LoggingCallback<Frame?>()
257253

258254
when (localVarResponse.responseType) {
259-
ResponseType.Success -> {
260-
runBlocking {
261-
launch {
262-
withTimeoutOrNull(timeout.toMillis()) {
263-
actualCallback.onStarting(this@launch::cancel)
264-
(localVarResponse as SuccessStream<Frame>).data.collect { actualCallback.onNext(it) }
265-
actualCallback.onFinished()
255+
ResponseType.Success,
256+
ResponseType.Informational -> {
257+
when (localVarResponse) {
258+
is SuccessBidirectionalStream ->
259+
runBlocking {
260+
launch {
261+
withTimeoutOrNull(timeout.toMillis()) {
262+
actualCallback.onStarting(this@launch::cancel)
263+
actualCallback.attachInput(localVarResponse.socket.sink)
264+
localVarResponse.data.collect { actualCallback.onNext(it) }
265+
actualCallback.onFinished()
266+
}
267+
}
268+
}
269+
else ->
270+
runBlocking {
271+
launch {
272+
withTimeoutOrNull(timeout.toMillis()) {
273+
actualCallback.onStarting(this@launch::cancel)
274+
(localVarResponse as SuccessStream<Frame>).data.collect { actualCallback.onNext(it) }
275+
actualCallback.onFinished()
276+
}
277+
}
266278
}
267-
}
268279
}
269280
}
270-
ResponseType.Informational -> throw UnsupportedOperationException("Client does not support Informational responses.")
271281
ResponseType.Redirection -> throw UnsupportedOperationException("Client does not support Redirection responses.")
272282
ResponseType.ClientError -> {
273283
val localVarError = localVarResponse as ClientError<*>
@@ -292,6 +302,18 @@ class ExecApi(dockerClientConfig: DockerClientConfig = defaultClientConfig, prox
292302
val localVariableQuery: MultiValueMap = mutableMapOf()
293303
val localVariableHeaders: MutableMap<String, String> = mutableMapOf()
294304

305+
// val expectMultiplexedResponse: Boolean = if (execStartConfig?.tty != null) {
306+
// !(execStartConfig.tty ?: false)
307+
// } else {
308+
// !(execInspect(id).processConfig?.tty ?: false)
309+
// }
310+
val requiresConnectionUpgrade = execStartConfig?.tty != null && execStartConfig.tty!!
311+
if (requiresConnectionUpgrade)
312+
localVariableHeaders.apply {
313+
put("Connection", "Upgrade")
314+
put("Upgrade", "tcp")
315+
}
316+
295317
return RequestConfig(
296318
method = POST,
297319
path = "/exec/{id}/start".replace("{" + "id" + "}", id),

api-client/src/test/java/de/gesellix/docker/remote/api/client/ExecApiIntegrationTest.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,37 @@
66
import de.gesellix.docker.remote.api.ExecInspectResponse;
77
import de.gesellix.docker.remote.api.ExecStartConfig;
88
import de.gesellix.docker.remote.api.IdResponse;
9+
import de.gesellix.docker.remote.api.core.Frame;
910
import de.gesellix.docker.remote.api.testutil.DockerEngineAvailable;
1011
import de.gesellix.docker.remote.api.testutil.InjectDockerClient;
1112
import de.gesellix.docker.remote.api.testutil.TestImage;
13+
import okio.BufferedSink;
14+
import okio.Okio;
15+
import okio.Sink;
16+
1217
import org.junit.jupiter.api.BeforeEach;
1318
import org.junit.jupiter.api.Test;
1419

1520
import static de.gesellix.docker.remote.api.testutil.Constants.LABEL_KEY;
1621
import static de.gesellix.docker.remote.api.testutil.Constants.LABEL_VALUE;
22+
import static de.gesellix.docker.remote.api.testutil.Failsafe.perform;
1723
import static de.gesellix.docker.remote.api.testutil.Failsafe.removeContainer;
24+
import static java.time.temporal.ChronoUnit.SECONDS;
1825
import static java.util.Arrays.asList;
26+
import static java.util.Collections.singletonList;
1927
import static java.util.Collections.singletonMap;
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
2029
import static org.junit.jupiter.api.Assertions.assertFalse;
2130
import static org.junit.jupiter.api.Assertions.assertNotNull;
31+
import static org.junit.jupiter.api.Assertions.assertSame;
32+
import static org.junit.jupiter.api.Assertions.assertTrue;
33+
34+
import java.io.IOException;
35+
import java.time.Duration;
36+
import java.util.Timer;
37+
import java.util.TimerTask;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.stream.Collectors;
2240

2341
@DockerEngineAvailable
2442
class ExecApiIntegrationTest {
@@ -83,4 +101,105 @@ public void containerExecNonInteractive() {
83101

84102
removeContainer(engineApiClient, "container-exec-test");
85103
}
104+
105+
@Test
106+
public void containerExecInteractive() {
107+
removeContainer(engineApiClient, "container-exec-interactive-test");
108+
109+
imageApi.imageCreate(testImage.getImageName(), null, null, testImage.getImageTag(), null, null, null, null, null);
110+
111+
ContainerCreateRequest containerCreateRequest = new ContainerCreateRequest(
112+
null, null, null,
113+
true, true, true,
114+
null,
115+
true, true, null,
116+
null,
117+
null,
118+
null,
119+
null,
120+
testImage.getImageWithTag(),
121+
null, null, null,
122+
null, null,
123+
null,
124+
singletonMap(LABEL_KEY, LABEL_VALUE),
125+
null, null,
126+
null,
127+
null,
128+
null
129+
);
130+
containerApi.containerCreate(containerCreateRequest, "container-exec-interactive-test");
131+
containerApi.containerStart("container-exec-interactive-test", null);
132+
133+
IdResponse exec = execApi.containerExec(
134+
"container-exec-interactive-test",
135+
new ExecConfig(true, true, true, null, null, true,
136+
null,
137+
singletonList("/cat"),
138+
null, null, null));
139+
assertNotNull(exec.getId());
140+
141+
Duration timeout = Duration.of(5, SECONDS);
142+
LogFrameStreamCallback callback = new LogFrameStreamCallback() {
143+
@Override
144+
public void attachInput(Sink sink) {
145+
System.out.println("attachInput, sending data...");
146+
new Thread(() -> {
147+
BufferedSink buffer = Okio.buffer(sink);
148+
try {
149+
buffer.writeUtf8("hello echo\n");
150+
buffer.flush();
151+
System.out.println("... data sent");
152+
} catch (IOException e) {
153+
e.printStackTrace();
154+
System.err.println("Failed to write to stdin: " + e.getMessage());
155+
} finally {
156+
try {
157+
Thread.sleep(100);
158+
sink.close();
159+
} catch (Exception ignored) {
160+
// ignore
161+
}
162+
}
163+
}).start();
164+
}
165+
};
166+
167+
new Thread(() -> execApi.execStart(
168+
exec.getId(),
169+
new ExecStartConfig(false, true, null),
170+
callback, timeout.toMillis())).start();
171+
172+
CountDownLatch wait = new CountDownLatch(1);
173+
new Timer().schedule(new TimerTask() {
174+
@Override
175+
public void run() {
176+
if (callback.job != null) {
177+
callback.job.cancel();
178+
}
179+
wait.countDown();
180+
}
181+
}, 5000);
182+
183+
try {
184+
wait.await();
185+
}
186+
catch (InterruptedException e) {
187+
e.printStackTrace();
188+
}
189+
190+
ExecInspectResponse execInspect = execApi.execInspect(exec.getId());
191+
assertTrue(execInspect.getRunning());
192+
193+
assertSame(Frame.StreamType.RAW, callback.frames.stream().findAny().get().getStreamType());
194+
assertEquals(
195+
"hello echo\nhello echo".replaceAll("[\\n\\r]", ""),
196+
callback.frames.stream().map(Frame::getPayloadAsString).collect(Collectors.joining()).replaceAll("[\\n\\r]", ""));
197+
198+
removeContainer(engineApiClient, "container-exec-interactive-test");
199+
200+
perform(() -> {
201+
ExecInspectResponse execInspectAfterStop = execApi.execInspect(exec.getId());
202+
assertFalse(execInspectAfterStop.getRunning());
203+
});
204+
}
86205
}

build.gradle.kts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ subprojects {
4141
repositories {
4242
// mavenLocal()
4343
// listOf<String>(
44-
//// "gesellix/okhttp",
44+
// "gesellix/okhttp",
4545
//// "docker-client/*",
4646
// ).forEach { slug ->
4747
//// fun findProperty(s: String) = project.findProperty(s) as String?
@@ -71,6 +71,8 @@ allprojects {
7171
}
7272
}
7373
// dependencySubstitution {
74+
// substitute(module("com.squareup.okhttp3:okhttp"))
75+
// .using(module("de.gesellix.okhttp3-forked:okhttp:${libs.versions.okhttp.get()}"))
7476
// substitute(module("com.squareup.okhttp3:okhttp-jvm"))
7577
// .using(module("de.gesellix.okhttp3-forked:okhttp-jvm:${libs.versions.okhttp.get()}"))
7678
// }

0 commit comments

Comments
 (0)