Skip to content

Commit bf72aca

Browse files
committed
Add HTTP support for resource manager internal communication
1 parent f5318c3 commit bf72aca

File tree

19 files changed

+1188
-95
lines changed

19 files changed

+1188
-95
lines changed

presto-docs/src/main/sphinx/installation/deployment.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,15 @@ These properties require some explanation:
231231
the host and port of the Presto coordinator. This URI must not end
232232
in a slash.
233233

234+
* ``internal-communication.resource-manager-communication-protocol``:
235+
The protocol used for communication with the resource manager. This
236+
can be set to ``THRIFT`` or ``HTTP``.
237+
238+
* ``resource-manager.http-server-enabled``:
239+
Whether to enable the resource manager HTTP server or not. If the
240+
resource manager communication protocol is set to ``HTTP``, this
241+
must be set to ``true``.
242+
234243
The following flags can help one tune the disaggregated coordinator cluster’s resource groups to the desired consistency:
235244

236245
* ``concurrency-threshold-to-enable-resource-group-refresh (default: 1.0)``

presto-docs/src/main/sphinx/overview/concepts.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ data from all coordinators and workers and constructs a global view of the clust
3838
A Presto installation with a disaggregated coordinator needs a resource manager.
3939
Clusters support multiple resource managers, each acting as a primary.
4040

41-
Coordinators and workers communicate with resource managers using a thrift API.
41+
Coordinators and workers can communicate with resource managers using either thrift or HTTP API.
4242

4343
Coordinator
4444
^^^^^^^^^^^

presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/ResourceGroupRuntimeInfo.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import com.facebook.drift.annotations.ThriftField;
1818
import com.facebook.drift.annotations.ThriftStruct;
1919
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
20+
import com.fasterxml.jackson.annotation.JsonCreator;
21+
import com.fasterxml.jackson.annotation.JsonProperty;
2022

2123
import java.util.Optional;
2224

@@ -35,7 +37,14 @@ public class ResourceGroupRuntimeInfo
3537
private final Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec;
3638

3739
@ThriftConstructor
38-
public ResourceGroupRuntimeInfo(ResourceGroupId resourceGroupId, long memoryUsageBytes, int queuedQueries, int descendantQueuedQueries, int runningQueries, int descendantRunningQueries, Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec)
40+
@JsonCreator
41+
public ResourceGroupRuntimeInfo(@JsonProperty("resourceGroupId") ResourceGroupId resourceGroupId,
42+
@JsonProperty("memoryUsageBytes") long memoryUsageBytes,
43+
@JsonProperty("queuedQueries") int queuedQueries,
44+
@JsonProperty("descendantQueuedQueries") int descendantQueuedQueries,
45+
@JsonProperty("runningQueries") int runningQueries,
46+
@JsonProperty("descendantRunningQueries") int descendantRunningQueries,
47+
@JsonProperty("resourceGroupConfigSpec") Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec)
3948
{
4049
this.resourceGroupId = requireNonNull(resourceGroupId, "resourceGroupId is null");
4150
this.memoryUsageBytes = memoryUsageBytes;
@@ -52,42 +61,49 @@ public static Builder builder(ResourceGroupId resourceGroupId)
5261
}
5362

5463
@ThriftField(1)
64+
@JsonProperty
5565
public ResourceGroupId getResourceGroupId()
5666
{
5767
return resourceGroupId;
5868
}
5969

6070
@ThriftField(2)
71+
@JsonProperty
6172
public long getMemoryUsageBytes()
6273
{
6374
return memoryUsageBytes;
6475
}
6576

6677
@ThriftField(3)
78+
@JsonProperty
6779
public int getQueuedQueries()
6880
{
6981
return queuedQueries;
7082
}
7183

7284
@ThriftField(4)
85+
@JsonProperty
7386
public int getDescendantQueuedQueries()
7487
{
7588
return descendantQueuedQueries;
7689
}
7790

7891
@ThriftField(5)
92+
@JsonProperty
7993
public int getRunningQueries()
8094
{
8195
return runningQueries;
8296
}
8397

8498
@ThriftField(6)
99+
@JsonProperty
85100
public int getDescendantRunningQueries()
86101
{
87102
return descendantRunningQueries;
88103
}
89104

90105
@ThriftField(7)
106+
@JsonProperty
91107
public Optional<ResourceGroupSpecInfo> getResourceGroupConfigSpec()
92108
{
93109
return resourceGroupConfigSpec;

presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public class ResourceManagerConfig
4242
private Duration memoryPoolFetchInterval = new Duration(1, SECONDS);
4343
private boolean resourceGroupServiceCacheEnabled;
4444
private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS);
45-
private boolean heartbeatHttpEnabled;
45+
private boolean httpServerEnabled;
46+
4647
private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);
4748

4849
private Duration runningTaskCountFetchInterval = new Duration(1, SECONDS);
@@ -279,15 +280,16 @@ public ResourceManagerConfig setRunningTaskCountFetchInterval(Duration runningTa
279280
return this;
280281
}
281282

282-
public boolean getHeartbeatHttpEnabled()
283+
@Config("resource-manager.http-server-enabled")
284+
@ConfigDescription("Enable HTTP REST endpoints on the resource manager for internal communication.")
285+
public ResourceManagerConfig setHttpServerEnabled(boolean httpEnabled)
283286
{
284-
return heartbeatHttpEnabled;
287+
this.httpServerEnabled = httpEnabled;
288+
return this;
285289
}
286290

287-
@Config("resource-manager.heartbeat-http-enabled")
288-
public ResourceManagerConfig setHeartbeatHttpEnabled(boolean heartbeatHttpEnabled)
291+
public boolean getHttpServerEnabled()
289292
{
290-
this.heartbeatHttpEnabled = heartbeatHttpEnabled;
291-
return this;
293+
return httpServerEnabled;
292294
}
293295
}

presto-main-base/src/main/java/com/facebook/presto/server/InternalCommunicationConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class InternalCommunicationConfig
4747
private DataSize maxTaskUpdateSize = new DataSize(16, MEGABYTE);
4848
private CommunicationProtocol taskCommunicationProtocol = CommunicationProtocol.HTTP;
4949
private CommunicationProtocol serverInfoCommunicationProtocol = CommunicationProtocol.HTTP;
50+
private CommunicationProtocol resourceManagerCommunicationProtocol = CommunicationProtocol.THRIFT;
5051
private boolean memoizeDeadNodesEnabled;
5152
private String sharedSecret;
5253
private long nodeStatsRefreshIntervalMillis = 1_000;
@@ -352,6 +353,19 @@ public InternalCommunicationConfig setInternalJwtEnabled(boolean internalJwtEnab
352353
return this;
353354
}
354355

356+
@Config("internal-communication.resource-manager-communication-protocol")
357+
@ConfigDescription("Protocol for internal communication with resource managers.")
358+
public InternalCommunicationConfig setResourceManagerCommunicationProtocol(CommunicationProtocol resourceManagerCommunicationProtocol)
359+
{
360+
this.resourceManagerCommunicationProtocol = resourceManagerCommunicationProtocol;
361+
return this;
362+
}
363+
364+
public CommunicationProtocol getResourceManagerCommunicationProtocol()
365+
{
366+
return resourceManagerCommunicationProtocol;
367+
}
368+
355369
@AssertTrue(message = "When internal JWT(internal-communication.jwt.enabled) authentication is enabled, a shared secret(internal-communication.shared-secret) is required")
356370
public boolean isRequiredSharedSecretSet()
357371
{

presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ private void assertMemoryPoolMap(ResourceManagerClusterStateProvider provider, i
724724
assertEquals(clusterMemoryPoolInfo.getLargestMemoryQuery().map(QueryId::getId), largestMemoryQuery);
725725
}
726726

727-
private static BasicQueryInfo createQueryInfo(String queryId, QueryState state)
727+
public static BasicQueryInfo createQueryInfo(String queryId, QueryState state)
728728
{
729729
return createQueryInfo(queryId, state, "global", GENERAL_POOL);
730730
}

presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void testDefaults()
5050
.setResourceGroupRuntimeHeartbeatInterval(new Duration(1, SECONDS))
5151
.setRunningTaskCountFetchInterval(new Duration(1, SECONDS))
5252
.setResourceGroupRuntimeInfoTimeout(new Duration(30, SECONDS))
53-
.setHeartbeatHttpEnabled(false));
53+
.setHttpServerEnabled(false));
5454
}
5555

5656
@Test
@@ -75,7 +75,7 @@ public void testExplicitPropertyMappings()
7575
.put("resource-manager.resource-group-runtimeinfo-heartbeat-interval", "6m")
7676
.put("resource-manager.running-task-count-fetch-interval", "1m")
7777
.put("resource-manager.resource-group-runtimeinfo-timeout", "4s")
78-
.put("resource-manager.heartbeat-http-enabled", "true")
78+
.put("resource-manager.http-server-enabled", "true")
7979
.build();
8080

8181
ResourceManagerConfig expected = new ResourceManagerConfig()
@@ -97,7 +97,7 @@ public void testExplicitPropertyMappings()
9797
.setResourceGroupRuntimeHeartbeatInterval(new Duration(6, MINUTES))
9898
.setResourceGroupRuntimeInfoTimeout(new Duration(4, SECONDS))
9999
.setRunningTaskCountFetchInterval(new Duration(1, MINUTES))
100-
.setHeartbeatHttpEnabled(true);
100+
.setHttpServerEnabled(true);
101101

102102
assertFullMapping(properties, expected);
103103
}

presto-main-base/src/test/java/com/facebook/presto/server/TestInternalCommunicationConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void testDefaults()
5353
.setTaskUpdateRequestThriftSerdeEnabled(false)
5454
.setTaskInfoResponseThriftSerdeEnabled(false)
5555
.setInternalJwtEnabled(false)
56+
.setResourceManagerCommunicationProtocol(CommunicationProtocol.THRIFT)
5657
.setNodeStatsRefreshIntervalMillis(1_000)
5758
.setNodeDiscoveryPollingIntervalMillis(5_000));
5859
}
@@ -84,6 +85,7 @@ public void testExplicitPropertyMappings()
8485
.put("experimental.internal-communication.task-info-response-thrift-serde-enabled", "true")
8586
.put("internal-communication.node-stats-refresh-interval-millis", "2000")
8687
.put("internal-communication.node-discovery-polling-interval-millis", "3000")
88+
.put("internal-communication.resource-manager-communication-protocol", "HTTP")
8789
.build();
8890

8991
InternalCommunicationConfig expected = new InternalCommunicationConfig()
@@ -106,6 +108,7 @@ public void testExplicitPropertyMappings()
106108
.setMemoizeDeadNodesEnabled(true)
107109
.setSharedSecret("secret")
108110
.setInternalJwtEnabled(true)
111+
.setResourceManagerCommunicationProtocol(CommunicationProtocol.HTTP)
109112
.setTaskUpdateRequestThriftSerdeEnabled(true)
110113
.setTaskInfoResponseThriftSerdeEnabled(true)
111114
.setNodeStatsRefreshIntervalMillis(2000)

0 commit comments

Comments
 (0)