Skip to content

Commit 9d1b172

Browse files
committed
address PR comments
1 parent 699042c commit 9d1b172

File tree

6 files changed

+108
-92
lines changed

6 files changed

+108
-92
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient;
6767
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
6868
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
69-
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerGrpcFlowControlSettings;
7069
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
7170
import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
7271
import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
@@ -90,7 +89,6 @@
9089
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
9190
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
9291
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
93-
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels;
9492
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
9593
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
9694
import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
@@ -637,7 +635,6 @@ private static ChannelCachingStubFactory createFanOutStubFactory(
637635
currentFlowControlSettings),
638636
currentFlowControlSettings.getOnReadyThresholdBytes());
639637
});
640-
channelCache.consumeFlowControlSettings(resolveInitialFlowControlSettings(configFetcher));
641638
configFetcher
642639
.getGlobalConfigHandle()
643640
.registerConfigObserver(
@@ -648,22 +645,6 @@ private static ChannelCachingStubFactory createFanOutStubFactory(
648645
return ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(), channelCache);
649646
}
650647

651-
private static UserWorkerGrpcFlowControlSettings resolveInitialFlowControlSettings(
652-
ComputationConfig.Fetcher configFetcher) {
653-
// Default flow control settings will limit directpath throughput. If it is not explicitly
654-
// configured, ignore them.
655-
UserWorkerGrpcFlowControlSettings configuredFlowControlSettings =
656-
configFetcher
657-
.getGlobalConfigHandle()
658-
.getConfig()
659-
.userWorkerJobSettings()
660-
.getFlowControlSettings();
661-
return configuredFlowControlSettings.equals(
662-
UserWorkerGrpcFlowControlSettings.getDefaultInstance())
663-
? WindmillChannels.getDefaultDirectpathFlowControlSettings()
664-
: configuredFlowControlSettings;
665-
}
666-
667648
@VisibleForTesting
668649
static StreamingDataflowWorker forTesting(
669650
Map<String, String> prePopulatedStateNameMappings,

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.concurrent.Executor;
2222
import java.util.concurrent.Executors;
2323
import java.util.concurrent.TimeUnit;
24-
import java.util.function.Function;
2524
import javax.annotation.concurrent.GuardedBy;
2625
import javax.annotation.concurrent.ThreadSafe;
2726
import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
@@ -36,6 +35,7 @@
3635
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListeners;
3736
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
3837
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
38+
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
3939
import org.slf4j.Logger;
4040
import org.slf4j.LoggerFactory;
4141

@@ -53,6 +53,7 @@ public final class ChannelCache implements StatusDataProvider {
5353
private final LoadingCache<WindmillServiceAddress, ManagedChannel> channelCache;
5454

5555
@GuardedBy("this")
56+
@MonotonicNonNull
5657
private UserWorkerGrpcFlowControlSettings currentFlowControlSettings = null;
5758

5859
private ChannelCache(
@@ -75,8 +76,8 @@ private UserWorkerGrpcFlowControlSettings resolveFlowControlSettings(
7576
if (currentFlowControlSettings == null) {
7677
return addressType
7778
== WindmillServiceAddress.Kind.AUTHENTICATED_GCP_SERVICE_ADDRESS
78-
? WindmillChannels.getDefaultDirectpathFlowControlSettings()
79-
: WindmillChannels.getDefaultCloudpathFlowControlSettings();
79+
? WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS
80+
: WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS;
8081
}
8182
return currentFlowControlSettings;
8283
}
@@ -93,11 +94,6 @@ public static ChannelCache create(WindmillChannelFactory channelFactory) {
9394
new ThreadFactoryBuilder().setNameFormat("GrpcChannelCloser").build()));
9495
}
9596

96-
public static ChannelCache create(
97-
Function<WindmillServiceAddress, ManagedChannel> channelFactory) {
98-
return create((settings, address) -> channelFactory.apply(address));
99-
}
100-
10197
@VisibleForTesting
10298
public static ChannelCache forTesting(
10399
WindmillChannelFactory channelFactory, Runnable onChannelShutdown) {
@@ -137,12 +133,7 @@ public synchronized void consumeFlowControlSettings(
137133
// only do it when we have received new flow control settings.
138134
LOG.debug("Updating flow control settings {}.", flowControlSettings);
139135
currentFlowControlSettings = flowControlSettings;
140-
channelCache.asMap().keySet().stream()
141-
.filter(
142-
address ->
143-
address.getKind()
144-
== WindmillServiceAddress.Kind.AUTHENTICATED_GCP_SERVICE_ADDRESS)
145-
.forEach(channelCache::refresh);
136+
channelCache.asMap().keySet().forEach(channelCache::refresh);
146137
}
147138
}
148139

@@ -169,8 +160,16 @@ boolean isEmpty() {
169160
public void appendSummaryHtml(PrintWriter writer) {
170161
synchronized (this) {
171162
if (currentFlowControlSettings != null) {
163+
writer.format("Current gRPC flow control settings: [%s]", currentFlowControlSettings);
164+
} else {
165+
writer.format(
166+
"Cloudpath gRPC flow control settings: [%s]",
167+
WindmillChannels.DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS);
168+
writer.write("<br>");
172169
writer.format(
173-
"Current gRPC flow control settings:<br>[%s]<br>", currentFlowControlSettings);
170+
"Directpath gRPC flow control settings: [%s]",
171+
WindmillChannels.DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS);
172+
writer.write("<br>");
174173
}
175174
}
176175
writer.write("Active gRPC Channels:<br>");

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannels.java

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -42,31 +42,31 @@ public final class WindmillChannels {
4242
// 10MiB. Roughly 2x max message size.
4343
private static final int WINDMILL_MIN_FLOW_CONTROL_WINDOW =
4444
NettyChannelBuilder.DEFAULT_FLOW_CONTROL_WINDOW * 10;
45-
45+
public static final UserWorkerGrpcFlowControlSettings DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS =
46+
UserWorkerGrpcFlowControlSettings.newBuilder()
47+
.setFlowControlWindowBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW)
48+
.setEnableAutoFlowControl(false)
49+
.build();
50+
// 100MiB.
51+
private static final int DIRECTPATH_MIN_FLOW_CONTROL_WINDOW =
52+
WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10;
53+
// 100MiB.
54+
private static final int DIRECTPATH_ON_READY_THRESHOLD = WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10;
4655
// User a bigger flow control window and onready threshold for directpath to prevent churn when
4756
// gRPC is trying to flush gRPCs over the wire. If onReadyThreshold and flowControlWindowBytes are
4857
// too low, it was observed in testing that the gRPC channel can get stuck in a "not-ready" state
4958
// until stream deadline.
50-
private static final UserWorkerGrpcFlowControlSettings DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS =
59+
public static final UserWorkerGrpcFlowControlSettings DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS =
5160
UserWorkerGrpcFlowControlSettings.newBuilder()
52-
// 100MiB.
53-
.setFlowControlWindowBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10)
54-
// 100MiB.
55-
.setOnReadyThresholdBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10)
61+
.setFlowControlWindowBytes(DIRECTPATH_MIN_FLOW_CONTROL_WINDOW)
62+
.setOnReadyThresholdBytes(DIRECTPATH_ON_READY_THRESHOLD)
5663
// Prevent gRPC from automatically resizing the window. If we have things to send/receive
57-
// from Windmill we want to do it right way. There are internal pushback mechanisms in the
58-
// user worker and Windmill that attempt to guard the process from OOMing (i.e
64+
// from Windmill we want to do it right away. There are internal pushback mechanisms in
65+
// the user worker and Windmill that attempt to guard the process from OOMing (i.e
5966
// MemoryMonitor.java).
6067
.setEnableAutoFlowControl(false)
6168
.build();
6269

63-
private static final UserWorkerGrpcFlowControlSettings DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS =
64-
UserWorkerGrpcFlowControlSettings.newBuilder()
65-
.setFlowControlWindowBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW)
66-
.setOnReadyThresholdBytes(WINDMILL_MIN_FLOW_CONTROL_WINDOW)
67-
.setEnableAutoFlowControl(false)
68-
.build();
69-
7070
private WindmillChannels() {}
7171

7272
public static ManagedChannel inProcessChannel(String channelName) {
@@ -87,7 +87,9 @@ public static ManagedChannel remoteChannel(
8787
switch (windmillServiceAddress.getKind()) {
8888
case GCP_SERVICE_ADDRESS:
8989
return remoteChannel(
90-
windmillServiceAddress.gcpServiceAddress(), windmillServiceRpcChannelTimeoutSec);
90+
windmillServiceAddress.gcpServiceAddress(),
91+
windmillServiceRpcChannelTimeoutSec,
92+
flowControlSettings);
9193
case AUTHENTICATED_GCP_SERVICE_ADDRESS:
9294
return remoteDirectChannel(
9395
windmillServiceAddress.authenticatedGcpServiceAddress(),
@@ -100,14 +102,6 @@ public static ManagedChannel remoteChannel(
100102
}
101103
}
102104

103-
public static UserWorkerGrpcFlowControlSettings getDefaultDirectpathFlowControlSettings() {
104-
return DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS;
105-
}
106-
107-
public static UserWorkerGrpcFlowControlSettings getDefaultCloudpathFlowControlSettings() {
108-
return DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS;
109-
}
110-
111105
private static ManagedChannel remoteDirectChannel(
112106
AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress,
113107
int windmillServiceRpcChannelTimeoutSec,
@@ -121,28 +115,45 @@ private static ManagedChannel remoteDirectChannel(
121115
new AltsChannelCredentials.Builder().build())
122116
.overrideAuthority(authenticatedGcpServiceAddress.authenticatingService()),
123117
windmillServiceRpcChannelTimeoutSec);
124-
int flowControlWindowSizeBytes =
125-
Math.max(WINDMILL_MIN_FLOW_CONTROL_WINDOW, flowControlSettings.getFlowControlWindowBytes());
126-
return flowControlSettings.getEnableAutoFlowControl()
127-
? channelBuilder.initialFlowControlWindow(flowControlWindowSizeBytes).build()
128-
: channelBuilder.flowControlWindow(flowControlWindowSizeBytes).build();
118+
119+
return withFlowControlSettings(
120+
channelBuilder, flowControlSettings, DIRECTPATH_MIN_FLOW_CONTROL_WINDOW)
121+
.build();
129122
}
130123

131124
public static ManagedChannel remoteChannel(
132-
HostAndPort endpoint, int windmillServiceRpcChannelTimeoutSec) {
133-
return withDefaultChannelOptions(
134-
NettyChannelBuilder.forAddress(
135-
endpoint.getHost(),
136-
endpoint.hasPort()
137-
? endpoint.getPort()
138-
: WindmillEndpoints.DEFAULT_WINDMILL_SERVICE_PORT),
139-
windmillServiceRpcChannelTimeoutSec)
140-
.negotiationType(NegotiationType.TLS)
141-
.sslContext(dataflowGrpcSslContext(endpoint))
142-
.flowControlWindow(WINDMILL_MIN_FLOW_CONTROL_WINDOW)
125+
HostAndPort endpoint,
126+
int windmillServiceRpcChannelTimeoutSec,
127+
UserWorkerGrpcFlowControlSettings flowControlSettings) {
128+
NettyChannelBuilder channelBuilder =
129+
withDefaultChannelOptions(
130+
NettyChannelBuilder.forAddress(
131+
endpoint.getHost(),
132+
endpoint.hasPort()
133+
? endpoint.getPort()
134+
: WindmillEndpoints.DEFAULT_WINDMILL_SERVICE_PORT),
135+
windmillServiceRpcChannelTimeoutSec)
136+
.negotiationType(NegotiationType.TLS)
137+
.sslContext(dataflowGrpcSslContext(endpoint));
138+
139+
return withFlowControlSettings(
140+
channelBuilder, flowControlSettings, WINDMILL_MIN_FLOW_CONTROL_WINDOW)
143141
.build();
144142
}
145143

144+
private static NettyChannelBuilder withFlowControlSettings(
145+
NettyChannelBuilder channelBuilder,
146+
UserWorkerGrpcFlowControlSettings flowControlSettings,
147+
int defaultFlowControlBytes) {
148+
int flowControlWindowSizeBytes =
149+
Math.max(defaultFlowControlBytes, flowControlSettings.getFlowControlWindowBytes());
150+
return flowControlSettings.getEnableAutoFlowControl()
151+
// Enable auto flow control with an initial window of flowControlWindowSizeBytes. gRPC may
152+
// resize this value based on throughput.
153+
? channelBuilder.initialFlowControlWindow(flowControlWindowSizeBytes)
154+
: channelBuilder.flowControlWindow(flowControlWindowSizeBytes);
155+
}
156+
146157
@SuppressWarnings("nullness")
147158
private static SslContext dataflowGrpcSslContext(HostAndPort endpoint) {
148159
try {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,9 @@
2020
import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.remoteChannel;
2121

2222
import com.google.auth.Credentials;
23-
import java.util.function.Function;
2423
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
25-
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
26-
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel;
2724

2825
public class WindmillStubFactoryFactoryImpl implements WindmillStubFactoryFactory {
29-
3026
private final int windmillServiceRpcChannelAliveTimeoutSec;
3127
private final Credentials gcpCredential;
3228

@@ -38,19 +34,23 @@ public WindmillStubFactoryFactoryImpl(DataflowWorkerHarnessOptions workerOptions
3834

3935
@Override
4036
public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) {
41-
Function<WindmillServiceAddress, ManagedChannel> channelFactory =
42-
serviceAddress ->
43-
remoteChannel(
44-
serviceAddress.getServiceAddress(), windmillServiceRpcChannelAliveTimeoutSec);
4537
ChannelCache channelCache =
4638
ChannelCache.create(
47-
(ignored, serviceAddress) ->
39+
(flowControlSettings, serviceAddress) ->
4840
// IsolationChannel will create and manage separate RPC channels to the same
4941
// serviceAddress via calling the channelFactory, else just directly return the
5042
// RPC channel.
5143
useIsolatedChannels
52-
? IsolationChannel.create(() -> channelFactory.apply(serviceAddress))
53-
: channelFactory.apply(serviceAddress));
44+
? IsolationChannel.create(
45+
() ->
46+
remoteChannel(
47+
serviceAddress.getServiceAddress(),
48+
windmillServiceRpcChannelAliveTimeoutSec,
49+
flowControlSettings))
50+
: remoteChannel(
51+
serviceAddress.getServiceAddress(),
52+
windmillServiceRpcChannelAliveTimeoutSec,
53+
flowControlSettings));
5454
return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache);
5555
}
5656
}

0 commit comments

Comments
 (0)