Skip to content

Commit 96075de

Browse files
[ML] Refactoring http settings and adding stats endpoint (#108219)
* Refactoring http settings and adding comments * Removing new line * Adding stats endpoint * Improving comment and adding tests * Fixing typo in setting name * Switching to TransportNodesAction * Fixing test * Trying to fix test * Adding equals hashcode * Trying to send to all nodes * Renaming to .diagnostics * Removing unneeded variable --------- Co-authored-by: Elastic Machine <[email protected]>
1 parent 7504fed commit 96075de

File tree

13 files changed

+629
-44
lines changed

13 files changed

+629
-44
lines changed
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.inference.action;
9+
10+
import org.apache.http.pool.PoolStats;
11+
import org.elasticsearch.action.ActionType;
12+
import org.elasticsearch.action.FailedNodeException;
13+
import org.elasticsearch.action.support.TransportAction;
14+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
15+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
16+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
17+
import org.elasticsearch.cluster.ClusterName;
18+
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.common.io.stream.StreamInput;
20+
import org.elasticsearch.common.io.stream.StreamOutput;
21+
import org.elasticsearch.common.io.stream.Writeable;
22+
import org.elasticsearch.transport.TransportRequest;
23+
import org.elasticsearch.xcontent.ToXContentFragment;
24+
import org.elasticsearch.xcontent.ToXContentObject;
25+
import org.elasticsearch.xcontent.XContentBuilder;
26+
27+
import java.io.IOException;
28+
import java.util.List;
29+
import java.util.Objects;
30+
31+
public class GetInferenceDiagnosticsAction extends ActionType<GetInferenceDiagnosticsAction.Response> {
32+
33+
public static final GetInferenceDiagnosticsAction INSTANCE = new GetInferenceDiagnosticsAction();
34+
public static final String NAME = "cluster:monitor/xpack/inference/diagnostics/get";
35+
36+
public GetInferenceDiagnosticsAction() {
37+
super(NAME);
38+
}
39+
40+
public static class Request extends BaseNodesRequest<Request> {
41+
42+
public Request() {
43+
super((String[]) null);
44+
}
45+
46+
public Request(StreamInput in) throws IOException {
47+
super(in);
48+
}
49+
50+
@Override
51+
public boolean equals(Object o) {
52+
if (this == o) return true;
53+
if (o == null || getClass() != o.getClass()) return false;
54+
return true;
55+
}
56+
57+
@Override
58+
public int hashCode() {
59+
// The class doesn't have any members at the moment so return the same hash code
60+
return Objects.hash(NAME);
61+
}
62+
63+
@Override
64+
public void writeTo(StreamOutput out) {
65+
TransportAction.localOnly();
66+
}
67+
}
68+
69+
public static class NodeRequest extends TransportRequest {
70+
public NodeRequest(StreamInput in) throws IOException {
71+
super(in);
72+
}
73+
74+
public NodeRequest() {}
75+
}
76+
77+
public static class Response extends BaseNodesResponse<NodeResponse> implements Writeable, ToXContentObject {
78+
79+
public Response(StreamInput in) throws IOException {
80+
super(in);
81+
}
82+
83+
public Response(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
84+
super(clusterName, nodes, failures);
85+
}
86+
87+
@Override
88+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
89+
builder.startObject();
90+
91+
for (var entry : getNodesMap().entrySet()) {
92+
NodeResponse response = entry.getValue();
93+
94+
builder.startObject(entry.getKey());
95+
response.toXContent(builder, params);
96+
builder.endObject();
97+
}
98+
99+
builder.endObject();
100+
return builder;
101+
}
102+
103+
@Override
104+
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
105+
return in.readCollectionAsList(NodeResponse::new);
106+
}
107+
108+
@Override
109+
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
110+
out.writeCollection(nodes);
111+
}
112+
113+
@Override
114+
public boolean equals(Object o) {
115+
if (this == o) return true;
116+
if (o == null || getClass() != o.getClass()) return false;
117+
Response that = (Response) o;
118+
return Objects.equals(getNodes(), that.getNodes()) && Objects.equals(failures(), that.failures());
119+
}
120+
121+
@Override
122+
public int hashCode() {
123+
return Objects.hash(getNodes(), failures());
124+
}
125+
}
126+
127+
public static class NodeResponse extends BaseNodeResponse implements ToXContentFragment {
128+
static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats";
129+
130+
private final ConnectionPoolStats connectionPoolStats;
131+
132+
public NodeResponse(DiscoveryNode node, PoolStats poolStats) {
133+
super(node);
134+
connectionPoolStats = ConnectionPoolStats.of(poolStats);
135+
}
136+
137+
public NodeResponse(StreamInput in) throws IOException {
138+
super(in);
139+
140+
connectionPoolStats = new ConnectionPoolStats(in);
141+
}
142+
143+
@Override
144+
public void writeTo(StreamOutput out) throws IOException {
145+
super.writeTo(out);
146+
connectionPoolStats.writeTo(out);
147+
}
148+
149+
@Override
150+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
151+
builder.field(CONNECTION_POOL_STATS_FIELD_NAME, connectionPoolStats, params);
152+
return builder;
153+
}
154+
155+
@Override
156+
public boolean equals(Object o) {
157+
if (this == o) return true;
158+
if (o == null || getClass() != o.getClass()) return false;
159+
NodeResponse response = (NodeResponse) o;
160+
return Objects.equals(connectionPoolStats, response.connectionPoolStats);
161+
}
162+
163+
@Override
164+
public int hashCode() {
165+
return Objects.hash(connectionPoolStats);
166+
}
167+
168+
ConnectionPoolStats getConnectionPoolStats() {
169+
return connectionPoolStats;
170+
}
171+
172+
static class ConnectionPoolStats implements ToXContentObject, Writeable {
173+
static final String LEASED_CONNECTIONS = "leased_connections";
174+
static final String PENDING_CONNECTIONS = "pending_connections";
175+
static final String AVAILABLE_CONNECTIONS = "available_connections";
176+
static final String MAX_CONNECTIONS = "max_connections";
177+
178+
static ConnectionPoolStats of(PoolStats poolStats) {
179+
return new ConnectionPoolStats(poolStats.getLeased(), poolStats.getPending(), poolStats.getAvailable(), poolStats.getMax());
180+
}
181+
182+
private final int leasedConnections;
183+
private final int pendingConnections;
184+
private final int availableConnections;
185+
private final int maxConnections;
186+
187+
ConnectionPoolStats(int leasedConnections, int pendingConnections, int availableConnections, int maxConnections) {
188+
this.leasedConnections = leasedConnections;
189+
this.pendingConnections = pendingConnections;
190+
this.availableConnections = availableConnections;
191+
this.maxConnections = maxConnections;
192+
}
193+
194+
ConnectionPoolStats(StreamInput in) throws IOException {
195+
this.leasedConnections = in.readVInt();
196+
this.pendingConnections = in.readVInt();
197+
this.availableConnections = in.readVInt();
198+
this.maxConnections = in.readVInt();
199+
}
200+
201+
@Override
202+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
203+
builder.startObject();
204+
builder.field(LEASED_CONNECTIONS, leasedConnections);
205+
builder.field(PENDING_CONNECTIONS, pendingConnections);
206+
builder.field(AVAILABLE_CONNECTIONS, availableConnections);
207+
builder.field(MAX_CONNECTIONS, maxConnections);
208+
builder.endObject();
209+
210+
return builder;
211+
}
212+
213+
@Override
214+
public void writeTo(StreamOutput out) throws IOException {
215+
out.writeVInt(leasedConnections);
216+
out.writeVInt(pendingConnections);
217+
out.writeVInt(availableConnections);
218+
out.writeVInt(maxConnections);
219+
}
220+
221+
@Override
222+
public boolean equals(Object o) {
223+
if (this == o) return true;
224+
if (o == null || getClass() != o.getClass()) return false;
225+
ConnectionPoolStats that = (ConnectionPoolStats) o;
226+
return leasedConnections == that.leasedConnections
227+
&& pendingConnections == that.pendingConnections
228+
&& availableConnections == that.availableConnections
229+
&& maxConnections == that.maxConnections;
230+
}
231+
232+
@Override
233+
public int hashCode() {
234+
return Objects.hash(leasedConnections, pendingConnections, availableConnections, maxConnections);
235+
}
236+
237+
int getLeasedConnections() {
238+
return leasedConnections;
239+
}
240+
241+
int getPendingConnections() {
242+
return pendingConnections;
243+
}
244+
245+
int getAvailableConnections() {
246+
return availableConnections;
247+
}
248+
249+
int getMaxConnections() {
250+
return maxConnections;
251+
}
252+
}
253+
}
254+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.inference.action;
9+
10+
import org.apache.http.pool.PoolStats;
11+
import org.elasticsearch.cluster.node.DiscoveryNode;
12+
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.io.stream.Writeable;
15+
import org.elasticsearch.test.AbstractWireSerializingTestCase;
16+
17+
import java.io.IOException;
18+
import java.io.UnsupportedEncodingException;
19+
20+
public class GetInferenceDiagnosticsActionNodeResponseTests extends AbstractWireSerializingTestCase<
21+
GetInferenceDiagnosticsAction.NodeResponse> {
22+
public static GetInferenceDiagnosticsAction.NodeResponse createRandom() {
23+
DiscoveryNode node = DiscoveryNodeUtils.create("id");
24+
var randomPoolStats = new PoolStats(randomInt(), randomInt(), randomInt(), randomInt());
25+
26+
return new GetInferenceDiagnosticsAction.NodeResponse(node, randomPoolStats);
27+
}
28+
29+
@Override
30+
protected Writeable.Reader<GetInferenceDiagnosticsAction.NodeResponse> instanceReader() {
31+
return GetInferenceDiagnosticsAction.NodeResponse::new;
32+
}
33+
34+
@Override
35+
protected GetInferenceDiagnosticsAction.NodeResponse createTestInstance() {
36+
return createRandom();
37+
}
38+
39+
@Override
40+
protected GetInferenceDiagnosticsAction.NodeResponse mutateInstance(GetInferenceDiagnosticsAction.NodeResponse instance)
41+
throws IOException {
42+
var select = randomIntBetween(0, 3);
43+
var connPoolStats = instance.getConnectionPoolStats();
44+
45+
return switch (select) {
46+
case 0 -> new GetInferenceDiagnosticsAction.NodeResponse(
47+
instance.getNode(),
48+
new PoolStats(
49+
randomInt(),
50+
connPoolStats.getPendingConnections(),
51+
connPoolStats.getAvailableConnections(),
52+
connPoolStats.getMaxConnections()
53+
)
54+
);
55+
case 1 -> new GetInferenceDiagnosticsAction.NodeResponse(
56+
instance.getNode(),
57+
new PoolStats(
58+
connPoolStats.getLeasedConnections(),
59+
randomInt(),
60+
connPoolStats.getAvailableConnections(),
61+
connPoolStats.getMaxConnections()
62+
)
63+
);
64+
case 2 -> new GetInferenceDiagnosticsAction.NodeResponse(
65+
instance.getNode(),
66+
new PoolStats(
67+
connPoolStats.getLeasedConnections(),
68+
connPoolStats.getPendingConnections(),
69+
randomInt(),
70+
connPoolStats.getMaxConnections()
71+
)
72+
);
73+
case 3 -> new GetInferenceDiagnosticsAction.NodeResponse(
74+
instance.getNode(),
75+
new PoolStats(
76+
connPoolStats.getLeasedConnections(),
77+
connPoolStats.getPendingConnections(),
78+
connPoolStats.getAvailableConnections(),
79+
randomInt()
80+
)
81+
);
82+
default -> throw new UnsupportedEncodingException(Strings.format("Encountered unsupported case %s", select));
83+
};
84+
}
85+
}

0 commit comments

Comments
 (0)