Skip to content

Commit

Permalink
Expose wsrelay metrics
Browse files Browse the repository at this point in the history
* We were already maintaining prometheus metrics for the websocket relay
  service. This change exposes those metrics over the local per-service
  prometheus server.
  for this instead.
* Remove run_wsrelay --status in favor of directing users at prometheus
  metrics
  • Loading branch information
chrismeyersfsu committed Apr 4, 2024
1 parent ae1235b commit d6e8294
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 241 deletions.
212 changes: 102 additions & 110 deletions awx/main/analytics/broadcast_websocket.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import datetime
import asyncio
import logging
import redis
import redis.asyncio
import re

from prometheus_client import (
Expand All @@ -11,15 +8,11 @@
Counter,
Enum,
CollectorRegistry,
parser,
)

from django.conf import settings


BROADCAST_WEBSOCKET_REDIS_KEY_NAME = 'broadcast_websocket_stats'


logger = logging.getLogger('awx.analytics.broadcast_websocket')


Expand All @@ -36,134 +29,133 @@ def safe_name(s):
return re.sub('[^0-9a-zA-Z]+', '_', s)


# Second granularity; Per-minute
class FixedSlidingWindow:
def __init__(self, start_time=None):
self.buckets = dict()
self.start_time = start_time or now_seconds()
class ConnectionState:
CONNECTED = 'connected'
DISCONNECTED = 'disconnected'

def cleanup(self, now_bucket=None):
now_bucket = now_bucket or now_seconds()
if self.start_time + 60 < now_bucket:
self.start_time = now_bucket - 60

# Delete old entries
for k in list(self.buckets.keys()):
if k < self.start_time:
del self.buckets[k]
class Metrics:
"""
Attributes with underscores are NOT registered
Attributes WITHOUT underscores ARE registered
"""

def record(self, ts=None):
now_bucket = ts or dt_to_seconds(datetime.datetime.now())
CONNECTION_STATE = ConnectionState()

val = self.buckets.get(now_bucket, 0)
self.buckets[now_bucket] = val + 1
def __init__(self, namespace=settings.METRICS_SERVICE_WEBSOCKET_RELAY):
self.messages_received_total = Counter(
f'{namespace}_messages_received_total',
'Number of messages received, to be forwarded, by the broadcast websocket system',
['remote_host'],
registry=None,
)
self.messages_sent = Counter(
f'{namespace}_messages_sent_total',
'Number of messages sent (relayed)',
['remote_host'],
registry=None,
)
self.connection = Enum(
f'{namespace}_connection',
'Websocket broadcast connection established status',
['remote_host'],
states=[self.CONNECTION_STATE.DISCONNECTED, self.CONNECTION_STATE.CONNECTED],
registry=None,
)
self.connection_start = Gauge(
f'{namespace}_connection_start_time_seconds', 'Time the connection was established since unix epoch in seconds', ['remote_host'], registry=None
)
self.producers = Gauge(
f'{namespace}_producers_count',
'Number of async workers',
['remote_host'],
registry=None,
)

self.cleanup(now_bucket)
def record_message_received(self, remote_host):
self.messages_received_total.labels(remote_host=remote_host).inc()

def render(self, ts=None):
self.cleanup(now_bucket=ts)
return sum(self.buckets.values()) or 0
def record_connection_established(self, remote_host):
self.connection.labels(remote_host=remote_host).state(self.CONNECTION_STATE.CONNECTED)
self.connection_start.labels(remote_host=remote_host).set_to_current_time()

def record_connection_lost(self, remote_host):
self.connection.labels(remote_host=remote_host).state(self.CONNECTION_STATE.DISCONNECTED)

class RelayWebsocketStatsManager:
def __init__(self, event_loop, local_hostname):
self._local_hostname = local_hostname
def record_producer_start(self, remote_host):
self.producers.labels(remote_host=remote_host).inc()

self._event_loop = event_loop
self._stats = dict()
self._redis_key = BROADCAST_WEBSOCKET_REDIS_KEY_NAME
def record_producer_stop(self, remote_host):
self.producers.labels(remote_host=remote_host).dec()

def new_remote_host_stats(self, remote_hostname):
self._stats[remote_hostname] = RelayWebsocketStats(self._local_hostname, remote_hostname)
return self._stats[remote_hostname]
def record_message_sent(self, remote_host):
self.messages_sent.labels(remote_host=remote_host).inc()

def delete_remote_host_stats(self, remote_hostname):
del self._stats[remote_hostname]
def init_host(self, remote_host):
self.messages_received_total.labels(remote_host=remote_host)
self.connection.labels(remote_host=remote_host)
self.connection_start.labels(remote_host=remote_host)

async def run_loop(self):
try:
redis_conn = await redis.asyncio.Redis.from_url(settings.BROKER_URL)
while True:
stats_data_str = ''.join(stat.serialize() for stat in self._stats.values())
await redis_conn.set(self._redis_key, stats_data_str)

await asyncio.sleep(settings.BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS)
except Exception as e:
logger.warning(e)
await asyncio.sleep(settings.BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS)
self.start()
class MetricsForHost:
def __init__(self, metrics: Metrics, remote_host):
self._metrics = metrics
self._remote_host = remote_host

def start(self):
self.async_task = self._event_loop.create_task(self.run_loop())
return self.async_task
self._metrics.init_host(self._remote_host)

@classmethod
def get_stats_sync(cls):
"""
Stringified verion of all the stats
"""
redis_conn = redis.Redis.from_url(settings.BROKER_URL)
stats_str = redis_conn.get(BROADCAST_WEBSOCKET_REDIS_KEY_NAME) or b''
return parser.text_string_to_metric_families(stats_str.decode('UTF-8'))
def record_message_received(self):
self._metrics.record_message_received(self._remote_host)

def record_producer_start(self):
self._metrics.record_producer_start(self._remote_host)

class RelayWebsocketStats:
def __init__(self, local_hostname, remote_hostname):
self._local_hostname = local_hostname
self._remote_hostname = remote_hostname
self._registry = CollectorRegistry()
def record_producer_stop(self):
self._metrics.record_producer_stop(self._remote_host)

# TODO: More robust replacement
self.name = safe_name(self._local_hostname)
self.remote_name = safe_name(self._remote_hostname)
def record_connection_established(self):
self._metrics.record_connection_established(self._remote_host)

self._messages_received_total = Counter(
f'awx_{self.remote_name}_messages_received_total',
'Number of messages received, to be forwarded, by the broadcast websocket system',
registry=self._registry,
)
self._messages_received_current_conn = Gauge(
f'awx_{self.remote_name}_messages_received_currrent_conn',
'Number forwarded messages received by the broadcast websocket system, for the duration of the current connection',
registry=self._registry,
)
self._connection = Enum(
f'awx_{self.remote_name}_connection', 'Websocket broadcast connection', states=['disconnected', 'connected'], registry=self._registry
)
self._connection.state('disconnected')
self._connection_start = Gauge(f'awx_{self.remote_name}_connection_start', 'Time the connection was established', registry=self._registry)
def record_connection_lost(self):
self._metrics.record_connection_lost(self._remote_host)

self._messages_received_per_minute = Gauge(
f'awx_{self.remote_name}_messages_received_per_minute', 'Messages received per minute', registry=self._registry
)
self._internal_messages_received_per_minute = FixedSlidingWindow()
def record_message_send(self):
self._metrics.record_message_send(self._remote_host)

def unregister(self):
self._registry.unregister(f'awx_{self.remote_name}_messages_received')
self._registry.unregister(f'awx_{self.remote_name}_connection')

def record_message_received(self):
self._internal_messages_received_per_minute.record()
self._messages_received_current_conn.inc()
self._messages_received_total.inc()
class MetricsRegistryBridge:
"""
Scope: Prometheus CollectorRegistry, Metrics
"""

def record_connection_established(self):
self._connection.state('connected')
self._connection_start.set_to_current_time()
self._messages_received_current_conn.set(0)
def __init__(self, metrics: Metrics, registry: CollectorRegistry, autoregister=True):
self._metrics = metrics
self._registry = registry
self.registered = False

if autoregister:
self.register_all_metrics()
self.registered = True

def register_all_metrics(self):
for metric in [v for k, v in vars(self._metrics).items() if not k.startswith('_')]:
self._registry.register(metric)

def record_connection_lost(self):
self._connection.state('disconnected')

def get_connection_duration(self):
return (datetime.datetime.now() - self._connection_established_ts).total_seconds()
class MetricsManager:
def __init__(self, metrics: Metrics):
self._metrics = metrics

def render(self):
msgs_per_min = self._internal_messages_received_per_minute.render()
self._messages_received_per_minute.set(msgs_per_min)
def allocate(self, remote_hostname):
return MetricsForHost(self._metrics, safe_name(remote_hostname))

def serialize(self):
self.render()
def deallocate(self, remote_hostname):
"""
Intentionally do nothing.
Keep this function around. The code responsible for calling allocate() should call deallocate().
Knowing where deallocation should happen is useful.
registry_data = generate_latest(self._registry).decode('UTF-8')
return registry_data
It seems to be a patterns and best practice to _not_ delete metrics.
"""
pass
16 changes: 6 additions & 10 deletions awx/main/analytics/subsystem_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ class MetricsServerSettings(MetricsNamespace):
def port(self):
return settings.METRICS_SUBSYSTEM_CONFIG['server'][self._namespace]['port']

def address(self):
return settings.METRICS_SUBSYSTEM_CONFIG['server'][self._namespace]['address']


class MetricsServer(MetricsServerSettings):
def __init__(self, namespace, registry):
Expand All @@ -36,7 +39,7 @@ def __init__(self, namespace, registry):
def start(self):
try:
# TODO: addr for ipv6 ?
prometheus_client.start_http_server(self.port(), addr='localhost', registry=self._registry)
prometheus_client.start_http_server(self.port(), addr=self.address(), registry=self._registry)
except Exception:
logger.error(f"MetricsServer failed to start for service '{self._namespace}.")
raise
Expand Down Expand Up @@ -452,19 +455,12 @@ def collect(self):
class CallbackReceiverMetricsServer(MetricsServer):
def __init__(self):
registry = CollectorRegistry(auto_describe=True)
registry.register(CustomToPrometheusMetricsCollector(DispatcherMetrics(metrics_have_changed=False)))
registry.register(CustomToPrometheusMetricsCollector(CallbackReceiverMetrics(metrics_have_changed=False)))
super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, registry)


class DispatcherMetricsServer(MetricsServer):
def __init__(self):
registry = CollectorRegistry(auto_describe=True)
registry.register(CustomToPrometheusMetricsCollector(CallbackReceiverMetrics(metrics_have_changed=False)))
registry.register(CustomToPrometheusMetricsCollector(DispatcherMetrics(metrics_have_changed=False)))
super().__init__(settings.METRICS_SERVICE_DISPATCHER, registry)


class WebsocketsMetricsServer(MetricsServer):
def __init__(self):
registry = CollectorRegistry(auto_describe=True)
# registry.register()
super().__init__(settings.METRICS_SERVICE_WEBSOCKETS, registry)

0 comments on commit d6e8294

Please sign in to comment.