Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions presto-docs/src/main/sphinx/installation/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ At least one resource manager is needed for a cluster, and more can be added to
discovery-server.enabled=true
discovery.uri=http://example.net:8080 (Point to resource manager host/vip)
thrift.server.ssl.enabled=true
resource-manager.http-server-enabled=false
internal-communication.resource-manager-communication-protocol=THRIFT

* ``Coordinator``

Expand All @@ -175,6 +177,7 @@ A cluster can have a pool of coordinators. Each coordinator will run a subset of
query.max-memory-per-node=1GB
discovery.uri=http://example.net:8080 (Point to resource manager host/vip)
resource-manager-enabled=true
internal-communication.resource-manager-communication-protocol=THRIFT

* ``Worker``

Expand All @@ -188,6 +191,7 @@ A cluster can have a pool of workers. They send their heartbeats to the resource
query.max-memory-per-node=1GB
discovery.uri=http://example.net:8080 (Point to resource manager host/vip)
resource-manager-enabled=true
internal-communication.resource-manager-communication-protocol=THRIFT

These properties require some explanation:

Expand Down Expand Up @@ -231,6 +235,15 @@ These properties require some explanation:
the host and port of the Presto coordinator. This URI must not end
in a slash.

* ``internal-communication.resource-manager-communication-protocol``:
The protocol used for communication with the resource manager. This
can be set to ``THRIFT`` or ``HTTP``.

* ``resource-manager.http-server-enabled``:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the value of internal-communication.resource-manager-communication-protocol, and if it's HTTP, then HTTP server is enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that the C++ workers use the HTTP protocol regardless if the coordinator communicates in thrift. So you could want to keep the thrift implementation & keep the http server on. We could have internal-communication.resource-manager-communication-protocol override resource-manager.http-server-enabled so that if it's set to HTTP & the http-server-enabled is set to false, it'll turn on anyway.

Whether to enable the resource manager HTTP server or not. If
``internal-communication.resource-manager-communication-protocol=HTTP``, this
must be set to ``true``.

The following flags can help one tune the disaggregated coordinator cluster’s resource groups to the desired consistency:

* ``concurrency-threshold-to-enable-resource-group-refresh (default: 1.0)``
Expand Down
2 changes: 1 addition & 1 deletion presto-docs/src/main/sphinx/overview/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ data from all coordinators and workers and constructs a global view of the clust
A Presto installation with a disaggregated coordinator needs a resource manager.
Clusters support multiple resource managers, each acting as a primary.

Coordinators and workers communicate with resource managers using a thrift API.
Coordinators and workers can communicate with resource managers using either thrift or HTTP API.

Coordinator
^^^^^^^^^^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.facebook.drift.annotations.ThriftField;
import com.facebook.drift.annotations.ThriftStruct;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Optional;

Expand All @@ -35,7 +37,14 @@ public class ResourceGroupRuntimeInfo
private final Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec;

@ThriftConstructor
public ResourceGroupRuntimeInfo(ResourceGroupId resourceGroupId, long memoryUsageBytes, int queuedQueries, int descendantQueuedQueries, int runningQueries, int descendantRunningQueries, Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec)
@JsonCreator
public ResourceGroupRuntimeInfo(@JsonProperty("resourceGroupId") ResourceGroupId resourceGroupId,
@JsonProperty("memoryUsageBytes") long memoryUsageBytes,
@JsonProperty("queuedQueries") int queuedQueries,
@JsonProperty("descendantQueuedQueries") int descendantQueuedQueries,
@JsonProperty("runningQueries") int runningQueries,
@JsonProperty("descendantRunningQueries") int descendantRunningQueries,
@JsonProperty("resourceGroupConfigSpec") Optional<ResourceGroupSpecInfo> resourceGroupConfigSpec)
{
this.resourceGroupId = requireNonNull(resourceGroupId, "resourceGroupId is null");
this.memoryUsageBytes = memoryUsageBytes;
Expand All @@ -52,42 +61,49 @@ public static Builder builder(ResourceGroupId resourceGroupId)
}

@ThriftField(1)
@JsonProperty
public ResourceGroupId getResourceGroupId()
{
return resourceGroupId;
}

@ThriftField(2)
@JsonProperty
public long getMemoryUsageBytes()
{
return memoryUsageBytes;
}

@ThriftField(3)
@JsonProperty
public int getQueuedQueries()
{
return queuedQueries;
}

@ThriftField(4)
@JsonProperty
public int getDescendantQueuedQueries()
{
return descendantQueuedQueries;
}

@ThriftField(5)
@JsonProperty
public int getRunningQueries()
{
return runningQueries;
}

@ThriftField(6)
@JsonProperty
public int getDescendantRunningQueries()
{
return descendantRunningQueries;
}

@ThriftField(7)
@JsonProperty
public Optional<ResourceGroupSpecInfo> getResourceGroupConfigSpec()
{
return resourceGroupConfigSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class ResourceManagerConfig
private Duration memoryPoolFetchInterval = new Duration(1, SECONDS);
private boolean resourceGroupServiceCacheEnabled;
private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS);
private boolean heartbeatHttpEnabled;
private boolean httpServerEnabled;

private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);

private Duration runningTaskCountFetchInterval = new Duration(1, SECONDS);
Expand Down Expand Up @@ -279,15 +280,16 @@ public ResourceManagerConfig setRunningTaskCountFetchInterval(Duration runningTa
return this;
}

public boolean getHeartbeatHttpEnabled()
@Config("resource-manager.http-server-enabled")
@ConfigDescription("Enable HTTP REST endpoints on the resource manager for internal communication.")
public ResourceManagerConfig setHttpServerEnabled(boolean httpEnabled)
{
return heartbeatHttpEnabled;
this.httpServerEnabled = httpEnabled;
return this;
}

@Config("resource-manager.heartbeat-http-enabled")
public ResourceManagerConfig setHeartbeatHttpEnabled(boolean heartbeatHttpEnabled)
public boolean getHttpServerEnabled()
{
this.heartbeatHttpEnabled = heartbeatHttpEnabled;
return this;
return httpServerEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class InternalCommunicationConfig
private DataSize maxTaskUpdateSize = new DataSize(16, MEGABYTE);
private CommunicationProtocol taskCommunicationProtocol = CommunicationProtocol.HTTP;
private CommunicationProtocol serverInfoCommunicationProtocol = CommunicationProtocol.HTTP;
private CommunicationProtocol resourceManagerCommunicationProtocol = CommunicationProtocol.THRIFT;
private boolean memoizeDeadNodesEnabled;
private String sharedSecret;
private long nodeStatsRefreshIntervalMillis = 1_000;
Expand Down Expand Up @@ -352,6 +353,19 @@ public InternalCommunicationConfig setInternalJwtEnabled(boolean internalJwtEnab
return this;
}

@Config("internal-communication.resource-manager-communication-protocol")
@ConfigDescription("Protocol for internal communication with resource managers.")
public InternalCommunicationConfig setResourceManagerCommunicationProtocol(CommunicationProtocol resourceManagerCommunicationProtocol)
{
this.resourceManagerCommunicationProtocol = resourceManagerCommunicationProtocol;
return this;
}

public CommunicationProtocol getResourceManagerCommunicationProtocol()
{
return resourceManagerCommunicationProtocol;
}

@AssertTrue(message = "When internal JWT(internal-communication.jwt.enabled) authentication is enabled, a shared secret(internal-communication.shared-secret) is required")
public boolean isRequiredSharedSecretSet()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ private void assertMemoryPoolMap(ResourceManagerClusterStateProvider provider, i
assertEquals(clusterMemoryPoolInfo.getLargestMemoryQuery().map(QueryId::getId), largestMemoryQuery);
}

private static BasicQueryInfo createQueryInfo(String queryId, QueryState state)
public static BasicQueryInfo createQueryInfo(String queryId, QueryState state)
{
return createQueryInfo(queryId, state, "global", GENERAL_POOL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testDefaults()
.setResourceGroupRuntimeHeartbeatInterval(new Duration(1, SECONDS))
.setRunningTaskCountFetchInterval(new Duration(1, SECONDS))
.setResourceGroupRuntimeInfoTimeout(new Duration(30, SECONDS))
.setHeartbeatHttpEnabled(false));
.setHttpServerEnabled(false));
}

@Test
Expand All @@ -75,7 +75,7 @@ public void testExplicitPropertyMappings()
.put("resource-manager.resource-group-runtimeinfo-heartbeat-interval", "6m")
.put("resource-manager.running-task-count-fetch-interval", "1m")
.put("resource-manager.resource-group-runtimeinfo-timeout", "4s")
.put("resource-manager.heartbeat-http-enabled", "true")
.put("resource-manager.http-server-enabled", "true")
.build();

ResourceManagerConfig expected = new ResourceManagerConfig()
Expand All @@ -97,7 +97,7 @@ public void testExplicitPropertyMappings()
.setResourceGroupRuntimeHeartbeatInterval(new Duration(6, MINUTES))
.setResourceGroupRuntimeInfoTimeout(new Duration(4, SECONDS))
.setRunningTaskCountFetchInterval(new Duration(1, MINUTES))
.setHeartbeatHttpEnabled(true);
.setHttpServerEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void testDefaults()
.setTaskUpdateRequestThriftSerdeEnabled(false)
.setTaskInfoResponseThriftSerdeEnabled(false)
.setInternalJwtEnabled(false)
.setResourceManagerCommunicationProtocol(CommunicationProtocol.THRIFT)
.setNodeStatsRefreshIntervalMillis(1_000)
.setNodeDiscoveryPollingIntervalMillis(5_000));
}
Expand Down Expand Up @@ -84,6 +85,7 @@ public void testExplicitPropertyMappings()
.put("experimental.internal-communication.task-info-response-thrift-serde-enabled", "true")
.put("internal-communication.node-stats-refresh-interval-millis", "2000")
.put("internal-communication.node-discovery-polling-interval-millis", "3000")
.put("internal-communication.resource-manager-communication-protocol", "HTTP")
.build();

InternalCommunicationConfig expected = new InternalCommunicationConfig()
Expand All @@ -106,6 +108,7 @@ public void testExplicitPropertyMappings()
.setMemoizeDeadNodesEnabled(true)
.setSharedSecret("secret")
.setInternalJwtEnabled(true)
.setResourceManagerCommunicationProtocol(CommunicationProtocol.HTTP)
.setTaskUpdateRequestThriftSerdeEnabled(true)
.setTaskInfoResponseThriftSerdeEnabled(true)
.setNodeStatsRefreshIntervalMillis(2000)
Expand Down
Loading
Loading