Skip to content

Commit b1502df

Browse files
committed
Add a multiplexed stream decoder.
See https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach for the stream format documentation. Handling the .../logs endpoints would work without the custom interceptor, when the response content-type would be set correctly :-/ See #21 for details.
1 parent 323cd67 commit b1502df

File tree

3 files changed

+128
-0
lines changed

3 files changed

+128
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package de.gesellix.docker.engine.client.infrastructure;
2+
3+
import java.nio.charset.StandardCharsets;
4+
5+
public class Frame {
6+
7+
private final StreamType streamType;
8+
private final byte[] payload;
9+
10+
public Frame(StreamType streamType, byte[] payload) {
11+
this.streamType = streamType;
12+
this.payload = payload;
13+
}
14+
15+
public StreamType getStreamType() {
16+
return streamType;
17+
}
18+
19+
public byte[] getPayload() {
20+
return payload;
21+
}
22+
23+
public String getPayloadAsString() {
24+
return new String(payload, StandardCharsets.UTF_8).trim();
25+
}
26+
27+
@Override
28+
public String toString() {
29+
return "Frame{" +
30+
"streamType=" + streamType +
31+
", payload=" + getPayloadAsString() +
32+
'}';
33+
}
34+
35+
/**
36+
* STREAM_TYPE can be:
37+
* <ul>
38+
* <li>0: stdin (will be written on stdout)</li>
39+
* <li>1: stdout</li>
40+
* <li>2: stderr</li>
41+
* <li>3: systemerr</li>
42+
* </ul>
43+
* See the paragraph _Stream format_ at https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach.
44+
* Reference implementation: https://github.com/moby/moby/blob/master/pkg/stdcopy/stdcopy.go.
45+
* Docker client GoDoc: https://godoc.org/github.com/moby/moby/client#Client.ContainerAttach.
46+
*/
47+
public enum StreamType {
48+
49+
STDIN((byte) 0),
50+
STDOUT((byte) 1),
51+
STDERR((byte) 2),
52+
SYSTEMERR((byte) 3);
53+
54+
StreamType(Object streamTypeId) {
55+
this.streamTypeId = ((byte) (streamTypeId));
56+
}
57+
58+
public static StreamType valueOf(final byte b) {
59+
switch (b) {
60+
case 0:
61+
return STDIN;
62+
case 1:
63+
return STDOUT;
64+
case 2:
65+
return STDERR;
66+
case 3:
67+
return SYSTEMERR;
68+
default:
69+
throw new IllegalArgumentException("no enum value for " + String.valueOf(b) + " found.");
70+
}
71+
}
72+
73+
public byte getStreamTypeId() {
74+
return streamTypeId;
75+
}
76+
77+
private final byte streamTypeId;
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package de.gesellix.docker.engine.client.infrastructure
2+
3+
import okhttp3.Interceptor
4+
import okhttp3.Response
5+
6+
data class MultiplexedStreamConfig(val expectMultiplexedStream: Boolean)
7+
8+
// This one would work automatically, when the response content-type would be set correctly :-/
9+
// see https://github.com/gesellix/docker-client/issues/21
10+
class EnsureRawStreamContentTypeInterceptor : Interceptor {
11+
12+
override fun intercept(chain: Interceptor.Chain): Response {
13+
val response = chain.proceed(chain.request())
14+
if (chain.request().tag(MultiplexedStreamConfig::class.java)?.expectMultiplexedStream == true) {
15+
if (response.headers("Content-Type").isEmpty()) {
16+
// TODO use a proper logger
17+
println("Overriding Content-Type response header with application/vnd.docker.raw-stream")
18+
return response.newBuilder().header("Content-Type", "application/vnd.docker.raw-stream").build()
19+
}
20+
}
21+
return response
22+
}
23+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package de.gesellix.docker.engine.client.infrastructure
2+
3+
import de.gesellix.docker.response.Reader
4+
import okio.BufferedSource
5+
import okio.Source
6+
import okio.buffer
7+
8+
class FrameReader(source: Source) : Reader<Frame> {
9+
10+
private val buffer: BufferedSource = source.buffer()
11+
12+
override fun readNext(type: Class<Frame>?): Frame {
13+
// Stream format: https://docs.docker.com/engine/api/v1.41/#operation/ContainerAttach
14+
// header := [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}
15+
16+
val streamType = Frame.StreamType.valueOf(buffer.readByte())
17+
buffer.skip(3)
18+
val frameSize = buffer.readInt()
19+
20+
return Frame(streamType, buffer.readByteArray(frameSize.toLong()))
21+
}
22+
23+
override fun hasNext(): Boolean {
24+
return !Thread.currentThread().isInterrupted && !buffer.exhausted()
25+
}
26+
}

0 commit comments

Comments
 (0)