Skip to content

Commit

Permalink
Add prometheus metrics (#418)
Browse files Browse the repository at this point in the history
* add prometheus metrics to server

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* fix tests

* fix async vs non

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* fix async generator

* make condor history a generator to attempt to increase speed

* add async task groups to better parallelize calls

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* better histogram buckets

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* fix flake8

* fix negative times and bad bucket sizes

* add prometheus to rest server

* fix flake8

* expand to materialization and credentials servers

* remove another statsd reference

* fix label format

* add prometheus to website

* put prom api metric into a mixin

* convert dataset monitor to prometheus

* remove statsd from pip install

* fix metric names

* <bot> update setup.cfg

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* still not ready for python 3.13, because of htcondor

* <bot> update setup.cfg

* try harder for labels

* drop two old tests

* reset on gridftp issues

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
dsschult and github-actions authored Jan 28, 2025
1 parent 94becb0 commit 6e51584
Show file tree
Hide file tree
Showing 25 changed files with 511 additions and 590 deletions.
30 changes: 15 additions & 15 deletions iceprod/credentials/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import time

from prometheus_client import Info, start_http_server
import pymongo
import pymongo.errors
import motor.motor_asyncio
Expand All @@ -16,9 +17,9 @@
from tornado.web import RequestHandler as TornadoRequestHandler
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from iceprod.rest.auth import authorization
from iceprod.rest.base_handler import IceProdRestConfig, APIBase
from iceprod.server.module import FakeStatsClient, StatsClientIgnoreErrors
from iceprod.server.util import nowstr, datetime2str
from .service import RefreshService, get_expiration, is_expired

Expand Down Expand Up @@ -427,8 +428,7 @@ def __init__(self):
'DB_URL': 'mongodb://localhost/creds',
'DB_TIMEOUT': 60,
'DB_WRITE_CONCERN': 1,
'STATSD_ADDRESS': '',
'STATSD_PREFIX': 'credentials',
'PROMETHEUS_PORT': 0,
'CI_TESTING': '',
}
config = from_environment(default_config)
Expand All @@ -453,17 +453,8 @@ def __init__(self):
else:
raise RuntimeError('OPENID_URL not specified, and CI_TESTING not enabled!')

statsd = FakeStatsClient()
if config['STATSD_ADDRESS']:
try:
addr = config['STATSD_ADDRESS']
port = 8125
if ':' in addr:
addr,port = addr.split(':')
port = int(port)
statsd = StatsClientIgnoreErrors(addr, port=port, prefix=config['STATSD_PREFIX'])
except Exception:
logger.warning('failed to connect to statsd: %r', config['STATSD_ADDRESS'], exc_info=True)
# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None

logging_url = config["DB_URL"].split('@')[-1] if '@' in config["DB_URL"] else config["DB_URL"]
logging.info(f'DB: {logging_url}')
Expand Down Expand Up @@ -506,7 +497,7 @@ def __init__(self):
)
self.refresh_service_task = None

kwargs = IceProdRestConfig(rest_config, statsd=statsd, database=self.db)
kwargs = IceProdRestConfig(rest_config, database=self.db)
kwargs['refresh_service'] = self.refresh_service
kwargs['rest_client'] = rest_client

Expand All @@ -522,6 +513,15 @@ def __init__(self):
self.server = server

async def start(self):
if self.prometheus_port:
logging.info("starting prometheus on {}", self.prometheus_port)
start_http_server(self.prometheus_port)
i = Info('iceprod', 'IceProd information')
i.info({
'version': version_string,
'type': 'credentials',
})

for collection in self.indexes:
existing = await self.db[collection].index_information()
for name in self.indexes[collection]:
Expand Down
30 changes: 15 additions & 15 deletions iceprod/materialization/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import uuid

from prometheus_client import Info, start_http_server
import pymongo
import pymongo.errors
import motor.motor_asyncio
Expand All @@ -18,9 +19,9 @@
from tornado.web import RequestHandler as TornadoRequestHandler
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from iceprod.rest.auth import authorization, attr_auth
from iceprod.rest.base_handler import IceProdRestConfig, APIBase
from iceprod.server.module import FakeStatsClient, StatsClientIgnoreErrors
from iceprod.server.util import nowstr, datetime2str
from .service import MaterializationService

Expand Down Expand Up @@ -279,8 +280,7 @@ def __init__(self):
'DB_URL': 'mongodb://localhost/datasets',
'DB_TIMEOUT': 60,
'DB_WRITE_CONCERN': 1,
'STATSD_ADDRESS': '',
'STATSD_PREFIX': 'rest_api',
'PROMETHEUS_PORT': 0,
'CI_TESTING': '',
}
config = from_environment(default_config)
Expand All @@ -305,17 +305,8 @@ def __init__(self):
else:
raise RuntimeError('OPENID_URL not specified, and CI_TESTING not enabled!')

statsd = FakeStatsClient()
if config['STATSD_ADDRESS']:
try:
addr = config['STATSD_ADDRESS']
port = 8125
if ':' in addr:
addr,port = addr.split(':')
port = int(port)
statsd = StatsClientIgnoreErrors(addr, port=port, prefix=config['STATSD_PREFIX'])
except Exception:
logger.warning('failed to connect to statsd: %r', config['STATSD_ADDRESS'], exc_info=True)
# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None

logging_url = config["DB_URL"].split('@')[-1] if '@' in config["DB_URL"] else config["DB_URL"]
logging.info(f'DB: {logging_url}')
Expand Down Expand Up @@ -351,7 +342,7 @@ def __init__(self):
self.materialization_service = MaterializationService(self.db, rest_client)
self.materialization_service_task = None

kwargs = IceProdRestConfig(rest_config, statsd=statsd, database=self.db)
kwargs = IceProdRestConfig(rest_config, database=self.db)
kwargs['materialization_service'] = self.materialization_service
kwargs['rest_client'] = rest_client

Expand All @@ -369,6 +360,15 @@ def __init__(self):
self.server = server

async def start(self):
if self.prometheus_port:
logging.info("starting prometheus on {}", self.prometheus_port)
start_http_server(self.prometheus_port)
i = Info('iceprod', 'IceProd information')
i.info({
'version': version_string,
'type': 'materialization',
})

for collection in self.indexes:
existing = await self.db[collection].index_information()
for name in self.indexes[collection]:
Expand Down
51 changes: 51 additions & 0 deletions iceprod/prom_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Some Prometheus utilities.
"""

import time

from prometheus_client import Histogram


class HistogramBuckets:
    """Predefined bucket boundaries (in seconds) for Prometheus histograms.

    Each attribute is a list of upper bucket bounds, chosen to give good
    resolution around the expected typical latency for that kind of
    operation while still capturing outliers.
    """

    # DEFAULT = [.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10]

    #: Database operations: fine resolution around 5ms, outliers up to 10s
    DB = [
        .001, .002, .003, .004, .005, .006, .007, .008, .009,
        .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10,
    ]

    #: HTTP API requests: fine resolution around 50ms, outliers up to 10s
    API = [
        .005, .01, .02, .03, .04, .05, .06, .07, .08, .09,
        .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10,
    ]

    #: General-purpose timer, up to 1 second
    SECOND = [
        .0001, .0005, .001, .0025, .005, .0075,
        .01, .025, .05, .075, .1, .25, .5, .75, 1,
    ]

    #: General-purpose timer, up to 10 seconds
    TENSECOND = [
        .001, .0025, .005, .0075, .01, .025, .05, .075,
        .1, .25, .5, .75, 1, 2.5, 5, 10,
    ]

    #: General-purpose timer, up to 1 minute
    MINUTE = [.1, .5, 1, 2.5, 5, 7.5, 10, 15, 20, 25, 30, 45, 60]

    #: General-purpose timer, up to 10 minutes
    TENMINUTE = [
        1, 5, 10, 15, 20, 25, 30, 45, 60, 90,
        120, 150, 180, 240, 300, 360, 420, 480, 540, 600,
    ]

    #: General-purpose timer, up to 1 hour
    HOUR = [10, 60, 120, 300, 600, 1200, 1800, 2400, 3000, 3600]


class PromRequestMixin:
    """Tornado ``RequestHandler`` mixin that records request latency in Prometheus.

    ``prepare`` captures a monotonic start time and ``on_finish`` observes the
    elapsed time into a shared ``http_request_duration_seconds`` histogram,
    labeled by HTTP verb, request path, and response status.

    NOTE(review): labeling by raw ``path`` can create high label cardinality
    if paths contain IDs — confirm the routes used here are bounded.
    """

    # Shared across all handler subclasses: one histogram, per-label series.
    PromHTTPHistogram = Histogram(
        'http_request_duration_seconds',
        'HTTP request duration in seconds',
        labelnames=('verb', 'path', 'status'),
        buckets=HistogramBuckets.API,
    )

    def prepare(self):
        """Record the request start time (monotonic, immune to clock steps)."""
        super().prepare()
        self._prom_start_time = time.monotonic()

    def on_finish(self):
        """Observe the request duration into the histogram."""
        super().on_finish()
        # prepare() may not have completed (e.g. an error raised earlier in
        # the request lifecycle), so don't assume the start time exists.
        start_time = getattr(self, '_prom_start_time', None)
        if start_time is None:
            return
        end_time = time.monotonic()
        self.PromHTTPHistogram.labels(
            verb=self.request.method,
            path=self.request.path,
            status=self.get_status(),
        ).observe(end_time - start_time)
25 changes: 7 additions & 18 deletions iceprod/rest/base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,33 @@

from rest_tools.server import RestHandlerSetup, RestHandler

import iceprod
from iceprod import __version__ as version_string
from iceprod.prom_utils import PromRequestMixin
from .auth import AttrAuthMixin

logger = logging.getLogger('rest')


def IceProdRestConfig(config=None, statsd=None, database=None, auth_database=None, s3conn=None):
config['server_header'] = 'IceProd/' + iceprod.__version__
def IceProdRestConfig(config=None, database=None, auth_database=None, s3conn=None):
config['server_header'] = 'IceProd/' + version_string
ret = RestHandlerSetup(config)
ret['statsd'] = statsd
ret['database'] = database
ret['auth_database'] = auth_database
ret['s3'] = s3conn
return ret


class APIBase(AttrAuthMixin, RestHandler):
class APIBase(AttrAuthMixin, PromRequestMixin, RestHandler):
"""Default REST handler"""
def initialize(self, database=None, auth_database=None, statsd=None, s3=None, **kwargs):
def initialize(self, database=None, auth_database=None, s3=None, **kwargs):
super().initialize(**kwargs)
self.db = database
self.auth_db = auth_database
self.statsd = statsd
self.s3 = s3

def prepare(self):
super().prepare()
if self.statsd:
self.statsd.incr(f'prepare.{self.__class__.__name__}.{self.request.method}')

def on_finish(self):
super().on_finish()
if self.statsd:
self.statsd.incr(f'finish.{self.__class__.__name__}.{self.request.method}.{self.get_status()}')

def get_template_namespace(self):
namespace = super().get_template_namespace()
namespace['version'] = iceprod.__version__
namespace['version'] = version_string
return namespace

def get_current_user(self):
Expand Down
4 changes: 0 additions & 4 deletions iceprod/rest/handlers/pilots.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ async def patch(self, pilot_id):
if not ret:
self.send_error(404, reason="Pilot not found")
else:
if 'site' in ret and ret['site']:
self.module.statsd.incr('site.{}.pilot'.format(ret['site']))
self.write(ret)
self.finish()

Expand All @@ -194,6 +192,4 @@ async def delete(self, pilot_id):
if not ret:
self.send_error(404, reason="Pilot not found")
else:
if 'site' in ret and ret['site']:
self.module.statsd.incr('site.{}.pilot_delete'.format(ret['site']))
self.write({})
25 changes: 1 addition & 24 deletions iceprod/rest/handlers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,6 @@ async def post(self):
logger.info('filter_query: %r', filter_query)
self.send_error(404, reason="Task not found")
else:
self.statsd.incr('site.{}.task_queued'.format(site))
self.write(ret)
self.finish()

Expand Down Expand Up @@ -903,25 +902,6 @@ async def post(self, task_id):
if 'site' in data:
site = data['site']
update_query['$set']['site'] = site
if self.statsd and 'reason' in data and data['reason']:
reason = 'other'
reasons = [
('Exception: failed to download', 'download_failure'),
('Exception: failed to upload', 'upload_failure'),
('Exception: module failed', 'module_failure'),
('Resource overusage for cpu', 'cpu_overuse'),
('Resource overusage for gpu', 'gpu_overuse'),
('Resource overusage for memory', 'memory_overuse'),
('Resource overusage for disk', 'disk_overuse'),
('Resource overusage for time', 'time_overuse'),
('pilot SIGTERM', 'sigterm'),
('killed', 'killed'),
]
for text,r in reasons:
if text in data['reason']:
reason = r
break
self.statsd.incr('site.{}.task_{}.{}'.format(site, self.final_status, reason))

ret = await self.db.tasks.find_one_and_update(
filter_query,
Expand Down Expand Up @@ -985,11 +965,8 @@ async def post(self, task_id):

if 'time_used' in data:
update_query['$set']['walltime'] = data['time_used']/3600.
site = 'unknown'
if 'site' in data:
site = data['site']
update_query['$set']['site'] = site
self.statsd.incr('site.{}.task_complete'.format(site))
update_query['$set']['site'] = data['site']

ret = await self.db.tasks.find_one_and_update(
filter_query,
Expand Down
30 changes: 15 additions & 15 deletions iceprod/rest/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
import pkgutil

import motor.motor_asyncio
from prometheus_client import Info, start_http_server
from rest_tools.server import RestServer
from tornado.web import RequestHandler, HTTPError
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from ..s3 import boto3, S3
from ..server.module import FakeStatsClient, StatsClientIgnoreErrors
from .base_handler import IceProdRestConfig

logger = logging.getLogger('rest-server')
Expand All @@ -41,8 +42,7 @@ def __init__(self, s3_override=None):
'DB_URL': 'mongodb://localhost/iceprod',
'DB_TIMEOUT': 60,
'DB_WRITE_CONCERN': 1,
'STATSD_ADDRESS': '',
'STATSD_PREFIX': 'rest_api',
'PROMETHEUS_PORT': 0,
'S3_ADDRESS': '',
'S3_ACCESS_KEY': '',
'S3_SECRET_KEY': '',
Expand Down Expand Up @@ -79,17 +79,8 @@ def __init__(self, s3_override=None):
else:
raise RuntimeError('OPENID_URL not specified, and CI_TESTING not enabled!')

statsd = FakeStatsClient()
if config['STATSD_ADDRESS']:
try:
addr = config['STATSD_ADDRESS']
port = 8125
if ':' in addr:
addr,port = addr.split(':')
port = int(port)
statsd = StatsClientIgnoreErrors(addr, port=port, prefix=config['STATSD_PREFIX'])
except Exception:
logger.warning('failed to connect to statsd: %r', config['STATSD_ADDRESS'], exc_info=True)
# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None

s3conn = None
if s3_override:
Expand All @@ -111,7 +102,7 @@ def __init__(self, s3_override=None):
logging.info(f'DB name: {db_name}')
self.indexes = defaultdict(partial(defaultdict, dict))

kwargs = IceProdRestConfig(rest_config, statsd=statsd, database=self.db, s3conn=s3conn)
kwargs = IceProdRestConfig(rest_config, database=self.db, s3conn=s3conn)

server = RestServer(debug=config['DEBUG'], max_body_size=config['MAX_BODY_SIZE'])

Expand All @@ -136,6 +127,15 @@ def __init__(self, s3_override=None):
self.server = server

async def start(self):
if self.prometheus_port:
logging.info("starting prometheus on {}", self.prometheus_port)
start_http_server(self.prometheus_port)
i = Info('iceprod', 'IceProd information')
i.info({
'version': version_string,
'type': 'api',
})

for database in self.indexes:
db = self.db[database]
for collection in self.indexes[database]:
Expand Down
Loading

0 comments on commit 6e51584

Please sign in to comment.