Skip to content

Commit f825d99

Browse files
committed
Implement config flag and pass through /v1/cluster endpoint
1 parent 3b17312 commit f825d99

File tree

6 files changed

+51
-6
lines changed

6 files changed

+51
-6
lines changed

presto-docs/src/main/sphinx/admin/properties.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,15 @@ Maximum object size in bytes that can be considered serializable in a function c
159159

160160
The corresponding session property is :ref:`admin/properties-session:\`\`max_serializable_object_size\`\``.
161161

162+
``cluster-tag``
163+
^^^^^^^^^^^^^^^
164+
165+
* **Type:** ``string``
166+
* **Default value:** (none)
167+
168+
An optional identifier for the cluster. When set, this tag is included in the response from the
169+
``/v1/cluster`` REST API endpoint, allowing clients to identify which cluster provided the response.
170+
162171
Memory Management Properties
163172
----------------------------
164173

presto-main-base/src/main/java/com/facebook/presto/server/ServerConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class ServerConfig
4242
private Duration clusterStatsExpirationDuration = new Duration(0, MILLISECONDS);
4343
private boolean nestedDataSerializationEnabled = true;
4444
private Duration clusterResourceGroupStateInfoExpirationDuration = new Duration(0, MILLISECONDS);
45+
private String clusterTag;
4546

4647
public boolean isResourceManager()
4748
{
@@ -240,4 +241,16 @@ public ServerConfig setClusterResourceGroupStateInfoExpirationDuration(Duration
240241
this.clusterResourceGroupStateInfoExpirationDuration = clusterResourceGroupStateInfoExpirationDuration;
241242
return this;
242243
}
244+
245+
public String getClusterTag()
246+
{
247+
return clusterTag;
248+
}
249+
250+
@Config("cluster-tag")
251+
public ServerConfig setClusterTag(String clusterTag)
252+
{
253+
this.clusterTag = clusterTag;
254+
return this;
255+
}
243256
}

presto-main/src/main/java/com/facebook/presto/resourcemanager/DistributedClusterStatsResource.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class DistributedClusterStatsResource
5050
private final ResourceManagerClusterStateProvider clusterStateProvider;
5151
private final InternalNodeManager internalNodeManager;
5252
private final Supplier<ClusterStats> clusterStatsSupplier;
53+
private final String clusterTag;
5354

5455
@Inject
5556
public DistributedClusterStatsResource(
@@ -61,7 +62,9 @@ public DistributedClusterStatsResource(
6162
this.isIncludeCoordinator = requireNonNull(nodeSchedulerConfig, "nodeSchedulerConfig is null").isIncludeCoordinator();
6263
this.clusterStateProvider = requireNonNull(clusterStateProvider, "nodeStateManager is null");
6364
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
64-
Duration expirationDuration = requireNonNull(serverConfig, "serverConfig is null").getClusterStatsExpirationDuration();
65+
ServerConfig config = requireNonNull(serverConfig, "serverConfig is null");
66+
this.clusterTag = config.getClusterTag();
67+
Duration expirationDuration = config.getClusterStatsExpirationDuration();
6568
this.clusterStatsSupplier = expirationDuration.getValue() > 0 ? memoizeWithExpiration(this::calculateClusterStats, expirationDuration.toMillis(), MILLISECONDS) : this::calculateClusterStats;
6669
}
6770

@@ -126,7 +129,8 @@ else if (query.getState() == QueryState.RUNNING) {
126129
totalInputRows,
127130
totalInputBytes,
128131
totalCpuTimeSecs,
129-
clusterStateProvider.getAdjustedQueueSize());
132+
clusterStateProvider.getAdjustedQueueSize(),
133+
clusterTag);
130134
}
131135

132136
@GET

presto-main/src/main/java/com/facebook/presto/server/ClusterStatsResource.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public class ClusterStatsResource
7676
private final InternalResourceGroupManager internalResourceGroupManager;
7777
private final ClusterTtlProviderManager clusterTtlProviderManager;
7878
private final Supplier<ClusterStats> clusterStatsSupplier;
79+
private final String clusterTag;
7980

8081
@Inject
8182
public ClusterStatsResource(
@@ -96,7 +97,9 @@ public ClusterStatsResource(
9697
this.proxyHelper = requireNonNull(proxyHelper, "internalNodeManager is null");
9798
this.internalResourceGroupManager = requireNonNull(internalResourceGroupManager, "internalResourceGroupManager is null");
9899
this.clusterTtlProviderManager = requireNonNull(clusterTtlProviderManager, "clusterTtlProvider is null");
99-
Duration expirationDuration = requireNonNull(serverConfig, "serverConfig is null").getClusterStatsExpirationDuration();
100+
ServerConfig config = requireNonNull(serverConfig, "serverConfig is null");
101+
this.clusterTag = config.getClusterTag();
102+
Duration expirationDuration = config.getClusterStatsExpirationDuration();
100103
this.clusterStatsSupplier = expirationDuration.getValue() > 0 ? memoizeWithExpiration(this::calculateClusterStats, expirationDuration.toMillis(), MILLISECONDS) : this::calculateClusterStats;
101104
}
102105

@@ -170,7 +173,8 @@ else if (query.getState() == QueryState.RUNNING) {
170173
totalInputRows,
171174
totalInputBytes,
172175
totalCpuTimeSecs,
173-
internalResourceGroupManager.getQueriesQueuedOnInternal());
176+
internalResourceGroupManager.getQueriesQueuedOnInternal(),
177+
clusterTag);
174178
}
175179

176180
@GET
@@ -238,6 +242,8 @@ public static class ClusterStats
238242
private final long totalCpuTimeSecs;
239243
private final long adjustedQueueSize;
240244

245+
private final String clusterTag;
246+
241247
@JsonCreator
242248
@ThriftConstructor
243249
public ClusterStats(
@@ -251,7 +257,8 @@ public ClusterStats(
251257
@JsonProperty("totalInputRows") long totalInputRows,
252258
@JsonProperty("totalInputBytes") long totalInputBytes,
253259
@JsonProperty("totalCpuTimeSecs") long totalCpuTimeSecs,
254-
@JsonProperty("adjustedQueueSize") long adjustedQueueSize)
260+
@JsonProperty("adjustedQueueSize") long adjustedQueueSize,
261+
@JsonProperty("clusterTag") String clusterTag)
255262
{
256263
this.runningQueries = runningQueries;
257264
this.blockedQueries = blockedQueries;
@@ -264,6 +271,7 @@ public ClusterStats(
264271
this.totalInputBytes = totalInputBytes;
265272
this.totalCpuTimeSecs = totalCpuTimeSecs;
266273
this.adjustedQueueSize = adjustedQueueSize;
274+
this.clusterTag = clusterTag;
267275
}
268276

269277
@JsonProperty
@@ -342,5 +350,12 @@ public long getAdjustedQueueSize()
342350
{
343351
return adjustedQueueSize;
344352
}
353+
354+
@JsonProperty
355+
@ThriftField(12)
356+
public String getClusterTag()
357+
{
358+
return clusterTag;
359+
}
345360
}
346361
}

presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ else if (serverConfig.isCoordinator()) {
335335

336336
install(new InternalCommunicationModule());
337337

338+
configBinder(binder).bindConfig(ServerConfig.class);
338339
configBinder(binder).bindConfig(FeaturesConfig.class);
339340
configBinder(binder).bindConfig(FunctionsConfig.class);
340341
configBinder(binder).bindConfig(JavaFeaturesConfig.class);

presto-main/src/test/java/com/facebook/presto/server/TestThriftClusterStats.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class TestThriftClusterStats
4646
public static final long TOTAL_INPUT_BYTES = 1003;
4747
public static final long TOTAL_CPU_TIME_SECS = 1004;
4848
public static final long ADJUSTED_QUEUE_SIZE = 1005;
49+
public static final String CLUSTER_TAG = "test-cluster";
4950
private static final ThriftCodecManager COMPILER_READ_CODEC_MANAGER = new ThriftCodecManager(new CompilerThriftCodecFactory(false));
5051
private static final ThriftCodecManager COMPILER_WRITE_CODEC_MANAGER = new ThriftCodecManager(new CompilerThriftCodecFactory(false));
5152
private static final ThriftCodec<ClusterStats> COMPILER_READ_CODEC = COMPILER_READ_CODEC_MANAGER.getCodec(ClusterStats.class);
@@ -111,6 +112,7 @@ private void assertSerde(ClusterStats clusterStats)
111112
assertEquals(clusterStats.getTotalInputBytes(), TOTAL_INPUT_BYTES);
112113
assertEquals(clusterStats.getTotalCpuTimeSecs(), TOTAL_CPU_TIME_SECS);
113114
assertEquals(clusterStats.getAdjustedQueueSize(), ADJUSTED_QUEUE_SIZE);
115+
assertEquals(clusterStats.getClusterTag(), CLUSTER_TAG);
114116
}
115117

116118
private ClusterStats getRoundTripSerialize(ThriftCodec<ClusterStats> readCodec, ThriftCodec<ClusterStats> writeCodec, Function<TTransport, TProtocol> protocolFactory)
@@ -134,6 +136,7 @@ private ClusterStats getClusterStats()
134136
TOTAL_INPUT_ROWS,
135137
TOTAL_INPUT_BYTES,
136138
TOTAL_CPU_TIME_SECS,
137-
ADJUSTED_QUEUE_SIZE);
139+
ADJUSTED_QUEUE_SIZE,
140+
CLUSTER_TAG);
138141
}
139142
}

0 commit comments

Comments
 (0)