Skip to content

Commit c98d756

Browse files
committed
temp
1 parent f5318c3 commit c98d756

File tree

9 files changed

+472
-79
lines changed

9 files changed

+472
-79
lines changed

presto-main-base/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.facebook.airlift.configuration.ConfigDescription;
1818
import com.facebook.airlift.units.Duration;
1919
import com.facebook.airlift.units.MinDuration;
20+
import com.facebook.presto.server.InternalCommunicationConfig;
2021
import jakarta.validation.constraints.Min;
2122

2223
import java.util.concurrent.TimeUnit;
@@ -42,7 +43,9 @@ public class ResourceManagerConfig
4243
private Duration memoryPoolFetchInterval = new Duration(1, SECONDS);
4344
private boolean resourceGroupServiceCacheEnabled;
4445
private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS);
45-
private boolean heartbeatHttpEnabled;
46+
private boolean httpServerEnabled;
47+
48+
private InternalCommunicationConfig.CommunicationProtocol communicationProtocol = InternalCommunicationConfig.CommunicationProtocol.THRIFT;
4649
private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS);
4750

4851
private Duration runningTaskCountFetchInterval = new Duration(1, SECONDS);
@@ -279,15 +282,29 @@ public ResourceManagerConfig setRunningTaskCountFetchInterval(Duration runningTa
279282
return this;
280283
}
281284

282-
public boolean getHeartbeatHttpEnabled()
285+
@Config("resource-manager.http-server-enabled")
286+
@ConfigDescription("Enable HTTP REST endpoints on the resource manager for internal communication.")
287+
public ResourceManagerConfig setHttpServerEnabled(boolean httpEnabled)
288+
{
289+
this.httpServerEnabled = httpEnabled;
290+
return this;
291+
}
292+
293+
public boolean getHttpServerEnabled()
283294
{
284-
return heartbeatHttpEnabled;
295+
return httpServerEnabled;
285296
}
286297

287-
@Config("resource-manager.heartbeat-http-enabled")
288-
public ResourceManagerConfig setHeartbeatHttpEnabled(boolean heartbeatHttpEnabled)
298+
@Config("resource-manager.communication-protocol")
299+
@ConfigDescription("Protocol for internal communication with resource managers.")
300+
public ResourceManagerConfig setCommunicationProtocol(InternalCommunicationConfig.CommunicationProtocol communicationProtocol)
289301
{
290-
this.heartbeatHttpEnabled = heartbeatHttpEnabled;
302+
this.communicationProtocol = communicationProtocol;
291303
return this;
292304
}
305+
306+
public InternalCommunicationConfig.CommunicationProtocol getCommunicationProtocol()
307+
{
308+
return communicationProtocol;
309+
}
293310
}

presto-main-base/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.configuration.testing.ConfigAssertions;
1717
import com.facebook.airlift.units.Duration;
18+
import com.facebook.presto.server.InternalCommunicationConfig;
1819
import com.google.common.collect.ImmutableMap;
1920
import org.testng.annotations.Test;
2021

@@ -50,7 +51,8 @@ public void testDefaults()
5051
.setResourceGroupRuntimeHeartbeatInterval(new Duration(1, SECONDS))
5152
.setRunningTaskCountFetchInterval(new Duration(1, SECONDS))
5253
.setResourceGroupRuntimeInfoTimeout(new Duration(30, SECONDS))
53-
.setHeartbeatHttpEnabled(false));
54+
.setHttpServerEnabled(false)
55+
.setCommunicationProtocol(InternalCommunicationConfig.CommunicationProtocol.THRIFT));
5456
}
5557

5658
@Test
@@ -75,7 +77,8 @@ public void testExplicitPropertyMappings()
7577
.put("resource-manager.resource-group-runtimeinfo-heartbeat-interval", "6m")
7678
.put("resource-manager.running-task-count-fetch-interval", "1m")
7779
.put("resource-manager.resource-group-runtimeinfo-timeout", "4s")
78-
.put("resource-manager.heartbeat-http-enabled", "true")
80+
.put("resource-manager.http-enabled", "true")
81+
.put("resource-manager.communication-protocol", "HTTP")
7982
.build();
8083

8184
ResourceManagerConfig expected = new ResourceManagerConfig()
@@ -97,7 +100,8 @@ public void testExplicitPropertyMappings()
97100
.setResourceGroupRuntimeHeartbeatInterval(new Duration(6, MINUTES))
98101
.setResourceGroupRuntimeInfoTimeout(new Duration(4, SECONDS))
99102
.setRunningTaskCountFetchInterval(new Duration(1, MINUTES))
100-
.setHeartbeatHttpEnabled(true);
103+
.setHttpServerEnabled(true)
104+
.setCommunicationProtocol(InternalCommunicationConfig.CommunicationProtocol.HTTP);
101105

102106
assertFullMapping(properties, expected);
103107
}

presto-main/etc/catalog/hive.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
#
77

88
connector.name=hive-hadoop2
9-
hive.metastore.uri=thrift://localhost:9083
9+
hive.metastore.uri=thrift://localhost:9083
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.resourcemanager;
15+
16+
import com.facebook.airlift.http.client.HttpClient;
17+
import com.facebook.airlift.http.client.HttpUriBuilder;
18+
import com.facebook.airlift.http.client.Request;
19+
import com.facebook.airlift.http.client.Response;
20+
import com.facebook.airlift.http.client.ResponseHandler;
21+
import com.facebook.airlift.json.JsonCodec;
22+
import com.facebook.airlift.log.Logger;
23+
import com.facebook.presto.execution.resourceGroups.ResourceGroupRuntimeInfo;
24+
import com.facebook.presto.server.BasicQueryInfo;
25+
import com.facebook.presto.server.NodeStatus;
26+
import com.facebook.presto.spi.HostAddress;
27+
import com.facebook.presto.spi.PrestoException;
28+
import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo;
29+
import com.facebook.presto.spi.memory.MemoryPoolId;
30+
import jakarta.inject.Inject;
31+
32+
import java.net.URI;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
37+
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
38+
import static com.facebook.airlift.http.client.Request.Builder.prepareGet;
39+
import static com.facebook.airlift.http.client.Request.Builder.preparePut;
40+
import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
41+
import static com.facebook.airlift.json.JsonCodec.listJsonCodec;
42+
import static com.facebook.airlift.json.JsonCodec.mapJsonCodec;
43+
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
44+
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON;
45+
import static java.lang.String.format;
46+
import static java.util.Objects.requireNonNull;
47+
48+
/**
49+
* HTTP-based implementation of ResourceManagerClient.
50+
* Communicates with Resource Manager via HTTP REST endpoints.
51+
*/
52+
public class HttpResourceManagerClient
53+
implements ResourceManagerClient
54+
{
55+
private static final Logger log = Logger.get(HttpResourceManagerClient.class);
56+
57+
private final HttpClient httpClient;
58+
private final JsonCodec<NodeStatus> nodeStatusCodec;
59+
private final JsonCodec<BasicQueryInfo> basicQueryInfoCodec;
60+
private final JsonCodec<List<ResourceGroupRuntimeInfo>> resourceGroupRuntimeInfoListCodec;
61+
private final JsonCodec<Map<MemoryPoolId, ClusterMemoryPoolInfo>> memoryPoolInfoCodec;
62+
private final JsonCodec<Integer> integerCodec;
63+
private final HostAddress resourceManagerAddress;
64+
65+
@Inject
66+
public HttpResourceManagerClient(
67+
@ForResourceManager HttpClient httpClient,
68+
JsonCodec<NodeStatus> nodeStatusCodec,
69+
JsonCodec<BasicQueryInfo> basicQueryInfoCodec,
70+
JsonCodec<List<ResourceGroupRuntimeInfo>> resourceGroupRuntimeInfoListCodec,
71+
JsonCodec<Map<MemoryPoolId, ClusterMemoryPoolInfo>> memoryPoolInfoCodec,
72+
HostAddress resourceManagerAddress)
73+
{
74+
this.httpClient = requireNonNull(httpClient, "httpClient is null");
75+
this.nodeStatusCodec = requireNonNull(nodeStatusCodec, "nodeStatusCodec is null");
76+
this.basicQueryInfoCodec = requireNonNull(basicQueryInfoCodec, "basicQueryInfoCodec is null");
77+
this.resourceGroupRuntimeInfoListCodec = requireNonNull(resourceGroupRuntimeInfoListCodec, "resourceGroupRuntimeInfoListCodec is null");
78+
this.memoryPoolInfoCodec = requireNonNull(memoryPoolInfoCodec, "memoryPoolInfoCodec is null");
79+
this.integerCodec = JsonCodec.jsonCodec(Integer.class);
80+
this.resourceManagerAddress = requireNonNull(resourceManagerAddress, "resourceManagerAddress is null");
81+
}
82+
83+
public HttpResourceManagerClient(
84+
HttpClient httpClient,
85+
HostAddress resourceManagerAddress)
86+
{
87+
this(
88+
httpClient,
89+
JsonCodec.jsonCodec(NodeStatus.class),
90+
JsonCodec.jsonCodec(BasicQueryInfo.class),
91+
listJsonCodec(ResourceGroupRuntimeInfo.class),
92+
mapJsonCodec(MemoryPoolId.class, ClusterMemoryPoolInfo.class),
93+
resourceManagerAddress);
94+
}
95+
96+
@Override
97+
public void queryHeartbeat(String internalNode, BasicQueryInfo basicQueryInfo, long sequenceId)
98+
{
99+
URI uri = buildUri("/v1/resourcemanager/queryHeartbeat",
100+
"nodeId", internalNode, "sequenceId", String.valueOf(sequenceId));
101+
102+
Request request = preparePut()
103+
.setUri(uri)
104+
.setHeader("Content-Type", APPLICATION_JSON)
105+
.setBodyGenerator(createStaticBodyGenerator(basicQueryInfoCodec.toJsonBytes(basicQueryInfo)))
106+
.build();
107+
108+
httpClient.execute(request, new VoidResponseHandler("queryHeartbeat"));
109+
}
110+
111+
@Override
112+
public List<ResourceGroupRuntimeInfo> getResourceGroupInfo(String excludingNode)
113+
throws ResourceManagerInconsistentException
114+
{
115+
URI uri = buildUri("/v1/resourcemanager/resourceGroupInfo",
116+
"excludingNode", excludingNode);
117+
118+
Request request = prepareGet()
119+
.setUri(uri)
120+
.setHeader("Content-Type", APPLICATION_JSON)
121+
.build();
122+
123+
return httpClient.execute(request, createJsonResponseHandler(resourceGroupRuntimeInfoListCodec));
124+
}
125+
126+
@Override
127+
public void nodeHeartbeat(NodeStatus nodeStatus)
128+
{
129+
URI uri = buildUri("/v1/resourcemanager/nodeHeartbeat");
130+
131+
Request request = preparePut()
132+
.setUri(uri)
133+
.setHeader("Content-Type", APPLICATION_JSON)
134+
.setBodyGenerator(createStaticBodyGenerator(nodeStatusCodec.toJsonBytes(nodeStatus)))
135+
.build();
136+
137+
httpClient.execute(request, new VoidResponseHandler("nodeHeartbeat"));
138+
}
139+
140+
@Override
141+
public Map<MemoryPoolId, ClusterMemoryPoolInfo> getMemoryPoolInfo()
142+
{
143+
URI uri = buildUri("/v1/resourcemanager/memoryPoolInfo");
144+
145+
Request request = prepareGet()
146+
.setUri(uri)
147+
.setHeader("Content-Type", APPLICATION_JSON)
148+
.build();
149+
150+
return httpClient.execute(request, createJsonResponseHandler(memoryPoolInfoCodec));
151+
}
152+
153+
@Override
154+
public void resourceGroupRuntimeHeartbeat(String node, List<ResourceGroupRuntimeInfo> resourceGroupRuntimeInfo)
155+
{
156+
URI uri = buildUri("/v1/resourcemanager/resourceGroupRuntimeHeartbeat", "node", node);
157+
158+
Request request = preparePut()
159+
.setUri(uri)
160+
.setHeader("Content-Type", APPLICATION_JSON)
161+
.setBodyGenerator(createStaticBodyGenerator(resourceGroupRuntimeInfoListCodec.toJsonBytes(resourceGroupRuntimeInfo)))
162+
.build();
163+
164+
httpClient.execute(request, new VoidResponseHandler("resourceGroupRuntimeHeartbeat"));
165+
}
166+
167+
@Override
168+
public int getRunningTaskCount()
169+
{
170+
URI uri = buildUri("/v1/resourcemanager/getRunningTaskCount");
171+
172+
Request request = prepareGet()
173+
.setUri(uri)
174+
.setHeader("Content-Type", APPLICATION_JSON)
175+
.build();
176+
177+
return httpClient.execute(request, createJsonResponseHandler(integerCodec));
178+
}
179+
180+
private URI buildUri(String path, String... parameters)
181+
{
182+
HttpUriBuilder uriBuilder = uriBuilderFrom(URI.create("http://" + resourceManagerAddress.toString()))
183+
.appendPath(path);
184+
185+
if (parameters.length % 2 != 0) {
186+
throw new IllegalArgumentException("Parameters must be in key/value pairs");
187+
}
188+
189+
for (int i = 0; i < parameters.length; i += 2) {
190+
uriBuilder.addParameter(parameters[i], parameters[i + 1]);
191+
}
192+
return uriBuilder.build();
193+
}
194+
195+
private static class VoidResponseHandler
196+
implements ResponseHandler<Void, RuntimeException>
197+
{
198+
private final String operationName;
199+
200+
public VoidResponseHandler(String operationName)
201+
{
202+
this.operationName = requireNonNull(operationName, "operationName is null");
203+
}
204+
205+
@Override
206+
public Void handleException(Request request, Exception exception)
207+
{
208+
log.error(exception, "Resource manager %s request to %s failed", operationName, request.getUri());
209+
throw new PrestoException(
210+
GENERIC_INTERNAL_ERROR,
211+
format("Resource manager %s request to %s failed: %s", operationName, request.getUri(), exception.getMessage()),
212+
exception);
213+
}
214+
215+
@Override
216+
public Void handle(Request request, Response response)
217+
{
218+
if (response.getStatusCode() >= 400) {
219+
String errorMessage = format(
220+
"Resource manager %s request to %s failed with status %d",
221+
operationName,
222+
request.getUri(),
223+
response.getStatusCode());
224+
log.error("%s", errorMessage);
225+
throw new PrestoException(GENERIC_INTERNAL_ERROR, errorMessage);
226+
}
227+
return null;
228+
}
229+
}
230+
}

0 commit comments

Comments
 (0)