feat(server): Add http support for internal resource manager communication #26635
base: master
Reviewer's Guide
This PR adds HTTP-based internal communication support for the Resource Manager alongside the existing Thrift protocol. It introduces configuration to select the communication protocol, binds a new HTTP client and REST endpoints, and updates the heartbeat sender to dispatch requests over HTTP or Thrift transparently.
Sequence diagram for sending a node heartbeat with protocol selection
sequenceDiagram
participant Coordinator
participant ResourceManagerClusterStatusSender
participant ResourceManagerClient
participant HttpResourceManagerClient
participant ResourceManagerResource
Coordinator->>ResourceManagerClusterStatusSender: sendNodeHeartbeat()
alt CommunicationProtocol = HTTP
ResourceManagerClusterStatusSender->>HttpResourceManagerClient: nodeHeartbeat(NodeStatus)
HttpResourceManagerClient->>ResourceManagerResource: HTTP PUT /v1/resourcemanager/nodeHeartbeat
ResourceManagerResource->>ResourceManagerClusterStateProvider: registerNodeHeartbeat(NodeStatus)
else CommunicationProtocol = Thrift
ResourceManagerClusterStatusSender->>ResourceManagerClient: nodeHeartbeat(NodeStatus)
ResourceManagerClient->>ResourceManagerClusterStateProvider: registerNodeHeartbeat(NodeStatus)
end
Class diagram for ResourceManagerClusterStatusSender and related clients
classDiagram
class ResourceManagerClusterStatusSender {
- DriftClient thriftResourceManagerClient
- HttpClient httpClient
- InternalNodeManager internalNodeManager
- ResourceGroupManager resourceGroupManager
- Supplier statusSupplier
- ScheduledExecutorService executor
- Duration queryHeartbeatInterval
- InternalCommunicationConfig.CommunicationProtocol communicationProtocol
- Map queries
- Map httpClientCache
- PeriodicTaskExecutor nodeHeartbeatSender
- Optional resourceRuntimeHeartbeatSender
+ sendNodeHeartbeat()
+ sendQueryHeartbeat(...)
+ sendResourceGroupRuntimeHeartbeat()
- getResourceManagers()
- getOrCreateHttpClient(HostAddress)
}
class ResourceManagerClient {
<<interface>>
+ queryHeartbeat(...)
+ nodeHeartbeat(...)
+ resourceGroupRuntimeHeartbeat(...)
+ getResourceGroupInfo(...)
+ getMemoryPoolInfo()
+ getRunningTaskCount()
}
class HttpResourceManagerClient {
+ queryHeartbeat(...)
+ nodeHeartbeat(...)
+ resourceGroupRuntimeHeartbeat(...)
+ getResourceGroupInfo(...)
+ getMemoryPoolInfo()
+ getRunningTaskCount()
}
ResourceManagerClusterStatusSender --> ResourceManagerClient
ResourceManagerClusterStatusSender --> HttpResourceManagerClient
HttpResourceManagerClient ..|> ResourceManagerClient
Class diagram for ResourceManagerConfig protocol selection
classDiagram
class ResourceManagerConfig {
- boolean httpServerEnabled
- InternalCommunicationConfig.CommunicationProtocol communicationProtocol
+ setHttpServerEnabled(boolean)
+ getHttpServerEnabled()
+ setCommunicationProtocol(CommunicationProtocol)
+ getCommunicationProtocol()
}
class InternalCommunicationConfig {
}
class CommunicationProtocol {
<<enumeration>>
THRIFT
HTTP
}
ResourceManagerConfig --> InternalCommunicationConfig
InternalCommunicationConfig --> CommunicationProtocol
Hey there - I've reviewed your changes - here's some feedback:
- TestResourceManagerConfig uses resource-manager.http-enabled but the actual @Config annotation is resource-manager.http-server-enabled, so update the test or the annotation to use matching property names.
- TestResourceManagerClusterStatusSender is passing null for the HttpClient parameter, causing NPEs—either supply a mock HttpClient in the test setup or default the protocol to THRIFT for that test.
- ResourceManagerClusterStatusSender has repeated loops for HTTP vs THRIFT; consider pulling the per-host send logic into a shared method or strategy to avoid duplication and make future protocol additions easier.
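One way the third point could look is sketched below. This is a minimal illustration, not the PR's code: the class name, the `String` host and status types, and the two client factories are assumptions standing in for `HostAddress`, `NodeStatus`, and the real Thrift and HTTP clients. The protocol check lives in one helper, and each heartbeat method passes only the call it wants to make.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative sketch only: simplified stand-ins, not the PR's actual classes.
final class HeartbeatDispatchSketch
{
    enum CommunicationProtocol { THRIFT, HTTP }

    // Stand-in for the shared client interface; the real one has more methods.
    interface ResourceManagerClient
    {
        void nodeHeartbeat(String nodeStatus);
    }

    private final CommunicationProtocol protocol;
    private final Function<String, ResourceManagerClient> thriftClients;
    private final Function<String, ResourceManagerClient> httpClients;
    private final List<String> resourceManagers;

    HeartbeatDispatchSketch(
            CommunicationProtocol protocol,
            Function<String, ResourceManagerClient> thriftClients,
            Function<String, ResourceManagerClient> httpClients,
            List<String> resourceManagers)
    {
        this.protocol = protocol;
        this.thriftClients = thriftClients;
        this.httpClients = httpClients;
        this.resourceManagers = resourceManagers;
    }

    // The only place that branches on the protocol.
    private ResourceManagerClient clientFor(String host)
    {
        return protocol == CommunicationProtocol.HTTP
                ? httpClients.apply(host)
                : thriftClients.apply(host);
    }

    // Shared per-host loop reused by every heartbeat type.
    private void forEachResourceManager(Consumer<ResourceManagerClient> action)
    {
        for (String host : resourceManagers) {
            action.accept(clientFor(host));
        }
    }

    void sendNodeHeartbeat(String nodeStatus)
    {
        forEachResourceManager(client -> client.nodeHeartbeat(nodeStatus));
    }
}
```

With this shape, adding another protocol would only touch `clientFor`, and the query and resource group heartbeats would reuse `forEachResourceManager` in the same way.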
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java:202` </location>
<code_context>
+
+ public void sendResourceGroupRuntimeHeartbeat()
+ {
+ List resourceGroupRuntimeInfos = resourceGroupManager.getResourceGroupRuntimeInfos();
+
+ if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
</code_context>
<issue_to_address>
**nitpick:** Raw type used for resourceGroupRuntimeInfos; should be parameterized.
Specify the generic type for 'resourceGroupRuntimeInfos' to improve type safety and code clarity.
</issue_to_address>
### Comment 2
<location> `presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java:223` </location>
<code_context>
private List<HostAddress> getResourceManagers()
{
return internalNodeManager.getResourceManagers().stream()
- .filter(node -> node.getThriftPort().isPresent())
+ .filter(node -> communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP ||
</code_context>
<issue_to_address>
**question (bug_risk):** Filtering logic for resource managers may allow nodes without Thrift port in HTTP mode.
Verify that all nodes included in HTTP mode are valid HTTP resource managers to prevent connection attempts to unsupported nodes.
</issue_to_address>
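A possible way to make the eligibility rule explicit per protocol is sketched here. The `Node` class and its `httpUri` and `thriftPort` fields are hypothetical stand-ins for the real node type, and the assumption that an HTTP-capable resource manager advertises an HTTP URI is mine, not something the PR states.

```java
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;

// Illustrative sketch only: Node and its fields are assumptions.
final class ResourceManagerFilterSketch
{
    enum CommunicationProtocol { THRIFT, HTTP }

    static final class Node
    {
        final String host;
        final Optional<URI> httpUri;
        final OptionalInt thriftPort;

        Node(String host, Optional<URI> httpUri, OptionalInt thriftPort)
        {
            this.host = host;
            this.httpUri = httpUri;
            this.thriftPort = thriftPort;
        }
    }

    // Each protocol states which endpoint it requires, so HTTP mode
    // does not silently accept every node.
    static boolean isReachable(Node node, CommunicationProtocol protocol)
    {
        switch (protocol) {
            case HTTP:
                return node.httpUri.isPresent();
            case THRIFT:
                return node.thriftPort.isPresent();
            default:
                throw new IllegalArgumentException("Unknown protocol: " + protocol);
        }
    }

    static List<String> eligibleHosts(List<Node> nodes, CommunicationProtocol protocol)
    {
        return nodes.stream()
                .filter(node -> isReachable(node, protocol))
                .map(node -> node.host)
                .collect(Collectors.toList());
    }
}
```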
### Comment 3
<location> `presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java:46-48` </location>
<code_context>
private boolean resourceGroupServiceCacheEnabled;
private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS);
- private boolean heartbeatHttpEnabled;
+ private boolean httpServerEnabled;
+
+ private InternalCommunicationConfig.CommunicationProtocol communicationProtocol = InternalCommunicationConfig.CommunicationProtocol.THRIFT;
private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);
</code_context>
<issue_to_address>
**suggestion:** Both httpServerEnabled and communicationProtocol are present; possible configuration ambiguity.
Clarify how these settings interact, especially when httpServerEnabled is false but communicationProtocol is set to HTTP, to prevent misconfiguration.
Suggested implementation:
```java
/**
* If true, enables the HTTP server for resource manager communication.
* If false, communicationProtocol must not be set to HTTP.
*/
private boolean httpServerEnabled;
/**
* Protocol used for internal communication. If set to HTTP, httpServerEnabled must be true.
*/
private InternalCommunicationConfig.CommunicationProtocol communicationProtocol = InternalCommunicationConfig.CommunicationProtocol.THRIFT;
```
```java
private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);
/**
* Validates the configuration to prevent ambiguous or invalid settings.
* Throws IllegalArgumentException if httpServerEnabled is false but communicationProtocol is set to HTTP.
*/
@jakarta.annotation.PostConstruct
public void validateConfiguration() {
if (!httpServerEnabled &&
communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
throw new IllegalArgumentException(
"Invalid configuration: communicationProtocol is set to HTTP but httpServerEnabled is false. " +
"Either enable httpServerEnabled or set communicationProtocol to a non-HTTP value."
);
}
}
```
</issue_to_address>
### Comment 4
<location> `presto-main/src/main/java/com/facebook/presto/server/ResourceManagerResource.java:61` </location>
<code_context>
+ }
+
+ @Override
+ public void nodeHeartbeat(NodeStatus nodeStatus)
+ {
+ URI uri = buildUri("/v1/resourcemanager/nodeHeartbeat");
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Node heartbeat and other endpoints use executor.execute/submit; consider error handling for async tasks.
Exceptions from background tasks may be missed. Add error handling or logging to capture failures from clusterStateProvider methods.
</issue_to_address>
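The concern above can be illustrated with a small wrapper. This is a sketch under assumptions: the class name, the `executeLogged` helper, and the `BiConsumer` used in place of a real logger are all hypothetical. The point is only that the task body is wrapped so a failure is reported rather than lost on the executor thread.

```java
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

// Illustrative sketch only: names are assumptions, not the PR's API.
final class AsyncHeartbeatSketch
{
    private final Executor executor;
    // Stand-in for a logger: receives the operation name and the failure.
    private final BiConsumer<String, Throwable> errorLogger;

    AsyncHeartbeatSketch(Executor executor, BiConsumer<String, Throwable> errorLogger)
    {
        this.executor = executor;
        this.errorLogger = errorLogger;
    }

    // Runs the task on the executor and reports any runtime failure.
    void executeLogged(String operationName, Runnable task)
    {
        executor.execute(() -> {
            try {
                task.run();
            }
            catch (RuntimeException e) {
                errorLogger.accept(operationName, e);
            }
        });
    }
}
```

An endpoint could then call something like `executeLogged("nodeHeartbeat", () -> provider.registerNodeHeartbeat(status))`, where `provider` and `status` are whatever the resource class already holds.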
### Comment 5
<location> `presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java:80-81` </location>
<code_context>
.put("resource-manager.running-task-count-fetch-interval", "1m")
.put("resource-manager.resource-group-runtimeinfo-timeout", "4s")
- .put("resource-manager.heartbeat-http-enabled", "true")
+ .put("resource-manager.http-enabled", "true")
+ .put("resource-manager.communication-protocol", "HTTP")
.build();
</code_context>
<issue_to_address>
**issue (testing):** Test for explicit property mappings should use the correct property name.
The test should use "resource-manager.http-server-enabled" to match the implementation and properly validate the mapping.
</issue_to_address>
### Comment 6
<location> `presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java:85-86` </location>
<code_context>
sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> resourceManagerClient,
+ null,
nodeManager,
() -> NODE_STATUS,
</code_context>
<issue_to_address>
**suggestion (testing):** TestResourceManagerClusterStatusSender does not test HTTP protocol path.
Please add tests for the HTTP protocol path, including error scenarios and edge cases, to ensure full coverage of the new logic.
Suggested implementation:
```java
sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> resourceManagerClient,
null,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor(),
);
// --- HTTP protocol path tests ---
@Test
public void testHttpProtocolSuccess() throws Exception {
ResourceManagerClient httpClient = mock(ResourceManagerClient.class);
when(httpClient.sendClusterStatus(any(), any())).thenReturn(/* mock successful response */);
ResourceManagerClusterStatusSender httpSender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> httpClient,
ResourceManagerClusterStatusSender.Protocol.HTTP,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Call the method and assert success
httpSender.sendClusterStatus();
// Add assertions to verify correct behavior
}
@Test
public void testHttpProtocolErrorResponse() throws Exception {
ResourceManagerClient httpClient = mock(ResourceManagerClient.class);
when(httpClient.sendClusterStatus(any(), any())).thenThrow(new IOException("HTTP error"));
ResourceManagerClusterStatusSender httpSender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> httpClient,
ResourceManagerClusterStatusSender.Protocol.HTTP,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Call the method and assert error handling
assertThrows(IOException.class, httpSender::sendClusterStatus);
}
@Test
public void testHttpProtocolTimeout() throws Exception {
ResourceManagerClient httpClient = mock(ResourceManagerClient.class);
when(httpClient.sendClusterStatus(any(), any())).thenAnswer(invocation -> {
Thread.sleep(2000); // Simulate timeout
return null;
});
ResourceManagerClusterStatusSender httpSender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> httpClient,
ResourceManagerClusterStatusSender.Protocol.HTTP,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Call the method and assert timeout handling
// (Assume sender has timeout logic, otherwise this test can be adjusted)
}
@Test
public void testHttpProtocolInvalidResponse() throws Exception {
ResourceManagerClient httpClient = mock(ResourceManagerClient.class);
when(httpClient.sendClusterStatus(any(), any())).thenReturn(null); // Simulate invalid response
ResourceManagerClusterStatusSender httpSender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> httpClient,
ResourceManagerClusterStatusSender.Protocol.HTTP,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Call the method and assert handling of invalid response
// Add assertions as appropriate
}
```
- You may need to adjust the mock responses and assertions to match the actual implementation details of `ResourceManagerClient` and `ResourceManagerClusterStatusSender`.
- Ensure that `ResourceManagerClusterStatusSender.Protocol.HTTP` is a valid enum value; if not, use the correct value for HTTP protocol.
- If the sender's error/timeout handling is more complex, add more detailed assertions and possibly use Awaitility or similar for async tests.
- Import necessary classes: `org.junit.jupiter.api.Test`, `org.mockito.Mockito`, etc.
</issue_to_address>
### Comment 7
<location> `presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java:86-87` </location>
<code_context>
sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> resourceManagerClient,
+ null,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor(),
</code_context>
<issue_to_address>
**suggestion (testing):** No tests for error handling in HTTP client usage.
Please add tests that simulate HTTP failures to ensure error handling and logging work as intended.
Suggested implementation:
```java
sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> resourceManagerClient,
null,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor(),
```
```java
@Test
public void testSendClusterStatusHandlesHttpClientFailure() throws Exception
{
// Arrange: mock ResourceManagerClient to throw an exception
ResourceManagerClient failingClient = mock(ResourceManagerClient.class);
when(failingClient.sendClusterStatus(any(), any())).thenThrow(new RuntimeException("HTTP failure"));
ResourceManagerClusterStatusSender sender = new ResourceManagerClusterStatusSender(
(addressSelectionContext, headers) -> failingClient,
null,
nodeManager,
() -> NODE_STATUS,
newSingleThreadScheduledExecutor()
);
// Act & Assert: error should be handled gracefully
try {
sender.sendClusterStatus();
// If no exception is thrown, error handling is working as expected
} catch (Exception e) {
fail("Exception should be handled within sendClusterStatus");
}
// Optionally, verify logging (if using a logging framework with test hooks)
// For example, using a log capturing library to assert error was logged
}
```
</issue_to_address>
@PUT
@Consumes(APPLICATION_JSON)
@Path("nodeHeartbeat")
public void nodeHeartbeat(NodeStatus nodeStatus)
suggestion (bug_risk): Node heartbeat and other endpoints use executor.execute/submit; consider error handling for async tasks.
Exceptions from background tasks may be missed. Add error handling or logging to capture failures from clusterStateProvider methods.
steveburnett
left a comment
Thanks for the doc! Just a couple of suggestions, looks good overall.
steveburnett
left a comment
LGTM! (docs)
Pull branch, local doc build. Looks good, thanks!
The protocol used for communication with the resource manager. This
can be set to ``THRIFT`` or ``HTTP``.

* ``resource-manager.http-server-enabled``:
Could we use the value of internal-communication.resource-manager-communication-protocol, and if it's HTTP, then HTTP server is enabled?
The issue is that the C++ workers use the HTTP protocol regardless of whether the coordinator communicates over Thrift, so you may want to keep the Thrift implementation and still keep the HTTP server on. We could have internal-communication.resource-manager-communication-protocol override resource-manager.http-server-enabled, so that if the protocol is set to HTTP and http-server-enabled is false, the HTTP server turns on anyway.
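The override proposed in this reply could be reduced to a single rule, sketched below. The class and method names are hypothetical. The sketch assumes the two inputs are the values of resource-manager.http-server-enabled and internal-communication.resource-manager-communication-protocol.

```java
// Illustrative sketch only: names are assumptions, not the PR's API.
final class HttpServerSettingSketch
{
    enum CommunicationProtocol { THRIFT, HTTP }

    // The HTTP server runs when it is explicitly enabled (for example, for
    // C++ workers) or when the internal protocol itself is HTTP.
    static boolean isHttpServerEffectivelyEnabled(boolean httpServerEnabled, CommunicationProtocol protocol)
    {
        return httpServerEnabled || protocol == CommunicationProtocol.HTTP;
    }
}
```

Under this rule, Thrift with the server enabled stays valid, and HTTP with the flag left at false no longer needs to be rejected as a misconfiguration.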
try {
    client.nodeHeartbeat(nodeStatus);
    fail("Expected PrestoException");
Use idiomatic TestNG assertThrows
client.nodeHeartbeat(createTestNodeStatus("node1"));
}

private static BasicQueryInfo createTestQueryInfo(String queryId, QueryState state)
I think we can actually test this completely end to end using the actual client and server. Look at how TestHttpRemoteTask uses JaxrsTestingHttpProcessor, I think we can do something similar:
presto/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java
Lines 432 to 435 in 3d99a78
JaxrsTestingHttpProcessor jaxrsTestingHttpProcessor = new JaxrsTestingHttpProcessor(URI.create("http://fake.invalid/"), testingTaskResource, jsonMapper, thriftMapper);
TestingHttpClient testingHttpClient = new TestingHttpClient(jaxrsTestingHttpProcessor.setTrace(TRACE_HTTP));
testingTaskResource.setHttpClient(testingHttpClient);
return new HttpRemoteTaskFactory(
getResourceManagers().forEach(hostAndPort ->
        resourceManagerClient.get(Optional.of(hostAndPort.toString())).queryHeartbeat(nodeIdentifier, basicQueryInfo, sequenceId));

if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
Suggested change:
- if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
+ if (communicationProtocol == HTTP) {
getResourceManagers().forEach(hostAndPort ->
        resourceManagerClient.get(Optional.of(hostAndPort.toString())).nodeHeartbeat(statusSupplier.get()));
NodeStatus nodeStatus = statusSupplier.get();
if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
Suggested change:
- if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
+ if (communicationProtocol == HTTP) {
{
    List<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfos = resourceGroupManager.getResourceGroupRuntimeInfos();

    if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
Suggested change:
- if (communicationProtocol == InternalCommunicationConfig.CommunicationProtocol.HTTP) {
+ if (communicationProtocol == HTTP) {
{
    log.error(exception, "Resource manager %s request to %s failed", operationName, request.getUri());
    throw new PrestoException(
            GENERIC_INTERNAL_ERROR,
We'll want to add a dedicated error code.
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
import static java.util.Objects.requireNonNull;

@Path("/v1/resourcemanager")
I want to think about how to make this more RESTful. These endpoints are RPC style like Thrift, and for HTTP we follow REST conventions.
Description
This PR adds an HTTP server for the resource manager and allows communication to and from the resource manager over HTTP as opposed to Thrift.
Motivation and Context
This will help Presto use the resource manager in cases where using the thrift protocol is not feasible.
Impact
Previously, there was a server on the resource manager located at `/v1/heartbeat` for C++ workers to send their heartbeat. This PR will revamp that server and modify the endpoint.
There was also a config, `resource-manager.heartbeat-http-enabled`, to enable that HTTP server. This has become `resource-manager.http-server-enabled`.
Test Plan
UTs, local testing.
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.