@@ -42,31 +42,31 @@ public final class WindmillChannels {
42
42
// 10MiB. Roughly 2x max message size.
43
43
private static final int WINDMILL_MIN_FLOW_CONTROL_WINDOW =
44
44
NettyChannelBuilder .DEFAULT_FLOW_CONTROL_WINDOW * 10 ;
45
-
45
+ public static final UserWorkerGrpcFlowControlSettings DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS =
46
+ UserWorkerGrpcFlowControlSettings .newBuilder ()
47
+ .setFlowControlWindowBytes (WINDMILL_MIN_FLOW_CONTROL_WINDOW )
48
+ .setEnableAutoFlowControl (false )
49
+ .build ();
50
+ // 100MiB.
51
+ private static final int DIRECTPATH_MIN_FLOW_CONTROL_WINDOW =
52
+ WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10 ;
53
+ // 100MiB.
54
+ private static final int DIRECTPATH_ON_READY_THRESHOLD = WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10 ;
46
55
// User a bigger flow control window and onready threshold for directpath to prevent churn when
47
56
// gRPC is trying to flush gRPCs over the wire. If onReadyThreshold and flowControlWindowBytes are
48
57
// too low, it was observed in testing that the gRPC channel can get stuck in a "not-ready" state
49
58
// until stream deadline.
50
- private static final UserWorkerGrpcFlowControlSettings DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS =
59
+ public static final UserWorkerGrpcFlowControlSettings DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS =
51
60
UserWorkerGrpcFlowControlSettings .newBuilder ()
52
- // 100MiB.
53
- .setFlowControlWindowBytes (WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10 )
54
- // 100MiB.
55
- .setOnReadyThresholdBytes (WINDMILL_MIN_FLOW_CONTROL_WINDOW * 10 )
61
+ .setFlowControlWindowBytes (DIRECTPATH_MIN_FLOW_CONTROL_WINDOW )
62
+ .setOnReadyThresholdBytes (DIRECTPATH_ON_READY_THRESHOLD )
56
63
// Prevent gRPC from automatically resizing the window. If we have things to send/receive
57
- // from Windmill we want to do it right way . There are internal pushback mechanisms in the
58
- // user worker and Windmill that attempt to guard the process from OOMing (i.e
64
+ // from Windmill we want to do it right away . There are internal pushback mechanisms in
65
+ // the user worker and Windmill that attempt to guard the process from OOMing (i.e
59
66
// MemoryMonitor.java).
60
67
.setEnableAutoFlowControl (false )
61
68
.build ();
62
69
63
- private static final UserWorkerGrpcFlowControlSettings DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS =
64
- UserWorkerGrpcFlowControlSettings .newBuilder ()
65
- .setFlowControlWindowBytes (WINDMILL_MIN_FLOW_CONTROL_WINDOW )
66
- .setOnReadyThresholdBytes (WINDMILL_MIN_FLOW_CONTROL_WINDOW )
67
- .setEnableAutoFlowControl (false )
68
- .build ();
69
-
70
70
private WindmillChannels () {}
71
71
72
72
public static ManagedChannel inProcessChannel (String channelName ) {
@@ -87,7 +87,9 @@ public static ManagedChannel remoteChannel(
87
87
switch (windmillServiceAddress .getKind ()) {
88
88
case GCP_SERVICE_ADDRESS :
89
89
return remoteChannel (
90
- windmillServiceAddress .gcpServiceAddress (), windmillServiceRpcChannelTimeoutSec );
90
+ windmillServiceAddress .gcpServiceAddress (),
91
+ windmillServiceRpcChannelTimeoutSec ,
92
+ flowControlSettings );
91
93
case AUTHENTICATED_GCP_SERVICE_ADDRESS :
92
94
return remoteDirectChannel (
93
95
windmillServiceAddress .authenticatedGcpServiceAddress (),
@@ -100,14 +102,6 @@ public static ManagedChannel remoteChannel(
100
102
}
101
103
}
102
104
103
- public static UserWorkerGrpcFlowControlSettings getDefaultDirectpathFlowControlSettings () {
104
- return DEFAULT_DIRECTPATH_FLOW_CONTROL_SETTINGS ;
105
- }
106
-
107
- public static UserWorkerGrpcFlowControlSettings getDefaultCloudpathFlowControlSettings () {
108
- return DEFAULT_CLOUDPATH_FLOW_CONTROL_SETTINGS ;
109
- }
110
-
111
105
private static ManagedChannel remoteDirectChannel (
112
106
AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress ,
113
107
int windmillServiceRpcChannelTimeoutSec ,
@@ -121,28 +115,45 @@ private static ManagedChannel remoteDirectChannel(
121
115
new AltsChannelCredentials .Builder ().build ())
122
116
.overrideAuthority (authenticatedGcpServiceAddress .authenticatingService ()),
123
117
windmillServiceRpcChannelTimeoutSec );
124
- int flowControlWindowSizeBytes =
125
- Math .max (WINDMILL_MIN_FLOW_CONTROL_WINDOW , flowControlSettings .getFlowControlWindowBytes ());
126
- return flowControlSettings .getEnableAutoFlowControl ()
127
- ? channelBuilder .initialFlowControlWindow (flowControlWindowSizeBytes ).build ()
128
- : channelBuilder .flowControlWindow (flowControlWindowSizeBytes ).build ();
118
+
119
+ return withFlowControlSettings (
120
+ channelBuilder , flowControlSettings , DIRECTPATH_MIN_FLOW_CONTROL_WINDOW )
121
+ .build ();
129
122
}
130
123
131
124
public static ManagedChannel remoteChannel (
132
- HostAndPort endpoint , int windmillServiceRpcChannelTimeoutSec ) {
133
- return withDefaultChannelOptions (
134
- NettyChannelBuilder .forAddress (
135
- endpoint .getHost (),
136
- endpoint .hasPort ()
137
- ? endpoint .getPort ()
138
- : WindmillEndpoints .DEFAULT_WINDMILL_SERVICE_PORT ),
139
- windmillServiceRpcChannelTimeoutSec )
140
- .negotiationType (NegotiationType .TLS )
141
- .sslContext (dataflowGrpcSslContext (endpoint ))
142
- .flowControlWindow (WINDMILL_MIN_FLOW_CONTROL_WINDOW )
125
+ HostAndPort endpoint ,
126
+ int windmillServiceRpcChannelTimeoutSec ,
127
+ UserWorkerGrpcFlowControlSettings flowControlSettings ) {
128
+ NettyChannelBuilder channelBuilder =
129
+ withDefaultChannelOptions (
130
+ NettyChannelBuilder .forAddress (
131
+ endpoint .getHost (),
132
+ endpoint .hasPort ()
133
+ ? endpoint .getPort ()
134
+ : WindmillEndpoints .DEFAULT_WINDMILL_SERVICE_PORT ),
135
+ windmillServiceRpcChannelTimeoutSec )
136
+ .negotiationType (NegotiationType .TLS )
137
+ .sslContext (dataflowGrpcSslContext (endpoint ));
138
+
139
+ return withFlowControlSettings (
140
+ channelBuilder , flowControlSettings , WINDMILL_MIN_FLOW_CONTROL_WINDOW )
143
141
.build ();
144
142
}
145
143
144
+ private static NettyChannelBuilder withFlowControlSettings (
145
+ NettyChannelBuilder channelBuilder ,
146
+ UserWorkerGrpcFlowControlSettings flowControlSettings ,
147
+ int defaultFlowControlBytes ) {
148
+ int flowControlWindowSizeBytes =
149
+ Math .max (defaultFlowControlBytes , flowControlSettings .getFlowControlWindowBytes ());
150
+ return flowControlSettings .getEnableAutoFlowControl ()
151
+ // Enable auto flow control with an initial window of flowControlWindowSizeBytes. gRPC may
152
+ // resize this value based on throughput.
153
+ ? channelBuilder .initialFlowControlWindow (flowControlWindowSizeBytes )
154
+ : channelBuilder .flowControlWindow (flowControlWindowSizeBytes );
155
+ }
156
+
146
157
@ SuppressWarnings ("nullness" )
147
158
private static SslContext dataflowGrpcSslContext (HostAndPort endpoint ) {
148
159
try {
0 commit comments