add graceful restart mechanism for GetWorkStream to prevent DEADLINE_EXCEEDED #34367
Conversation
Force-pushed d4fbaf0 to 362dd32.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment.
Force-pushed 362dd32 to e746fd0.
R: @scwhittle

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment: assign set of reviewers

Assigning reviewers. If you would like to opt out of this review, comment.
R: @johnjcasey added as fallback since no labels match configuration.
Available commands:

The PR bot will only process comments in the main thread (not review comments).

Run Java precommit

test failure is unrelated
```
streamingEngineStreamFactory.createDirectGetWorkStream(
    connection,
    withRequestBudget(getWorkRequest, getWorkBudget.get()),
    streamingEngineThrottleTimers.getWorkThrottleTimer(),
```
should we just use the same throttle timer, heartbeat sender, and getDataClientFactory for each stream?
streamingEngineThrottleTimers.getWorkThrottleTimer() returns the same throttle timer; will use the same for the others.
done
```
    FixedStreamHeartbeatSender.create(getDataStream),
    getDataClientFactory.apply(getDataStream),
    workCommitter,
    workItemScheduler);
// 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
this.streamStarter =
    Executors.newFixedThreadPool(
```
how about newCachedThreadPool? it seems like 2 of these threads are just for start() and then won't be used and we might as well have them go away.
done
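A minimal sketch of that change (the thread-name format is an assumption here, using Guava's com.google.common.util.concurrent.ThreadFactoryBuilder); a cached pool lets the two start()-only threads be reclaimed after their initial work instead of idling for the process lifetime:

```java
// Sketch only: cached pool so idle starter threads go away (after ~60s by default).
this.streamStarter =
    Executors.newCachedThreadPool(
        new ThreadFactoryBuilder().setNameFormat("StreamStarter-%d").build());
```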
```
if (started.get()) {
  getWorkStream.setBudget(budget);
synchronized (activeGetWorkStream) {
  GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
```
This build and setting on the atomic could be outside the synchronized block.
Moved the build out. I think setting it should still hold the lock since (from the other comment) setting the budget on a stream can affect how many items/bytes we fetch in the header (and subsequent extension calls).
Wanted to simplify the stream mechanics by preventing setBudget() from being called mid stream creation. That way either we start the stream first and then call setBudget(), which will update the internal stream budget and (possibly) send an extension, OR we set the budget first and start the stream with the new budget.
wdyt?
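A minimal sketch of that shape, with field names taken from the diff above (getWorkBudget, activeGetWorkStream, isRunning): the immutable budget is built outside the lock, while publishing it and touching the stream stay serialized with stream creation:

```java
// Build is side-effect free, so it can happen outside the lock.
GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
synchronized (activeGetWorkStream) {
  // Publish and forward under the lock so setBudget() can't interleave with
  // stream creation/restart.
  getWorkBudget.set(budget);
  if (isRunning.get()) {
    activeGetWorkStream.get().setBudget(budget);
  }
}
```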
```
synchronized (activeGetWorkStream) {
  GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
  getWorkBudget.set(budget);
  if (isRunning.get()) {
```
Remove the running check and just use the null check below? Seems like if activeGetWorkStream is set it is OK to call, and it's one less interleaving to think about.
setBudget on the GetWorkStream object may try to send an extension. Should we make the GetWorkStream check isStarted before doing that? Or would this prevent that?
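One possible shape for the guard being discussed (the field names and the sendBudgetExtension helper are hypothetical): remember the budget unconditionally, but only put an extension on the wire once the stream has actually started:

```java
@Override
public void setBudget(GetWorkBudget newBudget) {
  budget.set(newBudget);
  if (isStarted.get()) {
    // Only a started stream sends an extension; a stream started later reads the
    // latest budget when it sends its header.
    sendBudgetExtension(newBudget); // hypothetical helper
  }
}
```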
```
}

} catch (InterruptedException e) {
  // continue until !isRunning.
```
Could we instead force that isRunning is set to false here? We don't expect interruptions to happen for any other reason.
done
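A sketch of the agreed handling (names from the surrounding snippet): treat interruption as shutdown, and preserve the interrupt flag for callers further up the stack:

```java
} catch (InterruptedException e) {
  // Interruptions are only expected at shutdown, so stop the loop instead of
  // retrying, and re-set the flag for anyone above us.
  isRunning.set(false);
  Thread.currentThread().interrupt();
}
```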
```
try {
  // Try to gracefully terminate the stream.
  if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) {
    newStream.halfClose();
```
I think as we half-close here we probably want to create a new stream to take over.
That way we aren't idle while we're waiting for the termination.
Done, offloaded closure to a different thread.
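A rough sketch of that hand-off (createNewGetWorkStream and the streamCloser executor are assumptions): swap in the replacement first, then drain the old stream off-thread so the sender is never idle:

```java
// Replace before draining, so GetWork is never idle during the hand-off.
GetWorkStream oldStream = activeGetWorkStream.getAndSet(createNewGetWorkStream());
streamCloser.execute(
    () -> {
      oldStream.halfClose(); // Stop requesting new work; let in-flight responses drain.
      try {
        if (!oldStream.awaitTermination(5, TimeUnit.MINUTES)) {
          oldStream.shutdown(); // Forceful teardown if the graceful drain times out.
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
```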
```
}

// If graceful termination is unsuccessful, forcefully shutdown.
if (!newStream.awaitTermination(30, TimeUnit.SECONDS)) {
```
Maybe better to increase this? If we lose GetWork responses then the windmill worker has to retry; if we can get them to flush with a little more time that seems fine.
Bumped 10x, to 5 minutes.
```
@@ -189,6 +189,12 @@ private static <T extends AbstractStub<T>> T withDefaultDeadline(T stub) {
  return stub.withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
}

private static <T extends AbstractStub<T>> T withLongDeadline(T stub) {
```
withDirectPathDeadline?
done
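The renamed helper would look something like this sketch; the constant name is an assumption, and the 1 hr value is taken from the PR title ("add long deadlines of 1hr to direct streams"):

```java
private static final long DIRECT_PATH_STREAM_RPC_DEADLINE_SECONDS = 3600; // 1 hr

private static <T extends AbstractStub<T>> T withDirectPathDeadline(T stub) {
  return stub.withDeadlineAfter(DIRECT_PATH_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
}
```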
```
private final GetWorkStream getWorkStream;
private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
```
could note that this needs to be less than the deadline in the other file
done
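Something like this sketch of the requested cross-reference (wording assumed; the relationship between the two constants is from the discussion above):

```java
// Must stay below the direct-path stream deadline set in GrpcWindmillStreamFactory
// (1 hr in this PR) so a graceful drain finishes before gRPC returns DEADLINE_EXCEEDED.
private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
```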
? "Waited " | ||
+ totalSecondsWaited | ||
+ "s which exceeds given deadline of " | ||
+ deadlineSeconds | ||
+ inactivityTimeout | ||
+ "s for the outboundObserver to become ready meaning " | ||
+ "that the stream deadline was not respected." |
this seems like the wrong message if it isn't the stream deadline
done
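A possible rewording matching the fix (variable names from the diff; exact phrasing assumed): attribute the wait to the inactivity timeout rather than claiming the stream deadline was violated:

```java
? "Waited "
    + totalSecondsWaited
    + "s which exceeds the given inactivity timeout of "
    + inactivityTimeout
    + "s for the outboundObserver to become ready."
```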
Force-pushed a3c671d to e7203a9.
Force-pushed e7203a9 to 1a76711.
Force-pushed 1a76711 to d24da6c.
```
this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create();

// Stream instances connect/reconnect internally, so we can reuse the same instance through the
```
Don't we have the same problem that the GetData and CommitWork streams will hit DEADLINE_EXCEEDED?
I'm wondering if it would be better to either improve StreamPool or have a more reusable class like StreamPool for internally reconnecting an AbstractWindmillStream. It would also be nice if we didn't have to have an additional thread per stream just waiting for it to terminate.
How about adding an internal mechanism (maybe via another executor to make it async, if we don't want the old stream closing to block the new stream creation/stream restart) where we add this workflow (sketched below):
- create stream
- start stream
- wait for termination based on a TTL/timeout
- loop while(stream.isRunning)

We can add this in AbstractWindmillStream, so callers are just exposed to Stream.create(), start(), halfClose()/shutdown() as an interface and don't have to worry about the restart mechanism?
Another thing we can do if we don't want to awaitTermination() on a thread: use a ScheduledExecutorService to schedule a task that wakes up once every 0.75 * deadline_duration and checks if we need to initiate a restart, although this could add some complexity compared to the approach where we just have a thread that does awaitTermination() -> halfClose() -> shutdown().
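A minimal sketch of the first workflow above, as it might run on a per-stream management thread inside AbstractWindmillStream (method names follow the discussion; streamTtl and isRunning are assumed fields):

```java
private void restartLoop() {
  while (isRunning.get()) {
    startStream(); // (Re)creates the physical stream and sends the header.
    try {
      if (awaitTermination(streamTtl.toMinutes(), TimeUnit.MINUTES)) {
        return; // Terminated on its own (shutdown/half-close); nothing to restart.
      }
      halfClose(); // TTL expired: drain this physical stream, then loop to recreate.
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      isRunning.set(false);
    }
  }
}
```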
We can expose a boolean to the constructor that will do this internally, or allow the caller to manage the restarts (like in the case of StreamPool).
awaitTermination() uses a CountDownLatch underneath, so the thread would not be in a BLOCKED state I believe, just TIMED_WAITING; other threads can be scheduled in the meantime for other tasks.
Did we discuss before adding this functionality directly to AbstractWindmillStream? It already has multiple physical grpc streams beneath it and knows how to restart things if needed. I think we'd have to make some of its internal state per physical stream, but the nice thing is that it would keep the clients simple as they just have a single logical stream.
Currently we have AbstractWindmillStream.halfClose(), which means we want to terminate the logical stream, and then we half-close the physical stream (and new ones that we may create). I think that if we added some timeout/recreation internal to the abstract stream, we could half-close a physical stream that we want to tear down while the logical stream is not half-closed. We'd have to maintain a set of the background streams that were half-closed but not yet terminated.
I don't think the thread for management will cause CPU issues as noted, since they are just WAITING, but since we can have a large # of windmill workers, I'd rather not create a thread to block if not needed. We've had problems before where a high # of threads uses a lot of memory or perhaps slows down stack tracing etc. Some options would be some kind of scheduled future that we cancel if we observe a failure before then, or some maybeRestartStream method that a single thread at a higher level calls on all of the substreams, like we do for maybeSendHeartbeats.
sgtm will draft this up
From the chat conversation: opted to use ResettableThrowingStreamObserver to close the streams.
- A background thread in AbstractWindmillStream calls AbstractWindmillStream.restart() based on the passed-in stream TTL.
- AbstractWindmillStream.restart() calls ResettableThrowingStreamObserver.release() to async-close the stream, and AbstractWindmillStream.startStream() to reset the ResettableThrowingStreamObserver and call onNewStream() to send headers and flush any pending messages.
- ResettableThrowingStreamObserver.release() offloads the delegate to a queue, which is polled by a background thread that calls delegate.onError(InternalStreamTimeout()).
- AbstractWindmillStream.ResponseObserver.onError(...) (which is the delegate) treats InternalStreamTimeout differently than other errors, skipping the call to executor.execute(AbstractWindmillStream.this::startStream), since we already restarted the stream in AbstractWindmillStream.restart().

Now the WindmillStreamSender and callers can just call start() and shutdown() to manage the streams.
I only added the directpath changes in this PR; the older implementations still have the start()/awaitTermination()/halfClose() pattern in cloudpath. The older code just submits a streamTTL w/ a negative time value, which is skipped by AbstractWindmillStream when scheduling the background threads.
Also added a way to retain the reference to the restart task via the future. I realized we don't want fixed intervals for stream restarts; we want restart time + the timeout, which changes every time we call startStream(). I think this should prevent a bunch of restarts from piling up, and/or a restart being called right after the stream has already restarted via other means.
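Pulling the pieces above together, a rough sketch of the flow (class and method names are from the discussion; the fields, signatures, and queue wiring are assumptions):

```java
// Scheduled relative to the latest startStream(), not at a fixed rate, so restarts
// don't pile up or fire right after the stream restarted through other means.
private void scheduleRestart() {
  restartFuture =
      restartExecutor.schedule(this::restart, streamTtl.toMillis(), TimeUnit.MILLISECONDS);
}

private void restart() {
  requestObserver.release(); // Hands the old delegate to the async-closer queue.
  startStream();             // Resets the observer, resends headers, flushes pending sends.
}

// Async closer: polls released delegates and times them out. ResponseObserver.onError(...)
// recognizes InternalStreamTimeout and skips its usual startStream() re-execution,
// since restart() already created the replacement.
private void asyncCloserLoop() throws InterruptedException {
  while (closerRunning) {
    StreamObserver<RequestT> released = releasedObservers.take();
    released.onError(new InternalStreamTimeout());
  }
}
```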
Force-pushed 57ad0f1 to c21e222.
Force-pushed c21e222 to d2e3d70.
Force-pushed d2e3d70 to c0b0e6b.
Commit: add graceful restart mechanism for GetWorkStream to prevent DEADLINE_EXCEEDED; add long deadlines of 1hr to direct streams
Force-pushed c0b0e6b to 7cf3fb8.
Force-pushed 7cf3fb8 to 0cc695c.
back to you @scwhittle thanks!
```
}

private static String createThreadName(String streamType, String backendWorkerToken) {
private static String createThreadName(
    String threadType, String streamType, String backendWorkerToken) {
```
This doesn't quite match how we're using it. It looks like above we're passing in (streamType, backendWorkerToken, threadType). It seems like we shouldn't need to hardcode "WindmillStream" in this method either, since it'll be passed in as threadType somewhere?
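One possible fix matching that observation (the format strings are assumptions): take arguments in the call sites' (streamType, backendWorkerToken, threadType) order and let callers supply the thread type instead of hardcoding "WindmillStream":

```java
private static String createThreadName(
    String streamType, String backendWorkerToken, String threadType) {
  return backendWorkerToken.isEmpty()
      ? String.format("%s-%s", threadType, streamType)
      : String.format("%s-%s-%s", threadType, streamType, backendWorkerToken);
}
```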
```
// pending restarts that have not run, since they are used for preventing
// DEADLINE_EXCEEDED. Every time the stream restarts, this deadline restarts, so we want
// the stream restart to run whenever we restart the stream + streamTTL.
scheduleRestart();
```
Would this be an appropriate place to call requestObserver.startAsyncStreamCloser()? It feels like this is the point where we know the stream got created, so we want to close it eventually (as opposed to other code paths here in startStream where we might give up before actually creating the stream).
```
@@ -72,6 +73,11 @@
public class GrpcWindmillStreamFactory implements StatusDataProvider {

  private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;

  // 15 minutes less than DIRECT_PATH_STREAM_DEADLINE to allow for drains.
```
I didn't see this defined anywhere. Do you mean the value set in withDirectPathDeadline()?
I added a few comments. I'm far from an expert on this section of the code though. In general, it'd be really nice if there was some way we could test this more than "run it a bit and check that it's restarted a few times".
Stuff like: someone is reading from the stream when it gets restarted.
GetWorkStream was previously terminating due to DEADLINE_EXCEEDED status, leading to stream breaks and unnecessary retry spikes in the streaming backend.

R: @scwhittle @acrites