-
Notifications
You must be signed in to change notification settings - Fork 4.3k
consume flow control settings updates from job settings #34539
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…e streams/channels to use new flow control settings
R: @scwhittle |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
@scwhittle this may lead to some retries in Windmill after we break the streams but it should not happen often as it is controlled by explicit updates to the flow control settings pushed from the Windmill backend. maintain default settings for Cloudpath |
remoteChannel( | ||
serviceAddress, | ||
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec())))); | ||
serviceAddress -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we pass in the cache's view of the settings instead?
It seems like it could be simpler to just have the single watcher for settings instead of also fetching here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
public abstract Builder setStub(CloudWindmillServiceV1Alpha1Stub stub); | ||
public abstract Builder setStubFactory(Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we name Supplier? factory implies it is making something new, where supplier could just be returning a cached stub (which is what we want if it hasn't been modified)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public abstract CloudWindmillServiceV1Alpha1Stub stub(); | ||
abstract Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory(); | ||
|
||
public final CloudWindmillServiceV1Alpha1Stub newStub() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's a supplier, I think currentStub() or something could be better name since new implies it is freshly allocated and not reused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
currentFlowControlSettings), | ||
currentFlowControlSettings.getOnReadyThresholdBytes()); | ||
}, | ||
configFetcher |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto seems simpler if we remove this constructor param, and have the registeredconfig observer trigger and update the flow control settings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, also set some defaults
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -107,6 +122,23 @@ public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) { | |||
return channelCache.getUnchecked(windmillServiceAddress); | |||
} | |||
|
|||
public synchronized void consumeFlowControlSettings( | |||
UserWorkerGrpcFlowControlSettings flowControlSettings) { | |||
if (!flowControlSettings.equals(currentFlowControlSettings)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a way to check equilvalence instead of equals()? if field is explicilty 10MB versus default of 10MB we don't want to do anything either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, i also added a default flow control setting for directpath. equals in java compares the fields.
@@ -122,8 +135,7 @@ private static NettyChannelBuilder withDefaultChannelOptions( | |||
.maxInboundMessageSize(Integer.MAX_VALUE) | |||
.maxTraceEvents(MAX_REMOTE_TRACE_EVENTS) | |||
// 1MiB |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.build(); | ||
windmillServiceRpcChannelTimeoutSec); | ||
int flowControlWindowSizeBytes = | ||
Math.max(WINDMILL_MAX_FLOW_CONTROL_WINDOW, flowControlSettings.getFlowControlWindowBytes()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like it should be WINDMILL_MIN_FLOW_CONTROL_WINDOW
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -36,6 +37,8 @@ | |||
public final class WindmillChannelFactory { | |||
public static final String LOCALHOST = "localhost"; | |||
private static final int MAX_REMOTE_TRACE_EVENTS = 100; | |||
// 1MiB. | |||
private static final int MAX_INBOUND_METADATA_SIZE_BYTES = 1024 * 1024; | |||
// 10MiB. | |||
private static final int WINDMILL_MAX_FLOW_CONTROL_WINDOW = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could add comment that this is chosen to be greater than 2 * max message size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// 100MiB. | ||
.setOnReadyThresholdBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10) | ||
// Prevent gRPC from automatically resizing the window. If we have things to send/receive | ||
// from Windmill we want to do it right way. There are internal pushback mechanisms in the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right away
does autosizing allow specifying a minimum? is the initial a minimum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
https://grpc.github.io/grpc-java/javadoc/io/grpc/netty/NettyServerBuilder.html#initialFlowControlWindow(int)
i don't think so, just the initial window size. the framework may resize it.
@@ -76,9 +93,14 @@ public static ChannelCache create( | |||
new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser").build())); | |||
} | |||
|
|||
public static ChannelCache create( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe better to remove this and update callers to ignore settings? then we can see at that call-site easier that they are present but ignored
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// 100MiB. | ||
.setFlowControlWindowBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10) | ||
// 100MiB. | ||
.setOnReadyThresholdBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about a different constant used here and below for onready threshold. Can set them to the same value for now but less brittle to change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
switch (windmillServiceAddress.getKind()) { | ||
case GCP_SERVICE_ADDRESS: | ||
return remoteChannel( | ||
windmillServiceAddress.gcpServiceAddress(), windmillServiceRpcChannelTimeoutSec); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like the DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS aren't being used if not passed here.
Seems like we should pass in and use in remoteChannel when configuring. That would let us play around with the settings for non-direct jobs also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
} | ||
|
||
public static UserWorkerGrpcFlowControlSettings getDefaultDirectpathFlowControlSettings() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just make the constants public and remove the method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
AtomicInteger newChannelsCreated = new AtomicInteger(); | ||
cache = | ||
ChannelCache.forTesting( | ||
(a, b) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like you should keep track of what the settings passed in are, to make sure the plumbing is right. for example could add the settings to some concurrentqueue and then validate below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, we only call once so using AtomicRef
cache = | ||
ChannelCache.forTesting( | ||
(a, b) -> { | ||
ManagedChannel channel = newChannel(channelName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -45,7 +49,7 @@ public class ChannelCacheTest { | |||
|
|||
private static ChannelCache newCache( | |||
Function<WindmillServiceAddress, ManagedChannel> channelFactory) { | |||
return ChannelCache.forTesting(channelFactory, () -> {}); | |||
return ChannelCache.forTesting((ignored, address) -> channelFactory.apply(address), () -> {}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ignoredFlowControlSettings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -108,7 +112,7 @@ public void testRemoveAndClose() throws InterruptedException { | |||
CountDownLatch notifyWhenChannelClosed = new CountDownLatch(1); | |||
cache = | |||
ChannelCache.forTesting( | |||
ignored -> newChannel(channelName), notifyWhenChannelClosed::countDown); | |||
(a, b) -> newChannel(channelName), notifyWhenChannelClosed::countDown); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace a,b with ignoredFlowControlSettings, ignoredAddress throughout
I'd rather have test be a little wordier than confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
currentFlowControlSettings), | ||
currentFlowControlSettings.getOnReadyThresholdBytes()); | ||
}); | ||
channelCache.consumeFlowControlSettings(resolveInitialFlowControlSettings(configFetcher)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this line and the resolve method in this file can be removed, the cache internally can resolve unset or default settings appropriately. the fetcher observer should notify if there are overridden settings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
e14a033
to
9d1b172
Compare
back to you @scwhittle thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just minor cleanup comment
@@ -169,8 +160,16 @@ boolean isEmpty() { | |||
public void appendSummaryHtml(PrintWriter writer) { | |||
synchronized (this) { | |||
if (currentFlowControlSettings != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about making resolveFlowControlSettings a private member of this class so this method and the cache loader can share it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sgtm, im going to modify the code to remove WindmillStubFactoryFactory
after this PR is submitted now that we have the ChannelCache
that can reload based on job settings updates.
assign set of reviewers |
Assigning reviewers. If you would like to opt out of this review, comment R: @damccorm added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #34539 +/- ##
============================================
- Coverage 56.42% 56.42% -0.01%
Complexity 3285 3285
============================================
Files 1177 1177
Lines 180098 180097 -1
Branches 3398 3398
============================================
- Hits 101620 101619 -1
Misses 75216 75216
Partials 3262 3262 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
hi @damccorm! This is ready for a merge (see #34539 (review)), Sam is just OOO. Thank you! |
writer.write("<br>"); | ||
} | ||
} | ||
writer.format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this now just write the same string twice if currentFlowControlSettings
is not null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks
consume flow control settings updates from job settings, lazily create streams/channels to use new flow control settings
ChannelCache
WindmillConnection
to pick up new flow control settingsThank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.