
add graceful restart mechanism for GetWorkStream to prevent DEADLINE_… #34367


Open

m-trieu wants to merge 4 commits into master from mt-getworkstream

Conversation

m-trieu (Contributor) commented Mar 20, 2025

GetWorkStream was previously terminating with a DEADLINE_EXCEEDED status, leading to stream breaks and unnecessary retry spikes in the streaming backend.

  • Add a GetWorkStream management loop that gracefully terminates and restarts the stream (see the sketch below)
  • Extend the deadlines of direct streams to 1 hour for less churn in restarting the streams internally
  • Add the worker token to the stream manager thread name
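
Roughly, a minimal sketch of that management loop, assuming the awaitTermination()/halfClose()/shutdown() stream API that appears in the review snippets further down; startGetWorkStream() and isRunning are illustrative names rather than the actual fields:

private void getWorkStreamManagementLoop() throws InterruptedException {
  while (isRunning.get()) {
    // Create and start a fresh direct GetWorkStream.
    GetWorkStream stream = startGetWorkStream();
    // Let the stream serve work for its TTL, then drain it well before the
    // RPC deadline would surface as DEADLINE_EXCEEDED.
    if (!stream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) {
      stream.halfClose();
      // If graceful termination is unsuccessful, forcefully shut down.
      if (!stream.awaitTermination(5, TimeUnit.MINUTES)) {
        stream.shutdown();
      }
    }
  }
}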

R: @scwhittle @acrites


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

m-trieu (Contributor Author) commented Mar 23, 2025

R: @scwhittle

Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

m-trieu (Contributor Author) commented Mar 24, 2025

assign set of reviewers

Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @johnjcasey added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

m-trieu (Contributor Author) commented Mar 25, 2025

Run Java precommit

m-trieu (Contributor Author) commented Mar 26, 2025

test failure is unrelated

streamingEngineStreamFactory.createDirectGetWorkStream(
    connection,
    withRequestBudget(getWorkRequest, getWorkBudget.get()),
    streamingEngineThrottleTimers.getWorkThrottleTimer(),
Contributor

should we just use the same throttle timer, heartbeat sender, and getDataClientFactory for each stream?

Contributor Author

streamingEngineThrottleTimers.getWorkThrottleTimer() returns the same throttle timer; will use the same for the others.

done

    FixedStreamHeartbeatSender.create(getDataStream),
    getDataClientFactory.apply(getDataStream),
    workCommitter,
    workItemScheduler);
// 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
this.streamStarter =
    Executors.newFixedThreadPool(
Contributor

how about newCachedThreadPool? it seems like 2 of these threads are just for start() and then won't be used and we might as well have them go away.
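
For reference, a minimal sketch of that suggestion (the thread-name format is illustrative; ThreadFactoryBuilder is Guava's com.google.common.util.concurrent.ThreadFactoryBuilder):

// A cached pool lets the threads that only run start() terminate after
// ~60 seconds of idleness instead of lingering for the worker's lifetime.
this.streamStarter =
    Executors.newCachedThreadPool(
        new ThreadFactoryBuilder().setNameFormat("WindmillStreamStarter-%d").build());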

Contributor Author

done

if (started.get()) {
getWorkStream.setBudget(budget);
synchronized (activeGetWorkStream) {
GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
Contributor

this build, and setting it on the atomic, could be outside the synchronized block

Contributor Author

moved the build out. I think setting it should still hold the lock since (from the other comment) setting the budget on a stream can affect how many items/bytes we fetch in the header (and subsequent extension calls).

wanted to simplify the stream mechanics by preventing setBudget() from being called mid stream creation. That way either we start the stream first and then call setBudget(), which will update the internal stream budget and (possibly) send an extension, OR we set the budget first and start the stream with the new budget.

wdyt?
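
A sketch of that ordering, assuming activeGetWorkStream holds a nullable reference to the running stream and getWorkBudget is an AtomicReference (the names come from the snippets here; the exact types are assumptions):

public void setBudget(long items, long bytes) {
  // Building the immutable budget does not need the lock.
  GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
  synchronized (activeGetWorkStream) {
    // Publish the budget and update any live stream under the same lock, so a
    // stream that is mid-creation either starts with the new budget or receives
    // a budget extension once it is registered.
    getWorkBudget.set(budget);
    GetWorkStream stream = activeGetWorkStream.get();
    if (stream != null) {
      stream.setBudget(budget);
    }
  }
}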

synchronized (activeGetWorkStream) {
  GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
  getWorkBudget.set(budget);
  if (isRunning.get()) {
Contributor

remove the running check and just use the null check below? seems like if activeGetWorkStream is set it is OK to call, and it's one less interleaving to think about

Contributor Author

setBudget() on the GetWorkStream object may try to send an extension; should we make the GetWorkStream check isStarted before doing that, or would this check prevent that?

}

} catch (InterruptedException e) {
  // continue until !isRunning.
Contributor

Could we instead force isRunning to false here? We don't expect interruptions to happen for any other reason.
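
A minimal sketch of that handling, assuming the isRunning AtomicBoolean that guards the management loop:

} catch (InterruptedException e) {
  // Interrupts are only expected on shutdown, so stop the management loop
  // instead of silently continuing.
  isRunning.set(false);
  // Preserve the interrupt status for anything further up the stack.
  Thread.currentThread().interrupt();
}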

Contributor Author

done

try {
  // Try to gracefully terminate the stream.
  if (!newStream.awaitTermination(GET_WORK_STREAM_TTL_MINUTES, TimeUnit.MINUTES)) {
    newStream.halfClose();
Contributor

I think as we half-close here we probably want to create a new stream to take over.
That way we aren't idle while we're waiting for the termination.

Contributor Author

done, offloaded closure to a different thread
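
Roughly what offloading the closure to another thread could look like (the streamCloser executor is an illustrative name; the 5-minute grace period mirrors the bump discussed below):

// Swap in the freshly started stream, then drain the old one on a background
// executor so GetWork is never idle while waiting for termination.
GetWorkStream oldStream = activeGetWorkStream.getAndSet(newStream);
if (oldStream != null) {
  streamCloser.execute(
      () -> {
        oldStream.halfClose();
        try {
          if (!oldStream.awaitTermination(5, TimeUnit.MINUTES)) {
            oldStream.shutdown();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
}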

}

// If graceful termination is unsuccessful, forcefully shutdown.
if (!newStream.awaitTermination(30, TimeUnit.SECONDS)) {
Contributor

maybe better to increase this? If we lose GetWork responses then the windmill worker has to retry; if we can get them to flush with a little more time that seems fine.

Contributor Author

bumped 10x to 5 minutes

@@ -189,6 +189,12 @@ private static <T extends AbstractStub<T>> T withDefaultDeadline(T stub) {
  return stub.withDeadlineAfter(DEFAULT_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
}

private static <T extends AbstractStub<T>> T withLongDeadline(T stub) {
Contributor

withDirectPathDeadline?

Contributor Author

done
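
For context, a sketch of what the renamed helper looks like after this change; the 1-hour value comes from the PR description, and the constant name here is an assumption:

// Direct-path streams are gracefully restarted internally well before this
// deadline, so the long deadline mostly avoids DEADLINE_EXCEEDED churn.
private static final long DIRECT_PATH_STREAM_RPC_DEADLINE_SECONDS = TimeUnit.HOURS.toSeconds(1);

private static <T extends AbstractStub<T>> T withDirectPathDeadline(T stub) {
  return stub.withDeadlineAfter(DIRECT_PATH_STREAM_RPC_DEADLINE_SECONDS, TimeUnit.SECONDS);
}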

private final GetWorkStream getWorkStream;
private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamSender.class);
private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread";
private static final int GET_WORK_STREAM_TTL_MINUTES = 45;
Contributor

could note that this needs to be less than the deadline in the other file

Contributor Author

done
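
i.e. something along the lines of this sketch, where the cross-reference points at the direct-path deadline configured in GrpcWindmillStreamFactory:

// Must stay well below the direct-path stream deadline set in
// GrpcWindmillStreamFactory (1 hour in this PR) so the graceful restart and
// drain complete before gRPC can return DEADLINE_EXCEEDED.
private static final int GET_WORK_STREAM_TTL_MINUTES = 45;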

? "Waited "
+ totalSecondsWaited
+ "s which exceeds given deadline of "
+ deadlineSeconds
+ inactivityTimeout
+ "s for the outboundObserver to become ready meaning "
+ "that the stream deadline was not respected."
Contributor

this seems like the wrong message if it isn't the stream deadline

Contributor Author

done

@github-actions github-actions bot added build and removed build labels Apr 2, 2025
@m-trieu m-trieu force-pushed the mt-getworkstream branch from a3c671d to e7203a9 Compare April 2, 2025 08:30
@github-actions github-actions bot added the build label Apr 2, 2025
@m-trieu m-trieu force-pushed the mt-getworkstream branch from e7203a9 to 1a76711 Compare April 2, 2025 08:31
@github-actions github-actions bot added build and removed build labels Apr 2, 2025
@m-trieu m-trieu force-pushed the mt-getworkstream branch from 1a76711 to d24da6c Compare April 2, 2025 08:33
this.streamingEngineThrottleTimers = StreamingEngineThrottleTimers.create();

// Stream instances connect/reconnect internally, so we can reuse the same instance through the
Contributor

don't we have the same problem that the GetData and CommitWork streams will also hit DEADLINE_EXCEEDED?

I'm wondering if it would be better to either improve StreamPool or have a more reusable class like StreamPool for internally reconnecting an AbstractWindmillStream. It would also be nice if we didn't have to have an additional thread per stream just waiting for it to terminate.

m-trieu (Contributor Author) commented Apr 2, 2025

how about adding an internal mechanism (maybe via another executor to make it async, if we don't want the old stream closing to block the new stream creation/stream restart)?

Where we add this workflow:

  • create stream
  • start stream
  • wait for termination based on a TTL/timeout
  • loop while(stream.isRunning).

we can add this in AbstractWindmillStream so callers are only exposed to Stream.create(), start(), and halfClose()/shutdown() as an interface and don't have to worry about the restart mechanism?

Another thing we can do, if we don't want to awaitTermination() on a thread, is use a ScheduledExecutorService to schedule a task that wakes up once every 0.75 * deadline_duration and checks whether we need to initiate a restart, although this could add some complexity compared to the approach where we just have a thread that does awaitTermination() -> halfClose() -> shutdown().

Contributor Author

we can expose a boolean to the constructor that will do this internally or allow the caller to manage the restarts (like in the case of StreamPool)

Contributor Author

awaitTermination() uses a CountDownLatch underneath, so the thread would not be in a BLOCKED state I believe, just TIMED_WAITING; other threads can be scheduled in the meantime for other tasks.

Contributor

Did we discuss before adding this functionality directly to AbstractWindmillStream? It already has multiple physical grpc streams beneath it and knows how to restart things if needed. I think we'd have to make some of its internal state per physical stream, but the nice thing is that it would keep the clients simple as they just have a single logical stream.

Currently we have AbstractWindmillStream.halfClose(), which means we want to terminate the logical stream, and then we half-close the physical stream (and new ones that we may create). I think that if we added some timeout/recreation internal to the abstract stream we could half-close a physical stream that we want to tear down while the logical stream is not half-closed. We'd have to maintain a set of the background streams that were half-closed but not yet terminated.

I don't think the thread for management will cause CPU issues as noted, since those threads are just WAITING, but since we can have a large # of windmill workers, I'd rather not create a thread just to block if not needed. We've had problems before where a high # of threads uses a lot of memory or slows down stack tracing, etc. Some options would be some kind of scheduled future that we cancel if we observe a failure before then, or some maybeRestartStream method that a single thread at a higher level calls on all of the substreams, like we do for maybeSendHeartbeats.

Contributor Author

sgtm will draft this up

Contributor Author

From the chat conversation:

Opted to use ResettableThrowingStreamObserver to close the streams.

  • A background thread in AbstractWindmillStream calls AbstractWindmillStream.restart() based on the passed-in stream TTL.
  • AbstractWindmillStream.restart() calls ResettableThrowingStreamObserver.release() to asynchronously close the stream and AbstractWindmillStream.startStream() to reset the ResettableThrowingStreamObserver and call onNewStream() to send headers and flush any pending messages.
  • ResettableThrowingStreamObserver.release() offloads the delegate to a queue, which is polled by a background thread that calls delegate.onError(InternalStreamTimeout()).
  • AbstractWindmillStream.ResponseObserver.onError(...) (which is the delegate) treats InternalStreamTimeout differently than other errors, skipping the call to executor.execute(AbstractWindmillStream.this::startStream), since we already restarted the stream in AbstractWindmillStream.restart().

now the WindmillStreamSender and callers can just call start() and shutdown() to manage the streams.

I only added the direct-path changes in this PR; the older cloud-path implementations still have the start()/awaitTermination()/halfClose() pattern. The older code just submits a streamTTL with a negative time value, which is skipped by AbstractWindmillStream when scheduling the background threads.

also added a way to retain a reference to the restart task via its future. I realized we don't want fixed intervals for stream restarts; we want restart time + the timeout, which changes every time we call startStream(). I think this should prevent a bunch of restarts from piling up and/or a restart from being called right after the stream has already restarted via other means.
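
A condensed sketch of that flow, with type and method names taken from the description above; bodies are simplified and omit the existing connection logic:

// In AbstractWindmillStream (sketch): invoked by the background restart task.
private void restart() {
  // Queue the current physical stream for asynchronous closure; a background
  // thread later calls delegate.onError(new InternalStreamTimeout()) on it.
  requestObserver.release();
  // Bring up a fresh physical stream: reset the ResettableThrowingStreamObserver,
  // resend headers via onNewStream(), and flush any pending messages.
  startStream();
}

// In the ResponseObserver delegate (sketch): an InternalStreamTimeout means
// restart() already brought up the replacement, so skip the usual retry path.
@Override
public void onError(Throwable t) {
  if (t instanceof InternalStreamTimeout) {
    return;
  }
  executor.execute(AbstractWindmillStream.this::startStream);
}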

@github-actions github-actions bot added build and removed build labels Apr 8, 2025
@m-trieu m-trieu force-pushed the mt-getworkstream branch from 57ad0f1 to c21e222 Compare April 9, 2025 03:34
@github-actions github-actions bot added build and removed build labels Apr 9, 2025
@m-trieu m-trieu force-pushed the mt-getworkstream branch from c21e222 to d2e3d70 Compare April 9, 2025 04:12
@github-actions github-actions bot added build and removed build labels Apr 9, 2025
@m-trieu m-trieu force-pushed the mt-getworkstream branch from d2e3d70 to c0b0e6b Compare April 9, 2025 04:54
@github-actions github-actions bot added build and removed build labels Apr 9, 2025
@m-trieu m-trieu force-pushed the mt-getworkstream branch from c0b0e6b to 7cf3fb8 Compare April 9, 2025 05:11
@github-actions github-actions bot added build and removed build labels Apr 9, 2025
m-trieu (Contributor Author) commented Apr 10, 2025

back to you @scwhittle thanks!

}

- private static String createThreadName(String streamType, String backendWorkerToken) {
+ private static String createThreadName(
+     String threadType, String streamType, String backendWorkerToken) {
Contributor

This doesn't quite match how we're using it. It looks like above we're passing in (streamType, backendWorkerToken, threadType). It seems like we shouldn't need to hardcode "WindmillStream" in this method either, since it'll be passed in as threadType somewhere?

// pending restarts that have not run since they are used for preventing
// DEADLINE_EXCEEDED. Everytime the stream restarts, this deadline restarts so we want
// the stream restart to run whenever we restart the stream + streamTTL.
scheduleRestart();
Contributor

Would this be an appropriate place to call requestObserver.startAsyncStreamCloser()? It feels like this is the point where we know the stream got created, so we want to close it eventually (as opposed to other code paths here in startStream where we might give up before actually creating the stream).
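
For reference, a sketch of the cancel-and-reschedule behaviour described in the quoted diff comment above (restartFuture, restartExecutor, and streamTtlMinutes are illustrative names):

private synchronized void scheduleRestart() {
  // A restart scheduled against the previous physical stream is stale: its
  // deadline started ticking at that stream's start time, not this one's.
  if (restartFuture != null) {
    restartFuture.cancel(/* mayInterruptIfRunning= */ false);
  }
  // Schedule the next restart at "this stream's start time + streamTTL"
  // rather than on a fixed global interval.
  restartFuture = restartExecutor.schedule(this::restart, streamTtlMinutes, TimeUnit.MINUTES);
}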

@@ -72,6 +73,11 @@
public class GrpcWindmillStreamFactory implements StatusDataProvider {

private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;

// 15 minutes less than DIRECT_PATH_STREAM_DEADLINE to allow for drains.
Contributor

I didn't see this defined anywhere. Do you mean the value set in withDirectPathDeadline()?

acrites (Contributor) left a comment

I added a few comments. I'm far from an expert on this section of the code though. In general, it'd be really nice if there was some way we could test this more than "run it a bit and check that it's restarted a few times".

Stuff like: someone is reading from the stream when it gets restarted.
