25
25
import java .util .concurrent .Executors ;
26
26
import java .util .concurrent .TimeUnit ;
27
27
import java .util .concurrent .atomic .AtomicBoolean ;
28
- import java .util .concurrent .atomic .AtomicReference ;
29
28
import java .util .function .Function ;
30
29
import java .util .function .Supplier ;
31
30
import javax .annotation .Nullable ;
44
43
import org .apache .beam .runners .dataflow .worker .windmill .work .budget .GetWorkBudget ;
45
44
import org .apache .beam .runners .dataflow .worker .windmill .work .budget .GetWorkBudgetSpender ;
46
45
import org .apache .beam .runners .dataflow .worker .windmill .work .refresh .FixedStreamHeartbeatSender ;
46
+ import org .apache .beam .runners .dataflow .worker .windmill .work .refresh .HeartbeatSender ;
47
47
import org .apache .beam .sdk .annotations .Internal ;
48
48
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ThreadFactoryBuilder ;
49
49
import org .slf4j .Logger ;
66
66
final class WindmillStreamSender implements GetWorkBudgetSpender , StreamSender {
67
67
private static final Logger LOG = LoggerFactory .getLogger (WindmillStreamSender .class );
68
68
private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread" ;
69
+
70
+ // Must be shorter than withLongDeadline() duration in GrpcWindmillStreamFactory to prevent
71
+ // DEADLINE_EXCEEDED.
69
72
private static final int GET_WORK_STREAM_TTL_MINUTES = 45 ;
70
73
71
74
private final AtomicBoolean isRunning = new AtomicBoolean (false );
72
75
private final GetDataStream getDataStream ;
73
76
private final CommitWorkStream commitWorkStream ;
74
77
private final WorkCommitter workCommitter ;
75
78
private final StreamingEngineThrottleTimers streamingEngineThrottleTimers ;
76
- private final ExecutorService streamStarter ;
79
+ private final ExecutorService streamManagerExecutor ;
77
80
private final String backendWorkerToken ;
78
81
79
- @ GuardedBy ("activeGetWorkStream" )
80
- private final AtomicReference <GetWorkStream > activeGetWorkStream ;
81
-
82
- @ GuardedBy ("activeGetWorkStream" )
83
- private final AtomicReference <GetWorkBudget > getWorkBudget ;
82
+ private final Object streamLock = new Object ();
84
83
85
- @ GuardedBy ("activeGetWorkStream " )
84
+ @ GuardedBy ("streamLock " )
86
85
private final Supplier <GetWorkStream > getWorkStreamFactory ;
87
86
87
+ @ GuardedBy ("streamLock" )
88
+ private @ Nullable GetWorkStream activeGetWorkStream ;
89
+
90
+ @ GuardedBy ("streamLock" )
91
+ private GetWorkBudget getWorkBudget ;
92
+
88
93
private WindmillStreamSender (
89
94
WindmillConnection connection ,
90
95
GetWorkRequest getWorkRequest ,
91
- AtomicReference < GetWorkBudget > getWorkBudget ,
96
+ GetWorkBudget initialGetWorkBudget ,
92
97
GrpcWindmillStreamFactory streamingEngineStreamFactory ,
93
98
WorkItemScheduler workItemScheduler ,
94
99
Function <GetDataStream , GetDataClient > getDataClientFactory ,
95
100
Function <CommitWorkStream , WorkCommitter > workCommitterFactory ) {
96
101
this .backendWorkerToken = connection .backendWorkerToken ();
97
- this .getWorkBudget = getWorkBudget ;
102
+ this .getWorkBudget = initialGetWorkBudget ;
98
103
this .streamingEngineThrottleTimers = StreamingEngineThrottleTimers .create ();
99
104
// Stream instances connect/reconnect internally, so we can reuse the same instance through the
100
105
// entire lifecycle of WindmillStreamSender.
@@ -105,21 +110,26 @@ private WindmillStreamSender(
105
110
streamingEngineStreamFactory .createDirectCommitWorkStream (
106
111
connection , streamingEngineThrottleTimers .commitWorkThrottleTimer ());
107
112
this .workCommitter = workCommitterFactory .apply (commitWorkStream );
108
- this .activeGetWorkStream = new AtomicReference <>();
113
+ this .activeGetWorkStream = null ;
114
+
115
+ HeartbeatSender heartbeatSender = FixedStreamHeartbeatSender .create (getDataStream );
116
+ GetDataClient getDataClient = getDataClientFactory .apply (getDataStream );
109
117
this .getWorkStreamFactory =
110
- () ->
111
- streamingEngineStreamFactory .createDirectGetWorkStream (
118
+ () -> {
119
+ synchronized (streamLock ) {
120
+ return streamingEngineStreamFactory .createDirectGetWorkStream (
112
121
connection ,
113
- withRequestBudget (getWorkRequest , getWorkBudget . get () ),
122
+ withRequestBudget (getWorkRequest , getWorkBudget ),
114
123
streamingEngineThrottleTimers .getWorkThrottleTimer (),
115
- FixedStreamHeartbeatSender . create ( getDataStream ) ,
116
- getDataClientFactory . apply ( getDataStream ) ,
124
+ heartbeatSender ,
125
+ getDataClient ,
117
126
workCommitter ,
118
127
workItemScheduler );
128
+ }
129
+ };
119
130
// 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
120
- this .streamStarter =
121
- Executors .newFixedThreadPool (
122
- 3 ,
131
+ this .streamManagerExecutor =
132
+ Executors .newCachedThreadPool (
123
133
new ThreadFactoryBuilder ()
124
134
.setNameFormat (STREAM_MANAGER_THREAD_NAME_FORMAT + "-" + backendWorkerToken + "-%d" )
125
135
.build ());
@@ -128,15 +138,15 @@ private WindmillStreamSender(
128
138
static WindmillStreamSender create (
129
139
WindmillConnection connection ,
130
140
GetWorkRequest getWorkRequest ,
131
- GetWorkBudget getWorkBudget ,
141
+ GetWorkBudget initialGetWorkBudget ,
132
142
GrpcWindmillStreamFactory streamingEngineStreamFactory ,
133
143
WorkItemScheduler workItemScheduler ,
134
144
Function <GetDataStream , GetDataClient > getDataClientFactory ,
135
145
Function <CommitWorkStream , WorkCommitter > workCommitterFactory ) {
136
146
return new WindmillStreamSender (
137
147
connection ,
138
148
getWorkRequest ,
139
- new AtomicReference <>( getWorkBudget ) ,
149
+ initialGetWorkBudget ,
140
150
streamingEngineStreamFactory ,
141
151
workItemScheduler ,
142
152
getDataClientFactory ,
@@ -149,13 +159,14 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB
149
159
150
160
synchronized void start () {
151
161
if (isRunning .compareAndSet (false , true )) {
152
- checkState (!streamStarter .isShutdown (), "WindmillStreamSender has already been shutdown." );
162
+ checkState (
163
+ !streamManagerExecutor .isShutdown (), "WindmillStreamSender has already been shutdown." );
153
164
// Start these 3 streams in parallel since they each may perform blocking IO.
154
165
CountDownLatch waitForInitialStream = new CountDownLatch (1 );
155
- streamStarter .execute (() -> getWorkStreamLoop (waitForInitialStream ));
166
+ streamManagerExecutor .execute (() -> getWorkStreamLoop (waitForInitialStream :: countDown ));
156
167
CompletableFuture .allOf (
157
- CompletableFuture .runAsync (getDataStream ::start , streamStarter ),
158
- CompletableFuture .runAsync (commitWorkStream ::start , streamStarter ))
168
+ CompletableFuture .runAsync (getDataStream ::start , streamManagerExecutor ),
169
+ CompletableFuture .runAsync (commitWorkStream ::start , streamManagerExecutor ))
159
170
.join ();
160
171
try {
161
172
waitForInitialStream .await ();
@@ -171,24 +182,23 @@ synchronized void start() {
171
182
@ Override
172
183
public synchronized void close () {
173
184
isRunning .set (false );
174
- streamStarter .shutdownNow ();
185
+ streamManagerExecutor .shutdownNow ();
175
186
getDataStream .shutdown ();
176
187
workCommitter .stop ();
177
188
commitWorkStream .shutdown ();
178
189
}
179
190
180
191
@ Override
181
192
public void setBudget (long items , long bytes ) {
182
- synchronized ( activeGetWorkStream ) {
183
- GetWorkBudget budget = GetWorkBudget . builder (). setItems ( items ). setBytes ( bytes ). build ();
184
- getWorkBudget . set ( budget ) ;
193
+ GetWorkBudget budget = GetWorkBudget . builder (). setItems ( items ). setBytes ( bytes ). build ();
194
+ synchronized ( streamLock ) {
195
+ getWorkBudget = budget ;
185
196
if (isRunning .get ()) {
186
- @ Nullable GetWorkStream stream = activeGetWorkStream .get ();
187
197
// activeGetWorkStream could be null if start() was called but activeGetWorkStream was not
188
198
// populated yet. Populating activeGetWorkStream and setting the budget are guaranteed to
189
199
// execute serially since both operations synchronize on activeGetWorkStream.
190
- if (stream != null ) {
191
- stream .setBudget (budget );
200
+ if (activeGetWorkStream != null ) {
201
+ activeGetWorkStream .setBudget (budget );
192
202
}
193
203
}
194
204
}
@@ -207,33 +217,50 @@ long getCurrentActiveCommitBytes() {
207
217
* to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
208
218
* If at any point the server closes the stream, reconnects immediately.
209
219
*/
210
- private void getWorkStreamLoop (CountDownLatch waitForInitialStream ) {
211
- @ Nullable GetWorkStream newStream = null ;
220
+ private void getWorkStreamLoop (Runnable onInitialStream ) {
212
221
while (isRunning .get ()) {
213
- synchronized (activeGetWorkStream ) {
214
- newStream = getWorkStreamFactory .get ();
222
+ CountDownLatch triggerNewStream = new CountDownLatch (1 );
223
+ synchronized (streamLock ) {
224
+ GetWorkStream newStream = getWorkStreamFactory .get ();
215
225
newStream .start ();
216
- waitForInitialStream .countDown ();
217
- activeGetWorkStream .set (newStream );
226
+ onInitialStream .run ();
227
+ activeGetWorkStream = newStream ;
228
+ // Offload the old stream termination to a different thread which will terminate once it
229
+ // closes the stream. This allows this thread to not wait for the old stream to terminate
230
+ // before creating a new stream once the old stream has timed out.
231
+ streamManagerExecutor .execute (() -> gracefullyTerminateStream (newStream , triggerNewStream ));
218
232
}
219
- try {
220
- // Try to gracefully terminate the stream.
221
- if (!newStream .awaitTermination (GET_WORK_STREAM_TTL_MINUTES , TimeUnit .MINUTES )) {
222
- newStream .halfClose ();
223
- }
224
-
225
- // If graceful termination is unsuccessful, forcefully shutdown.
226
- if (!newStream .awaitTermination (30 , TimeUnit .SECONDS )) {
227
- newStream .shutdown ();
228
- }
229
233
234
+ // Wait for a new stream to be triggered w/o holding the lock.
235
+ try {
236
+ triggerNewStream .await ();
230
237
} catch (InterruptedException e ) {
231
- // continue until !isRunning.
238
+ assert !isRunning .get ();
232
239
}
233
240
}
241
+ }
242
+
243
+ private void gracefullyTerminateStream (GetWorkStream stream , CountDownLatch triggerNewStream ) {
244
+ try {
245
+ // Try to gracefully terminate the stream.
246
+ if (!stream .awaitTermination (GET_WORK_STREAM_TTL_MINUTES , TimeUnit .MINUTES )) {
247
+ // Free any threads waiting to create a new stream.
248
+ triggerNewStream .countDown ();
249
+ stream .halfClose ();
250
+ }
251
+
252
+ // Wait a bit for retries/drains then forcefully shutdown if graceful termination is
253
+ // unsuccessful.
254
+ if (!stream .awaitTermination (5 , TimeUnit .MINUTES )) {
255
+ stream .shutdown ();
256
+ }
234
257
235
- if (newStream != null ) {
236
- newStream .shutdown ();
258
+ } catch (InterruptedException e ) {
259
+ assert !isRunning .get ();
260
+ } finally {
261
+ // Make sure we clean up the stream.
262
+ stream .shutdown ();
263
+ triggerNewStream .countDown ();
237
264
}
238
265
}
239
266
}
0 commit comments