Skip to content

Commit 70a7a84

Browse files
AMBARI-26316: Add metrics for Agent Stomp and Api Stomp performance
1 parent 55c2ba1 commit 70a7a84

File tree

8 files changed

+1917
-1
lines changed

8 files changed

+1917
-1
lines changed

ambari-funtest/src/test/resources/metrics.properties

+1
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@
2121
# Source interval determines how often the metric is sent to sink. Its unit is in seconds
2222
metric.sources=jvm
2323
source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
24+
source.stats.class=org.apache.ambari.server.metrics.system.impl.StompStatsMetricsSource

ambari-server/conf/unix/metrics.properties

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919

2020
#################### Metrics Source Configs #####################
2121

22-
#Metric sources : jvm,database
22+
#Metric sources : jvm,database, stats
2323
metric.sources=jvm,event
2424

2525
#### JVM Source Configs ###
2626
source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource
2727
source.event.class=org.apache.ambari.server.metrics.system.impl.StompEventsMetricsSource
28+
source.stats.class=org.apache.ambari.server.metrics.system.impl.StompStatsMetricsSource
2829
source.jvm.interval=10
2930

3031
#### Database Source Configs ###

ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java

+9
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@
8484
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
8585
import org.apache.ambari.server.ldap.LdapModule;
8686
import org.apache.ambari.server.metrics.system.MetricsService;
87+
import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration;
88+
import org.apache.ambari.server.metrics.system.impl.MetricsServiceImpl;
8789
import org.apache.ambari.server.orm.GuiceJpaInitializer;
8890
import org.apache.ambari.server.orm.PersistenceType;
8991
import org.apache.ambari.server.orm.dao.BlueprintDAO;
@@ -164,6 +166,7 @@
164166
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;
165167
import org.springframework.web.filter.DelegatingFilterProxy;
166168
import org.springframework.web.servlet.DispatcherServlet;
169+
import org.springframework.web.socket.config.WebSocketMessageBrokerStats;
167170

168171
import com.google.common.base.Joiner;
169172
import com.google.common.util.concurrent.ServiceManager;
@@ -583,6 +586,12 @@ public void run() throws Exception {
583586
LOG.info("********* Started Services **********");
584587

585588
if (!configs.isMetricsServiceDisabled()) {
589+
if (MetricsConfiguration.isStompStatMetricsConfigured() && metricsService instanceof MetricsServiceImpl) {
590+
WebSocketMessageBrokerStats apiStompStats = apiDispatcherContext.getBean(WebSocketMessageBrokerStats.class);
591+
((MetricsServiceImpl) metricsService).setApiStompStats(apiStompStats);
592+
WebSocketMessageBrokerStats agentStompStats = agentDispatcherContext.getBean(WebSocketMessageBrokerStats.class);
593+
((MetricsServiceImpl) metricsService).setAgentStompStats(agentStompStats);
594+
}
586595
metricsService.start();
587596
} else {
588597
LOG.info("AmbariServer Metrics disabled.");

ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java

+26
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,30 @@ public static MetricsConfiguration getSubsetConfiguration(MetricsConfiguration m
124124

125125
return new MetricsConfiguration(subsetProperties);
126126
}
127+
128+
/**
129+
* For enabling stompStats Metrics, append 'stats' in metric.sources.
130+
* @return true if StompStatsMetricsSource is configured, false otherwise.
131+
*/
132+
public static boolean isStompStatMetricsConfigured() {
133+
MetricsConfiguration configuration = getMetricsConfiguration();
134+
String commaSeparatedSources = configuration.getProperty("metric.sources");
135+
if (StringUtils.isEmpty(commaSeparatedSources)) {
136+
return false;
137+
}
138+
139+
String[] sourceNames = commaSeparatedSources.split(",");
140+
for (String sourceName : sourceNames) {
141+
if (StringUtils.isEmpty(sourceName)) {
142+
continue;
143+
}
144+
sourceName = sourceName.trim();
145+
146+
String className = configuration.getProperty("source." + sourceName + ".class");
147+
if (className.equals(StompStatsMetricsSource.class.getName())) {
148+
return true;
149+
}
150+
}
151+
return false;
152+
}
127153
}

ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java

+17
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.commons.lang.StringUtils;
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
33+
import org.springframework.web.socket.config.WebSocketMessageBrokerStats;
3334

3435
import com.google.inject.Inject;
3536
import com.google.inject.Singleton;
@@ -46,6 +47,9 @@ public class MetricsServiceImpl implements MetricsService {
4647
@Inject
4748
STOMPUpdatePublisher STOMPUpdatePublisher;
4849

50+
private WebSocketMessageBrokerStats apiStompStats;
51+
private WebSocketMessageBrokerStats agentStompStats;
52+
4953
@Override
5054
public void start() {
5155
LOG.info("********* Initializing AmbariServer Metrics Service **********");
@@ -119,6 +123,11 @@ private void initializeMetricSources() {
119123
STOMPUpdatePublisher.registerAPI(src);
120124
STOMPUpdatePublisher.registerAgent(src);
121125
}
126+
127+
if (src instanceof StompStatsMetricsSource && apiStompStats != null && agentStompStats != null) {
128+
((StompStatsMetricsSource) src).setApiStompStats(apiStompStats);
129+
((StompStatsMetricsSource) src).setAgentStompStats(agentStompStats);
130+
}
122131
src.start();
123132
}
124133

@@ -134,4 +143,12 @@ public static MetricsSource getSource(String type) {
134143
public static MetricsSink getSink() {
135144
return sink;
136145
}
146+
147+
public void setApiStompStats(WebSocketMessageBrokerStats apiStompStats) {
148+
this.apiStompStats = apiStompStats;
149+
}
150+
151+
public void setAgentStompStats(WebSocketMessageBrokerStats agentStompStats) {
152+
this.agentStompStats = agentStompStats;
153+
}
137154
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.ambari.server.metrics.system.impl;
20+
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.List;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.stream.Collectors;
28+
29+
import org.apache.ambari.server.metrics.system.SingleMetric;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
import org.springframework.web.socket.config.WebSocketMessageBrokerStats;
33+
34+
/**
35+
* Gets the metrics about stomp stats connections and publishes to configured
36+
* Metric Sink.
37+
*/
38+
public class StompStatsMetricsSource extends AbstractMetricsSource {
39+
public static final String[] metricsTypes = { "stomp.api", "stomp.agent" };
40+
public static final String[] poolMetrics = { "pool_size", "active_threads", "queued_tasks", "completed_tasks" };
41+
public static final String[] webSocketMetrics = { "current_client_session", "current_web_socket_session",
42+
"current_http_stream", "current_http_polling",
43+
"total_sessions_established", "abnormally_closed_session", "connect_failure_session",
44+
"send_time_limit_exceeded", "transport_errors_sessions" };
45+
public static final String[] stompSubProtocolMetrics = { "connect", "connected", "disconnect" };
46+
private WebSocketMessageBrokerStats apiStompStats;
47+
private WebSocketMessageBrokerStats agentStompStats;
48+
private static final Logger LOG = LoggerFactory.getLogger(StompEventsMetricsSource.class);
49+
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
50+
51+
@Override
52+
public void start() {
53+
int interval = 60;
54+
LOG.info("Starting stomp stat source.");
55+
try {
56+
executor.scheduleWithFixedDelay(new Runnable() {
57+
@Override
58+
public void run() {
59+
List<SingleMetric> events = getStompStatMetrics();
60+
if (!events.isEmpty()) {
61+
sink.publish(events);
62+
LOG.debug("********* Published stomp stat metrics to sink **********");
63+
}
64+
}
65+
}, interval, interval, TimeUnit.SECONDS);
66+
} catch (Exception e) {
67+
LOG.info("Throwing exception when starting stomp stat source", e);
68+
}
69+
}
70+
71+
protected List<SingleMetric> getStompStatMetrics() {
72+
List<SingleMetric> metrics = new ArrayList<>();
73+
populateStompStatMetrics(metrics, metricsTypes[0], apiStompStats);
74+
populateStompStatMetrics(metrics, metricsTypes[1], agentStompStats);
75+
return metrics;
76+
}
77+
78+
private void populateStompStatMetrics(List<SingleMetric> metricsList, String metricsPrefix,
79+
WebSocketMessageBrokerStats stompStats) {
80+
parseStats(metricsList, metricsPrefix + ".websocket.", webSocketMetrics,
81+
stompStats.getWebSocketSessionStatsInfo());
82+
parseStats(metricsList, metricsPrefix + ".stomp_sub_protocol.", stompSubProtocolMetrics,
83+
stompStats.getStompSubProtocolStatsInfo());
84+
parseStats(metricsList, metricsPrefix + ".inbound_channel.", poolMetrics,
85+
stompStats.getClientInboundExecutorStatsInfo());
86+
parseStats(metricsList, metricsPrefix + ".outbound_channel.", poolMetrics,
87+
stompStats.getClientOutboundExecutorStatsInfo());
88+
parseStats(metricsList, metricsPrefix + ".sockJsScheduler.", poolMetrics,
89+
stompStats.getSockJsTaskSchedulerStatsInfo());
90+
}
91+
92+
private void parseStats(List<SingleMetric> metricsList, String metricsPrefix, String[] metricsNamesList,
93+
String stats) {
94+
if (stats.equals("null")) {
95+
LOG.warn("stats for " + metricsPrefix + " is null");
96+
return;
97+
}
98+
List<Long> statsArray = getValuesFromString(stats);
99+
100+
// sanity check to make sure we have the same number of metrics and values.
101+
if (statsArray.size() != metricsNamesList.length) {
102+
LOG.error("Number of metrics and stats do not match for " + metricsPrefix);
103+
return;
104+
}
105+
long currentTimeMillis = System.currentTimeMillis();
106+
for (int i = 0; i < metricsNamesList.length; i++) {
107+
metricsList
108+
.add(new SingleMetric(metricsPrefix + metricsNamesList[i], statsArray.get(i), currentTimeMillis));
109+
}
110+
}
111+
112+
private List<Long> getValuesFromString(String stats) {
113+
// '\\D+' matches one or more non-digit characters. After that it split the
114+
// string and filter out empty strings. Then we convert the string to long.
115+
return Arrays.stream(stats.split("\\D+"))
116+
.filter(s -> !s.isEmpty())
117+
.map(Long::parseLong)
118+
.collect(Collectors.toList());
119+
}
120+
121+
public void setApiStompStats(WebSocketMessageBrokerStats apiStompStats) {
122+
this.apiStompStats = apiStompStats;
123+
}
124+
125+
public void setAgentStompStats(WebSocketMessageBrokerStats agentStompStats) {
126+
this.agentStompStats = agentStompStats;
127+
}
128+
}

0 commit comments

Comments
 (0)