Skip to content

Commit 57ad0f1

Browse files
committed
add internal restart mechanism to avoid DEADLINE_EXCEEDED in windmill streams
1 parent 8872039 commit 57ad0f1

File tree

9 files changed

+161
-32
lines changed

9 files changed

+161
-32
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.ExecutorService;
2626
import java.util.concurrent.Executors;
2727
import java.util.concurrent.RejectedExecutionException;
28+
import java.util.concurrent.ScheduledExecutorService;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.function.Function;
3031
import javax.annotation.concurrent.GuardedBy;
@@ -76,6 +77,8 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
7677

7778
private final Logger logger;
7879
private final ExecutorService executor;
80+
private final WindmillStreamTimeout streamTimeout;
81+
private final ScheduledExecutorService restartExecutor;
7982
private final BackOff backoff;
8083
private final CountDownLatch finishLatch;
8184
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
@@ -101,13 +104,30 @@ protected AbstractWindmillStream(
101104
StreamObserverFactory streamObserverFactory,
102105
Set<AbstractWindmillStream<?, ?>> streamRegistry,
103106
int logEveryNStreamFailures,
104-
String backendWorkerToken) {
107+
String backendWorkerToken,
108+
WindmillStreamTimeout streamTimeout) {
105109
this.backendWorkerToken = backendWorkerToken;
106110
this.executor =
107111
Executors.newSingleThreadExecutor(
108112
new ThreadFactoryBuilder()
109113
.setDaemon(true)
110-
.setNameFormat(createThreadName(debugStreamType, backendWorkerToken))
114+
.setNameFormat(
115+
createThreadName(debugStreamType, backendWorkerToken, "WindmillStream"))
116+
.build());
117+
this.restartExecutor =
118+
// Exceptions thrown on this thread will not crash the user worker process.
119+
Executors.newSingleThreadScheduledExecutor(
120+
new ThreadFactoryBuilder()
121+
.setNameFormat(
122+
createThreadName(
123+
debugStreamType, backendWorkerToken, "WindmillStreamRestarter"))
124+
// We need to explicitly log the error since it may be swallowed.
125+
.setUncaughtExceptionHandler(
126+
(t, e) ->
127+
logger.error(
128+
"{} failed due to uncaught exception during execution. ",
129+
t.getName(),
130+
e))
111131
.build());
112132
this.backoff = backoff;
113133
this.streamRegistry = streamRegistry;
@@ -126,12 +146,14 @@ protected AbstractWindmillStream(
126146
logger);
127147
this.sleeper = Sleeper.DEFAULT;
128148
this.debugMetrics = StreamDebugMetrics.create();
149+
this.streamTimeout = streamTimeout;
129150
}
130151

131-
private static String createThreadName(String streamType, String backendWorkerToken) {
152+
private static String createThreadName(
153+
String threadType, String streamType, String backendWorkerToken) {
132154
return !backendWorkerToken.isEmpty()
133-
? String.format("%s-%s-WindmillStream-thread", streamType, backendWorkerToken)
134-
: String.format("%s-WindmillStream-thread", streamType);
155+
? String.format("%s-%s-%s-thread", streamType, backendWorkerToken, threadType)
156+
: String.format("%s-%s-thread", streamType, threadType);
135157
}
136158

137159
/** Called on each response from the server. */
@@ -177,6 +199,9 @@ public final void start() {
177199

178200
if (shouldStartStream) {
179201
startStream();
202+
// Restart the stream at every streamTimeout interval.
203+
restartExecutor.scheduleAtFixedRate(
204+
this::restart, streamTimeout.time(), streamTimeout.time(), streamTimeout.unit());
180205
}
181206
}
182207

@@ -296,6 +321,21 @@ public final synchronized void halfClose() {
296321
// Synchronization of close and onCompleted necessary for correct retry logic in onNewStream.
297322
debugMetrics.recordHalfClose();
298323
clientClosed = true;
324+
resetCurrentObserver();
325+
}
326+
327+
/**
328+
* Internally restart the stream to avoid {@link Status#DEADLINE_EXCEEDED} errors.
329+
*
330+
* @implNote Similar behavior to {@link #halfClose()}, except we allow callers to interact with
331+
* the stream after restarts.
332+
*/
333+
private synchronized void restart() {
334+
debugMetrics.recordRestartReason("Internal Timeout");
335+
resetCurrentObserver();
336+
}
337+
338+
private synchronized void resetCurrentObserver() {
299339
try {
300340
requestObserver.onCompleted();
301341
} catch (ResettableThrowingStreamObserver.StreamClosedException e) {
@@ -350,6 +390,8 @@ private synchronized boolean maybeTearDownStream() {
350390
return false;
351391
}
352392

393+
private void scheduleRestarts() {}
394+
353395
private class ResponseObserver implements StreamObserver<ResponseT> {
354396

355397
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.dataflow.worker.windmill.client;
19+
20+
import com.google.auto.value.AutoValue;
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.beam.sdk.annotations.Internal;
23+
24+
/**
25+
* Timeout for a {@link WindmillStream}. This should be less than a stream's deadline since we use
26+
* this to internally restart the stream before we will get a {@link
27+
* org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} error.
28+
*/
29+
@Internal
30+
@AutoValue
31+
public abstract class WindmillStreamTimeout {
32+
public static WindmillStreamTimeout create(int time, TimeUnit unit) {
33+
return new AutoValue_WindmillStreamTimeout(time, unit);
34+
}
35+
36+
public abstract int time();
37+
38+
public abstract TimeUnit unit();
39+
40+
long asMillis() {
41+
return unit().toMillis(time());
42+
}
43+
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
4242
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
4343
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
44+
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamTimeout;
4445
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
4546
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
4647
import org.apache.beam.sdk.util.BackOff;
@@ -75,7 +76,8 @@ private GrpcCommitWorkStream(
7576
ThrottleTimer commitWorkThrottleTimer,
7677
JobHeader jobHeader,
7778
AtomicLong idGenerator,
78-
int streamingRpcBatchLimit) {
79+
int streamingRpcBatchLimit,
80+
WindmillStreamTimeout streamTimeout) {
7981
super(
8082
LOG,
8183
"CommitWorkStream",
@@ -84,7 +86,8 @@ private GrpcCommitWorkStream(
8486
streamObserverFactory,
8587
streamRegistry,
8688
logEveryNStreamFailures,
87-
backendWorkerToken);
89+
backendWorkerToken,
90+
streamTimeout);
8891
pending = new ConcurrentHashMap<>();
8992
this.idGenerator = idGenerator;
9093
this.jobHeader = jobHeader;
@@ -103,7 +106,8 @@ static GrpcCommitWorkStream create(
103106
ThrottleTimer commitWorkThrottleTimer,
104107
JobHeader jobHeader,
105108
AtomicLong idGenerator,
106-
int streamingRpcBatchLimit) {
109+
int streamingRpcBatchLimit,
110+
WindmillStreamTimeout streamTimeout) {
107111
return new GrpcCommitWorkStream(
108112
backendWorkerToken,
109113
startCommitWorkRpcFn,
@@ -114,7 +118,8 @@ static GrpcCommitWorkStream create(
114118
commitWorkThrottleTimer,
115119
jobHeader,
116120
idGenerator,
117-
streamingRpcBatchLimit);
121+
streamingRpcBatchLimit,
122+
streamTimeout);
118123
}
119124

120125
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
3636
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
3737
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
38+
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamTimeout;
3839
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
3940
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
4041
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GetWorkResponseChunkAssembler.AssembledWorkItem;
@@ -107,7 +108,8 @@ private GrpcDirectGetWorkStream(
107108
HeartbeatSender heartbeatSender,
108109
GetDataClient getDataClient,
109110
WorkCommitter workCommitter,
110-
WorkItemScheduler workItemScheduler) {
111+
WorkItemScheduler workItemScheduler,
112+
WindmillStreamTimeout streamTimeout) {
111113
super(
112114
LOG,
113115
"GetWorkStream",
@@ -116,7 +118,8 @@ private GrpcDirectGetWorkStream(
116118
streamObserverFactory,
117119
streamRegistry,
118120
logEveryNStreamFailures,
119-
backendWorkerToken);
121+
backendWorkerToken,
122+
streamTimeout);
120123
this.requestHeader = requestHeader;
121124
this.getWorkThrottleTimer = getWorkThrottleTimer;
122125
this.workItemScheduler = workItemScheduler;
@@ -150,7 +153,8 @@ static GrpcDirectGetWorkStream create(
150153
HeartbeatSender heartbeatSender,
151154
GetDataClient getDataClient,
152155
WorkCommitter workCommitter,
153-
WorkItemScheduler workItemScheduler) {
156+
WorkItemScheduler workItemScheduler,
157+
WindmillStreamTimeout streamTimeout) {
154158
return new GrpcDirectGetWorkStream(
155159
backendWorkerToken,
156160
startGetWorkRpcFn,
@@ -164,7 +168,8 @@ static GrpcDirectGetWorkStream create(
164168
heartbeatSender,
165169
getDataClient,
166170
workCommitter,
167-
workItemScheduler);
171+
workItemScheduler,
172+
streamTimeout);
168173
}
169174

170175
private static Watermarks createWatermarks(

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
5252
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
5353
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
54+
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamTimeout;
5455
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch;
5556
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest;
5657
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
@@ -95,7 +96,8 @@ private GrpcGetDataStream(
9596
AtomicLong idGenerator,
9697
int streamingRpcBatchLimit,
9798
boolean sendKeyedGetDataRequests,
98-
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
99+
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses,
100+
WindmillStreamTimeout streamTimeout) {
99101
super(
100102
LOG,
101103
"GetDataStream",
@@ -104,7 +106,8 @@ private GrpcGetDataStream(
104106
streamObserverFactory,
105107
streamRegistry,
106108
logEveryNStreamFailures,
107-
backendWorkerToken);
109+
backendWorkerToken,
110+
streamTimeout);
108111
this.idGenerator = idGenerator;
109112
this.getDataThrottleTimer = getDataThrottleTimer;
110113
this.jobHeader = jobHeader;
@@ -128,7 +131,8 @@ static GrpcGetDataStream create(
128131
AtomicLong idGenerator,
129132
int streamingRpcBatchLimit,
130133
boolean sendKeyedGetDataRequests,
131-
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
134+
Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses,
135+
WindmillStreamTimeout streamTimeout) {
132136
return new GrpcGetDataStream(
133137
backendWorkerToken,
134138
startGetDataRpcFn,
@@ -141,7 +145,8 @@ static GrpcGetDataStream create(
141145
idGenerator,
142146
streamingRpcBatchLimit,
143147
sendKeyedGetDataRequests,
144-
processHeartbeatResponses);
148+
processHeartbeatResponses,
149+
streamTimeout);
145150
}
146151

147152
private static WindmillStreamShutdownException shutdownExceptionFor(QueuedBatch batch) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
3131
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
3232
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
33+
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamTimeout;
3334
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GetWorkResponseChunkAssembler.AssembledWorkItem;
3435
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
3536
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
@@ -72,7 +73,8 @@ private GrpcGetWorkStream(
7273
int logEveryNStreamFailures,
7374
boolean requestBatchedGetWorkResponse,
7475
ThrottleTimer getWorkThrottleTimer,
75-
WorkItemReceiver receiver) {
76+
WorkItemReceiver receiver,
77+
WindmillStreamTimeout streamTimeout) {
7678
super(
7779
LOG,
7880
"GetWorkStream",
@@ -81,7 +83,8 @@ private GrpcGetWorkStream(
8183
streamObserverFactory,
8284
streamRegistry,
8385
logEveryNStreamFailures,
84-
backendWorkerToken);
86+
backendWorkerToken,
87+
streamTimeout);
8588
this.request = request;
8689
this.getWorkThrottleTimer = getWorkThrottleTimer;
8790
this.receiver = receiver;
@@ -104,7 +107,8 @@ public static GrpcGetWorkStream create(
104107
int logEveryNStreamFailures,
105108
boolean requestBatchedGetWorkResponse,
106109
ThrottleTimer getWorkThrottleTimer,
107-
WorkItemReceiver receiver) {
110+
WorkItemReceiver receiver,
111+
WindmillStreamTimeout streamTimeout) {
108112
return new GrpcGetWorkStream(
109113
backendWorkerToken,
110114
startGetWorkRpcFn,
@@ -115,7 +119,8 @@ public static GrpcGetWorkStream create(
115119
logEveryNStreamFailures,
116120
requestBatchedGetWorkResponse,
117121
getWorkThrottleTimer,
118-
receiver);
122+
receiver,
123+
streamTimeout);
119124
}
120125

121126
private void sendRequestExtension(long moreItems, long moreBytes) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
3131
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkerMetadataStream;
3232
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
33+
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamTimeout;
3334
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
3435
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
3536
import org.apache.beam.sdk.util.BackOff;
@@ -60,7 +61,8 @@ private GrpcGetWorkerMetadataStream(
6061
int logEveryNStreamFailures,
6162
JobHeader jobHeader,
6263
ThrottleTimer getWorkerMetadataThrottleTimer,
63-
Consumer<WindmillEndpoints> serverMappingConsumer) {
64+
Consumer<WindmillEndpoints> serverMappingConsumer,
65+
WindmillStreamTimeout streamTimeout) {
6466
super(
6567
LOG,
6668
"GetWorkerMetadataStream",
@@ -69,7 +71,8 @@ private GrpcGetWorkerMetadataStream(
6971
streamObserverFactory,
7072
streamRegistry,
7173
logEveryNStreamFailures,
72-
"");
74+
"",
75+
streamTimeout);
7376
this.workerMetadataRequest = WorkerMetadataRequest.newBuilder().setHeader(jobHeader).build();
7477
this.getWorkerMetadataThrottleTimer = getWorkerMetadataThrottleTimer;
7578
this.serverMappingConsumer = serverMappingConsumer;
@@ -86,7 +89,8 @@ public static GrpcGetWorkerMetadataStream create(
8689
int logEveryNStreamFailures,
8790
JobHeader jobHeader,
8891
ThrottleTimer getWorkerMetadataThrottleTimer,
89-
Consumer<WindmillEndpoints> serverMappingUpdater) {
92+
Consumer<WindmillEndpoints> serverMappingUpdater,
93+
WindmillStreamTimeout streamTimeout) {
9094
return new GrpcGetWorkerMetadataStream(
9195
startGetWorkerMetadataRpcFn,
9296
backoff,
@@ -95,7 +99,8 @@ public static GrpcGetWorkerMetadataStream create(
9599
logEveryNStreamFailures,
96100
jobHeader,
97101
getWorkerMetadataThrottleTimer,
98-
serverMappingUpdater);
102+
serverMappingUpdater,
103+
streamTimeout);
99104
}
100105

101106
/**

0 commit comments

Comments
 (0)