Skip to content

consume flow control settings updates from job settings #34539

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 16, 2025

Conversation

m-trieu
Copy link
Contributor

@m-trieu m-trieu commented Apr 3, 2025

consume flow control settings updates from job settings, lazily create streams/channels to use new flow control settings

  • propogate current flow control settings to channel creation
  • consume new flow control settings in ChannelCache
  • lazily recreate streams in WindmillConnection to pick up new flow control settings
  • update tests

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

…e streams/channels to use new flow control settings
@m-trieu
Copy link
Contributor Author

m-trieu commented Apr 3, 2025

R: @scwhittle

Copy link
Contributor

github-actions bot commented Apr 3, 2025

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@m-trieu
Copy link
Contributor Author

m-trieu commented Apr 4, 2025

@scwhittle this may lead to some retries in Windmill after we break the streams but it should not happen often as it is controlled by explicit updates to the flow control settings pushed from the Windmill backend.

maintain default settings for Cloudpath

remoteChannel(
serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
serviceAddress -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we pass in the cache's view of the settings instead?

It seems like it could be simpler to just have the single watcher for settings instead of also fetching here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


public abstract Builder setStub(CloudWindmillServiceV1Alpha1Stub stub);
public abstract Builder setStubFactory(Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we name Supplier? factory implies it is making something new, where supplier could just be returning a cached stub (which is what we want if it hasn't been modified)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

public abstract CloudWindmillServiceV1Alpha1Stub stub();
abstract Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory();

public final CloudWindmillServiceV1Alpha1Stub newStub() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's a supplier, I think currentStub() or something could be better name since new implies it is freshly allocated and not reused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

currentFlowControlSettings),
currentFlowControlSettings.getOnReadyThresholdBytes());
},
configFetcher
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto seems simpler if we remove this constructor param, and have the registeredconfig observer trigger and update the flow control settings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, also set some defaults

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -107,6 +122,23 @@ public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) {
return channelCache.getUnchecked(windmillServiceAddress);
}

public synchronized void consumeFlowControlSettings(
UserWorkerGrpcFlowControlSettings flowControlSettings) {
if (!flowControlSettings.equals(currentFlowControlSettings)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to check equilvalence instead of equals()? if field is explicilty 10MB versus default of 10MB we don't want to do anything either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, i also added a default flow control setting for directpath. equals in java compares the fields.

@@ -122,8 +135,7 @@ private static NettyChannelBuilder withDefaultChannelOptions(
.maxInboundMessageSize(Integer.MAX_VALUE)
.maxTraceEvents(MAX_REMOTE_TRACE_EVENTS)
// 1MiB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

.build();
windmillServiceRpcChannelTimeoutSec);
int flowControlWindowSizeBytes =
Math.max(WINDMILL_MAX_FLOW_CONTROL_WINDOW, flowControlSettings.getFlowControlWindowBytes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like it should be WINDMILL_MIN_FLOW_CONTROL_WINDOW

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -36,6 +37,8 @@
public final class WindmillChannelFactory {
public static final String LOCALHOST = "localhost";
private static final int MAX_REMOTE_TRACE_EVENTS = 100;
// 1MiB.
private static final int MAX_INBOUND_METADATA_SIZE_BYTES = 1024 * 1024;
// 10MiB.
private static final int WINDMILL_MAX_FLOW_CONTROL_WINDOW =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could add comment that this is chosen to be greater than 2 * max message size

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// 100MiB.
.setOnReadyThresholdBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10)
// Prevent gRPC from automatically resizing the window. If we have things to send/receive
// from Windmill we want to do it right way. There are internal pushback mechanisms in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right away

does autosizing allow specifying a minimum? is the initial a minimum?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

https://grpc.github.io/grpc-java/javadoc/io/grpc/netty/NettyServerBuilder.html#initialFlowControlWindow(int)
i don't think so, just the initial window size. the framework may resize it.

@@ -76,9 +93,14 @@ public static ChannelCache create(
new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser").build()));
}

public static ChannelCache create(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better to remove this and update callers to ignore settings? then we can see at that call-site easier that they are present but ignored

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// 100MiB.
.setFlowControlWindowBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10)
// 100MiB.
.setOnReadyThresholdBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a different constant used here and below for onready threshold. Can set them to the same value for now but less brittle to change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

switch (windmillServiceAddress.getKind()) {
case GCP_SERVICE_ADDRESS:
return remoteChannel(
windmillServiceAddress.gcpServiceAddress(), windmillServiceRpcChannelTimeoutSec);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like the DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS aren't being used if not passed here.

Seems like we should pass in and use in remoteChannel when configuring. That would let us play around with the settings for non-direct jobs also.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}

public static UserWorkerGrpcFlowControlSettings getDefaultDirectpathFlowControlSettings() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just make the constants public and remove the method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

AtomicInteger newChannelsCreated = new AtomicInteger();
cache =
ChannelCache.forTesting(
(a, b) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like you should keep track of what the settings passed in are, to make sure the plumbing is right. for example could add the settings to some concurrentqueue and then validate below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, we only call once so using AtomicRef

cache =
ChannelCache.forTesting(
(a, b) -> {
ManagedChannel channel = newChannel(channelName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -45,7 +49,7 @@ public class ChannelCacheTest {

private static ChannelCache newCache(
Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
return ChannelCache.forTesting(channelFactory, () -> {});
return ChannelCache.forTesting((ignored, address) -> channelFactory.apply(address), () -> {});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignoredFlowControlSettings

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -108,7 +112,7 @@ public void testRemoveAndClose() throws InterruptedException {
CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1);
cache =
ChannelCache.forTesting(
ignored -> newChannel(channelName), notifyWhenChannelClosed::countDown);
(a, b) -> newChannel(channelName), notifyWhenChannelClosed::countDown);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace a,b with ignoredFlowControlSettings, ignoredAddress throughout

I'd rather have test be a little wordier than confusing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

currentFlowControlSettings),
currentFlowControlSettings.getOnReadyThresholdBytes());
});
channelCache.consumeFlowControlSettings(resolveInitialFlowControlSettings(configFetcher));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line and the resolve method in this file can be removed, the cache internally can resolve unset or default settings appropriately. the fetcher observer should notify if there are overridden settings

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@m-trieu m-trieu force-pushed the mt-grpc-flow-control branch from e14a033 to 9d1b172 Compare April 10, 2025 07:53
@m-trieu
Copy link
Contributor Author

m-trieu commented Apr 10, 2025

back to you @scwhittle thanks!

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just minor cleanup comment

@@ -169,8 +160,16 @@ boolean isEmpty() {
public void appendSummaryHtml(PrintWriter writer) {
synchronized (this) {
if (currentFlowControlSettings != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about making resolveFlowControlSettings a private member of this class so this method and the cache loader can share it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm, im going to modify the code to remove WindmillStubFactoryFactory after this PR is submitted now that we have the ChannelCache that can reload based on job settings updates.

@m-trieu
Copy link
Contributor Author

m-trieu commented Apr 14, 2025

assign set of reviewers

Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link

codecov bot commented Apr 14, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 56.42%. Comparing base (cad045b) to head (6ff995b).
Report is 95 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #34539      +/-   ##
============================================
- Coverage     56.42%   56.42%   -0.01%     
  Complexity     3285     3285              
============================================
  Files          1177     1177              
  Lines        180098   180097       -1     
  Branches       3398     3398              
============================================
- Hits         101620   101619       -1     
  Misses        75216    75216              
  Partials       3262     3262              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@m-trieu
Copy link
Contributor Author

m-trieu commented Apr 14, 2025

hi @damccorm! This is ready for a merge (see #34539 (review)), Sam is just OOO.

Thank you!

writer.write("<br>");
}
}
writer.format(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this now just write the same string twice if currentFlowControlSettings is not null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

@damccorm damccorm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

@damccorm damccorm merged commit 9331662 into apache:master Apr 16, 2025
15 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants