20
20
import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkState ;
21
21
22
22
import java .util .concurrent .CompletableFuture ;
23
+ import java .util .concurrent .CountDownLatch ;
23
24
import java .util .concurrent .ExecutorService ;
24
25
import java .util .concurrent .Executors ;
26
+ import java .util .concurrent .TimeUnit ;
25
27
import java .util .concurrent .atomic .AtomicBoolean ;
26
28
import java .util .concurrent .atomic .AtomicReference ;
27
29
import java .util .function .Function ;
30
+ import java .util .function .Supplier ;
31
+ import javax .annotation .Nullable ;
32
+ import javax .annotation .concurrent .GuardedBy ;
28
33
import javax .annotation .concurrent .ThreadSafe ;
29
34
import org .apache .beam .runners .dataflow .worker .windmill .Windmill .GetWorkRequest ;
30
35
import org .apache .beam .runners .dataflow .worker .windmill .WindmillConnection ;
41
46
import org .apache .beam .runners .dataflow .worker .windmill .work .refresh .FixedStreamHeartbeatSender ;
42
47
import org .apache .beam .sdk .annotations .Internal ;
43
48
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ThreadFactoryBuilder ;
49
+ import org .slf4j .Logger ;
50
+ import org .slf4j .LoggerFactory ;
44
51
45
52
/**
46
53
* Owns and maintains a set of streams used to communicate with a specific Windmill worker.
57
64
@ Internal
58
65
@ ThreadSafe
59
66
final class WindmillStreamSender implements GetWorkBudgetSpender , StreamSender {
60
- private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d" ;
61
- private final AtomicBoolean started ;
62
- private final AtomicReference <GetWorkBudget > getWorkBudget ;
63
- private final GetWorkStream getWorkStream ;
67
+ private static final Logger LOG = LoggerFactory .getLogger (WindmillStreamSender .class );
68
+ private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread" ;
69
+ private static final int GET_WORK_STREAM_TTL_MINUTES = 45 ;
70
+
71
+ private final AtomicBoolean isRunning = new AtomicBoolean (false );
64
72
private final GetDataStream getDataStream ;
65
73
private final CommitWorkStream commitWorkStream ;
66
74
private final WorkCommitter workCommitter ;
67
75
private final StreamingEngineThrottleTimers streamingEngineThrottleTimers ;
68
76
private final ExecutorService streamStarter ;
77
+ private final String backendWorkerToken ;
78
+
79
+ @ GuardedBy ("activeGetWorkStream" )
80
+ private final AtomicReference <GetWorkStream > activeGetWorkStream ;
81
+
82
+ @ GuardedBy ("activeGetWorkStream" )
83
+ private final AtomicReference <GetWorkBudget > getWorkBudget ;
84
+
85
+ @ GuardedBy ("activeGetWorkStream" )
86
+ private final Supplier <GetWorkStream > getWorkStreamFactory ;
69
87
70
88
private WindmillStreamSender (
71
89
WindmillConnection connection ,
@@ -75,10 +93,9 @@ private WindmillStreamSender(
75
93
WorkItemScheduler workItemScheduler ,
76
94
Function <GetDataStream , GetDataClient > getDataClientFactory ,
77
95
Function <CommitWorkStream , WorkCommitter > workCommitterFactory ) {
78
- this .started = new AtomicBoolean ( false );
96
+ this .backendWorkerToken = connection . backendWorkerToken ( );
79
97
this .getWorkBudget = getWorkBudget ;
80
98
this .streamingEngineThrottleTimers = StreamingEngineThrottleTimers .create ();
81
-
82
99
// Stream instances connect/reconnect internally, so we can reuse the same instance through the
83
100
// entire lifecycle of WindmillStreamSender.
84
101
this .getDataStream =
@@ -88,19 +105,24 @@ private WindmillStreamSender(
88
105
streamingEngineStreamFactory .createDirectCommitWorkStream (
89
106
connection , streamingEngineThrottleTimers .commitWorkThrottleTimer ());
90
107
this .workCommitter = workCommitterFactory .apply (commitWorkStream );
91
- this .getWorkStream =
92
- streamingEngineStreamFactory .createDirectGetWorkStream (
93
- connection ,
94
- withRequestBudget (getWorkRequest , getWorkBudget .get ()),
95
- streamingEngineThrottleTimers .getWorkThrottleTimer (),
96
- FixedStreamHeartbeatSender .create (getDataStream ),
97
- getDataClientFactory .apply (getDataStream ),
98
- workCommitter ,
99
- workItemScheduler );
108
+ this .activeGetWorkStream = new AtomicReference <>();
109
+ this .getWorkStreamFactory =
110
+ () ->
111
+ streamingEngineStreamFactory .createDirectGetWorkStream (
112
+ connection ,
113
+ withRequestBudget (getWorkRequest , getWorkBudget .get ()),
114
+ streamingEngineThrottleTimers .getWorkThrottleTimer (),
115
+ FixedStreamHeartbeatSender .create (getDataStream ),
116
+ getDataClientFactory .apply (getDataStream ),
117
+ workCommitter ,
118
+ workItemScheduler );
100
119
// 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
101
120
this .streamStarter =
102
121
Executors .newFixedThreadPool (
103
- 3 , new ThreadFactoryBuilder ().setNameFormat (STREAM_STARTER_THREAD_NAME ).build ());
122
+ 3 ,
123
+ new ThreadFactoryBuilder ()
124
+ .setNameFormat (STREAM_MANAGER_THREAD_NAME_FORMAT + "-" + backendWorkerToken + "-%d" )
125
+ .build ());
104
126
}
105
127
106
128
static WindmillStreamSender create (
@@ -126,35 +148,49 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB
126
148
}
127
149
128
150
synchronized void start () {
129
- if (! started . get ( )) {
151
+ if (isRunning . compareAndSet ( false , true )) {
130
152
checkState (!streamStarter .isShutdown (), "WindmillStreamSender has already been shutdown." );
131
-
132
153
// Start these 3 streams in parallel since they each may perform blocking IO.
154
+ CountDownLatch waitForInitialStream = new CountDownLatch (1 );
155
+ streamStarter .execute (() -> getWorkStreamLoop (waitForInitialStream ));
133
156
CompletableFuture .allOf (
134
- CompletableFuture .runAsync (getWorkStream ::start , streamStarter ),
135
157
CompletableFuture .runAsync (getDataStream ::start , streamStarter ),
136
158
CompletableFuture .runAsync (commitWorkStream ::start , streamStarter ))
137
159
.join ();
160
+ try {
161
+ waitForInitialStream .await ();
162
+ } catch (InterruptedException e ) {
163
+ close ();
164
+ LOG .error ("GetWorkStream to {} was never able to start." , backendWorkerToken );
165
+ throw new IllegalStateException ("GetWorkStream unable to start aborting." , e );
166
+ }
138
167
workCommitter .start ();
139
- started .set (true );
140
168
}
141
169
}
142
170
143
171
@ Override
144
172
public synchronized void close () {
173
+ isRunning .set (false );
145
174
streamStarter .shutdownNow ();
146
- getWorkStream .shutdown ();
147
175
getDataStream .shutdown ();
148
176
workCommitter .stop ();
149
177
commitWorkStream .shutdown ();
150
178
}
151
179
152
180
@ Override
153
181
public void setBudget (long items , long bytes ) {
154
- GetWorkBudget budget = GetWorkBudget .builder ().setItems (items ).setBytes (bytes ).build ();
155
- getWorkBudget .set (budget );
156
- if (started .get ()) {
157
- getWorkStream .setBudget (budget );
182
+ synchronized (activeGetWorkStream ) {
183
+ GetWorkBudget budget = GetWorkBudget .builder ().setItems (items ).setBytes (bytes ).build ();
184
+ getWorkBudget .set (budget );
185
+ if (isRunning .get ()) {
186
+ @ Nullable GetWorkStream stream = activeGetWorkStream .get ();
187
+ // activeGetWorkStream could be null if start() was called but activeGetWorkStream was not
188
+ // populated yet. Populating activeGetWorkStream and setting the budget are guaranteed to
189
+ // execute serially since both operations synchronize on activeGetWorkStream.
190
+ if (stream != null ) {
191
+ stream .setBudget (budget );
192
+ }
193
+ }
158
194
}
159
195
}
160
196
@@ -165,4 +201,39 @@ long getAndResetThrottleTime() {
165
201
long getCurrentActiveCommitBytes () {
166
202
return workCommitter .currentActiveCommitBytes ();
167
203
}
204
+
205
+ /**
206
+ * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline
207
+ * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
208
+ * If at any point the server closes the stream, reconnects immediately.
209
+ */
210
+ private void getWorkStreamLoop (CountDownLatch waitForInitialStream ) {
211
+ @ Nullable GetWorkStream newStream = null ;
212
+ while (isRunning .get ()) {
213
+ synchronized (activeGetWorkStream ) {
214
+ newStream = getWorkStreamFactory .get ();
215
+ newStream .start ();
216
+ waitForInitialStream .countDown ();
217
+ activeGetWorkStream .set (newStream );
218
+ }
219
+ try {
220
+ // Try to gracefully terminate the stream.
221
+ if (!newStream .awaitTermination (GET_WORK_STREAM_TTL_MINUTES , TimeUnit .MINUTES )) {
222
+ newStream .halfClose ();
223
+ }
224
+
225
+ // If graceful termination is unsuccessful, forcefully shutdown.
226
+ if (!newStream .awaitTermination (30 , TimeUnit .SECONDS )) {
227
+ newStream .shutdown ();
228
+ }
229
+
230
+ } catch (InterruptedException e ) {
231
+ // continue until !isRunning.
232
+ }
233
+ }
234
+
235
+ if (newStream != null ) {
236
+ newStream .shutdown ();
237
+ }
238
+ }
168
239
}
0 commit comments