20
20
import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkState ;
21
21
22
22
import java .util .concurrent .CompletableFuture ;
23
- import java .util .concurrent .CountDownLatch ;
24
23
import java .util .concurrent .ExecutorService ;
25
24
import java .util .concurrent .Executors ;
26
- import java .util .concurrent .TimeUnit ;
27
25
import java .util .concurrent .atomic .AtomicBoolean ;
26
+ import java .util .concurrent .atomic .AtomicReference ;
28
27
import java .util .function .Function ;
29
- import java .util .function .Supplier ;
30
- import javax .annotation .Nullable ;
31
- import javax .annotation .concurrent .GuardedBy ;
32
28
import javax .annotation .concurrent .ThreadSafe ;
33
29
import org .apache .beam .runners .dataflow .worker .windmill .Windmill .GetWorkRequest ;
34
30
import org .apache .beam .runners .dataflow .worker .windmill .WindmillConnection ;
43
39
import org .apache .beam .runners .dataflow .worker .windmill .work .budget .GetWorkBudget ;
44
40
import org .apache .beam .runners .dataflow .worker .windmill .work .budget .GetWorkBudgetSpender ;
45
41
import org .apache .beam .runners .dataflow .worker .windmill .work .refresh .FixedStreamHeartbeatSender ;
46
- import org .apache .beam .runners .dataflow .worker .windmill .work .refresh .HeartbeatSender ;
47
42
import org .apache .beam .sdk .annotations .Internal ;
48
43
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ThreadFactoryBuilder ;
49
- import org .slf4j .Logger ;
50
- import org .slf4j .LoggerFactory ;
51
44
52
45
/**
53
46
* Owns and maintains a set of streams used to communicate with a specific Windmill worker.
64
57
@ Internal
65
58
@ ThreadSafe
66
59
final class WindmillStreamSender implements GetWorkBudgetSpender , StreamSender {
67
- private static final Logger LOG = LoggerFactory .getLogger (WindmillStreamSender .class );
68
- private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread" ;
69
-
70
- // Must be shorter than withLongDeadline() duration in GrpcWindmillStreamFactory to prevent
71
- // DEADLINE_EXCEEDED.
72
- private static final int GET_WORK_STREAM_TTL_MINUTES = 45 ;
73
-
74
- private final AtomicBoolean isRunning = new AtomicBoolean (false );
60
+ private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d" ;
61
+ private final AtomicBoolean started ;
62
+ private final AtomicReference <GetWorkBudget > getWorkBudget ;
63
+ private final GetWorkStream getWorkStream ;
75
64
private final GetDataStream getDataStream ;
76
65
private final CommitWorkStream commitWorkStream ;
77
66
private final WorkCommitter workCommitter ;
78
67
private final StreamingEngineThrottleTimers streamingEngineThrottleTimers ;
79
- private final ExecutorService streamManagerExecutor ;
80
- private final String backendWorkerToken ;
81
- private final Object getWorkStreamLock = new Object ();
82
-
83
- @ GuardedBy ("getWorkStreamLock" )
84
- private final Supplier <GetWorkStream > getWorkStreamFactory ;
85
-
86
- @ GuardedBy ("getWorkStreamLock" )
87
- private @ Nullable GetWorkStream activeGetWorkStream ;
88
-
89
- @ GuardedBy ("getWorkStreamLock" )
90
- private GetWorkBudget getWorkBudget ;
68
+ private final ExecutorService streamStarter ;
91
69
92
70
private WindmillStreamSender (
93
71
WindmillConnection connection ,
94
72
GetWorkRequest getWorkRequest ,
95
- GetWorkBudget initialGetWorkBudget ,
73
+ AtomicReference < GetWorkBudget > getWorkBudget ,
96
74
GrpcWindmillStreamFactory streamingEngineStreamFactory ,
97
75
WorkItemScheduler workItemScheduler ,
98
76
Function <GetDataStream , GetDataClient > getDataClientFactory ,
99
77
Function <CommitWorkStream , WorkCommitter > workCommitterFactory ) {
100
- this .backendWorkerToken = connection . backendWorkerToken ( );
101
- this .getWorkBudget = initialGetWorkBudget ;
78
+ this .started = new AtomicBoolean ( false );
79
+ this .getWorkBudget = getWorkBudget ;
102
80
this .streamingEngineThrottleTimers = StreamingEngineThrottleTimers .create ();
81
+
103
82
// Stream instances connect/reconnect internally, so we can reuse the same instance through the
104
83
// entire lifecycle of WindmillStreamSender.
105
84
this .getDataStream =
@@ -109,43 +88,33 @@ private WindmillStreamSender(
109
88
streamingEngineStreamFactory .createDirectCommitWorkStream (
110
89
connection , streamingEngineThrottleTimers .commitWorkThrottleTimer ());
111
90
this .workCommitter = workCommitterFactory .apply (commitWorkStream );
112
- this .activeGetWorkStream = null ;
113
-
114
- HeartbeatSender heartbeatSender = FixedStreamHeartbeatSender .create (getDataStream );
115
- GetDataClient getDataClient = getDataClientFactory .apply (getDataStream );
116
- this .getWorkStreamFactory =
117
- () -> {
118
- synchronized (getWorkStreamLock ) {
119
- return streamingEngineStreamFactory .createDirectGetWorkStream (
120
- connection ,
121
- withRequestBudget (getWorkRequest , getWorkBudget ),
122
- streamingEngineThrottleTimers .getWorkThrottleTimer (),
123
- heartbeatSender ,
124
- getDataClient ,
125
- workCommitter ,
126
- workItemScheduler );
127
- }
128
- };
91
+ this .getWorkStream =
92
+ streamingEngineStreamFactory .createDirectGetWorkStream (
93
+ connection ,
94
+ withRequestBudget (getWorkRequest , getWorkBudget .get ()),
95
+ streamingEngineThrottleTimers .getWorkThrottleTimer (),
96
+ FixedStreamHeartbeatSender .create (getDataStream ),
97
+ getDataClientFactory .apply (getDataStream ),
98
+ workCommitter ,
99
+ workItemScheduler );
129
100
// 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
130
- this .streamManagerExecutor =
101
+ this .streamStarter =
131
102
Executors .newCachedThreadPool (
132
- new ThreadFactoryBuilder ()
133
- .setNameFormat (STREAM_MANAGER_THREAD_NAME_FORMAT + "-" + backendWorkerToken + "-%d" )
134
- .build ());
103
+ new ThreadFactoryBuilder ().setNameFormat (STREAM_STARTER_THREAD_NAME ).build ());
135
104
}
136
105
137
106
static WindmillStreamSender create (
138
107
WindmillConnection connection ,
139
108
GetWorkRequest getWorkRequest ,
140
- GetWorkBudget initialGetWorkBudget ,
109
+ GetWorkBudget getWorkBudget ,
141
110
GrpcWindmillStreamFactory streamingEngineStreamFactory ,
142
111
WorkItemScheduler workItemScheduler ,
143
112
Function <GetDataStream , GetDataClient > getDataClientFactory ,
144
113
Function <CommitWorkStream , WorkCommitter > workCommitterFactory ) {
145
114
return new WindmillStreamSender (
146
115
connection ,
147
116
getWorkRequest ,
148
- initialGetWorkBudget ,
117
+ new AtomicReference <>( getWorkBudget ) ,
149
118
streamingEngineStreamFactory ,
150
119
workItemScheduler ,
151
120
getDataClientFactory ,
@@ -157,31 +126,24 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB
157
126
}
158
127
159
128
synchronized void start () {
160
- if (isRunning . compareAndSet ( false , true )) {
161
- checkState (
162
- ! streamManagerExecutor . isShutdown (), "WindmillStreamSender has already been shutdown." );
129
+ if (! started . get ( )) {
130
+ checkState (! streamStarter . isShutdown (), "WindmillStreamSender has already been shutdown." );
131
+
163
132
// Start these 3 streams in parallel since they each may perform blocking IO.
164
- CountDownLatch waitForInitialStream = new CountDownLatch (1 );
165
- streamManagerExecutor .execute (() -> getWorkStreamLoop (waitForInitialStream ::countDown ));
166
133
CompletableFuture .allOf (
167
- CompletableFuture .runAsync (getDataStream ::start , streamManagerExecutor ),
168
- CompletableFuture .runAsync (commitWorkStream ::start , streamManagerExecutor ))
134
+ CompletableFuture .runAsync (getWorkStream ::start , streamStarter ),
135
+ CompletableFuture .runAsync (getDataStream ::start , streamStarter ),
136
+ CompletableFuture .runAsync (commitWorkStream ::start , streamStarter ))
169
137
.join ();
170
- try {
171
- waitForInitialStream .await ();
172
- } catch (InterruptedException e ) {
173
- close ();
174
- LOG .error ("GetWorkStream to {} was never able to start." , backendWorkerToken );
175
- throw new IllegalStateException ("GetWorkStream unable to start aborting." , e );
176
- }
177
138
workCommitter .start ();
139
+ started .set (true );
178
140
}
179
141
}
180
142
181
143
@ Override
182
144
public synchronized void close () {
183
- isRunning . set ( false );
184
- streamManagerExecutor . shutdownNow ();
145
+ streamStarter . shutdownNow ( );
146
+ getWorkStream . shutdown ();
185
147
getDataStream .shutdown ();
186
148
workCommitter .stop ();
187
149
commitWorkStream .shutdown ();
@@ -190,16 +152,9 @@ public synchronized void close() {
190
152
@ Override
191
153
public void setBudget (long items , long bytes ) {
192
154
GetWorkBudget budget = GetWorkBudget .builder ().setItems (items ).setBytes (bytes ).build ();
193
- synchronized (getWorkStreamLock ) {
194
- getWorkBudget = budget ;
195
- if (isRunning .get ()) {
196
- // activeGetWorkStream could be null if start() was called but activeGetWorkStream was not
197
- // populated yet. Populating activeGetWorkStream and setting the budget are guaranteed to
198
- // execute serially since both operations synchronize on getWorkStreamLock.
199
- if (activeGetWorkStream != null ) {
200
- activeGetWorkStream .setBudget (budget );
201
- }
202
- }
155
+ getWorkBudget .set (budget );
156
+ if (started .get ()) {
157
+ getWorkStream .setBudget (budget );
203
158
}
204
159
}
205
160
@@ -210,70 +165,4 @@ long getAndResetThrottleTime() {
210
165
long getCurrentActiveCommitBytes () {
211
166
return workCommitter .currentActiveCommitBytes ();
212
167
}
213
-
214
- /**
215
- * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline
216
- * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
217
- * If at any point the server closes the stream, reconnects immediately.
218
- */
219
- private void getWorkStreamLoop (Runnable onInitialStream ) {
220
- boolean shouldCreateNewStream = true ;
221
- while (isRunning .get () && shouldCreateNewStream ) {
222
- GetWorkStream newStream ;
223
- synchronized (getWorkStreamLock ) {
224
- newStream = getWorkStreamFactory .get ();
225
- newStream .start ();
226
- onInitialStream .run ();
227
- activeGetWorkStream = newStream ;
228
- }
229
-
230
- shouldCreateNewStream = shouldCreateNewStreamAfterAwaitingTermination (newStream );
231
- }
232
- }
233
-
234
- /**
235
- * Manages stream termination. Returns true if a new stream should be created after the stream is
236
- * terminated.
237
- *
238
- * @implNote This may block for up to {@link #GET_WORK_STREAM_TTL_MINUTES} minutes.
239
- */
240
- private boolean shouldCreateNewStreamAfterAwaitingTermination (GetWorkStream stream ) {
241
- try {
242
- // Try to gracefully terminate the stream. If awaitTermination() returns before the TTL it
243
- // means the server has terminated the connection and we reconnect immediately. If the stream
244
- // is alive, terminate and drain the stream from the client to prevent DEADLINE_EXCEEDED
245
- // status errors.
246
- if (!stream .awaitTermination (GET_WORK_STREAM_TTL_MINUTES , TimeUnit .MINUTES )) {
247
- drainStreamAsync (stream );
248
- }
249
- } catch (InterruptedException e ) {
250
- assert !isRunning .get ();
251
- return false ;
252
- }
253
-
254
- return true ;
255
- }
256
-
257
- private void drainStreamAsync (GetWorkStream stream ) {
258
- // Offload the old stream termination to a different thread which will terminate once it
259
- // closes the stream. This allows this thread to not wait for the old stream to terminate
260
- // before creating a new stream once the old stream has timed out.
261
- streamManagerExecutor .execute (
262
- () -> {
263
- stream .halfClose ();
264
- try {
265
- // Wait a bit for retries/drains then forcefully shutdown if graceful termination is
266
- // unsuccessful.
267
- if (!stream .awaitTermination (5 , TimeUnit .MINUTES )) {
268
- stream .shutdown ();
269
- }
270
-
271
- } catch (InterruptedException e ) {
272
- assert !isRunning .get ();
273
- } finally {
274
- // Make sure we clean up the stream.
275
- stream .shutdown ();
276
- }
277
- });
278
- }
279
168
}
0 commit comments