Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prometheus metrics #418

Merged
merged 40 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
4d5672e
add prometheus metrics to server
dsschult Jan 24, 2025
5bb2abd
<bot> update requirements-docs.txt
invalid-email-address Jan 24, 2025
167308f
<bot> update requirements-tests.txt
invalid-email-address Jan 24, 2025
c5e5863
<bot> update requirements.txt
invalid-email-address Jan 24, 2025
95ee252
fix tests
dsschult Jan 24, 2025
f41310d
fix async vs non
dsschult Jan 24, 2025
badfccc
<bot> update requirements-docs.txt
invalid-email-address Jan 24, 2025
354977f
<bot> update requirements-tests.txt
invalid-email-address Jan 24, 2025
5026ec5
<bot> update requirements.txt
invalid-email-address Jan 24, 2025
a9335d4
fix async generator
dsschult Jan 24, 2025
912102c
make condor history a generator to attempt to increase speed
dsschult Jan 27, 2025
b347d5d
add async task groups to better parallelize calls
dsschult Jan 27, 2025
b075053
<bot> update requirements-docs.txt
invalid-email-address Jan 27, 2025
f364691
<bot> update requirements-tests.txt
invalid-email-address Jan 27, 2025
c613b49
<bot> update requirements.txt
invalid-email-address Jan 27, 2025
31a1f2a
better histogram buckets
dsschult Jan 28, 2025
31aa891
<bot> update requirements-docs.txt
invalid-email-address Jan 28, 2025
024ba13
<bot> update requirements-tests.txt
invalid-email-address Jan 28, 2025
168caae
<bot> update requirements.txt
invalid-email-address Jan 28, 2025
e11a67d
fix flake8
dsschult Jan 28, 2025
6ca9550
fix negative times and bad bucket sizes
dsschult Jan 28, 2025
cb5d481
add prometheus to rest server
dsschult Jan 28, 2025
1eb000c
fix flake8
dsschult Jan 28, 2025
435fe83
expand to materialization and credentials servers
dsschult Jan 28, 2025
5a68bc1
remove another statsd reference
dsschult Jan 28, 2025
830188e
fix label format
dsschult Jan 28, 2025
260981d
add prometheus to website
dsschult Jan 28, 2025
7ee82bd
put prom api metric into a mixin
dsschult Jan 28, 2025
b29359e
convert dataset monitor to prometheus
dsschult Jan 28, 2025
1e0c9bd
remove statsd from pip install
dsschult Jan 28, 2025
2d19633
fix metric names
dsschult Jan 28, 2025
1142a47
<bot> update setup.cfg
invalid-email-address Jan 28, 2025
b002f20
<bot> update requirements-docs.txt
invalid-email-address Jan 28, 2025
415289b
<bot> update requirements-tests.txt
invalid-email-address Jan 28, 2025
d313e51
<bot> update requirements.txt
invalid-email-address Jan 28, 2025
bc49a37
still not ready for python 3.13, because of htcondor
dsschult Jan 28, 2025
b822122
<bot> update setup.cfg
invalid-email-address Jan 28, 2025
49142ba
try harder for labels
dsschult Jan 28, 2025
b5b85b9
drop two old tests
dsschult Jan 28, 2025
8632ee6
reset on gridftp issues
dsschult Jan 28, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions iceprod/credentials/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import time

from prometheus_client import Info, start_http_server
import pymongo
import pymongo.errors
import motor.motor_asyncio
Expand All @@ -16,9 +17,9 @@
from tornado.web import RequestHandler as TornadoRequestHandler
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from iceprod.rest.auth import authorization
from iceprod.rest.base_handler import IceProdRestConfig, APIBase
from iceprod.server.module import FakeStatsClient, StatsClientIgnoreErrors
from iceprod.server.util import nowstr, datetime2str
from .service import RefreshService, get_expiration, is_expired

Expand Down Expand Up @@ -427,8 +428,7 @@ def __init__(self):
'DB_URL': 'mongodb://localhost/creds',
'DB_TIMEOUT': 60,
'DB_WRITE_CONCERN': 1,
'STATSD_ADDRESS': '',
'STATSD_PREFIX': 'credentials',
'PROMETHEUS_PORT': 0,
'CI_TESTING': '',
}
config = from_environment(default_config)
Expand All @@ -453,17 +453,8 @@ def __init__(self):
else:
raise RuntimeError('OPENID_URL not specified, and CI_TESTING not enabled!')

statsd = FakeStatsClient()
if config['STATSD_ADDRESS']:
try:
addr = config['STATSD_ADDRESS']
port = 8125
if ':' in addr:
addr,port = addr.split(':')
port = int(port)
statsd = StatsClientIgnoreErrors(addr, port=port, prefix=config['STATSD_PREFIX'])
except Exception:
logger.warning('failed to connect to statsd: %r', config['STATSD_ADDRESS'], exc_info=True)
# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None

logging_url = config["DB_URL"].split('@')[-1] if '@' in config["DB_URL"] else config["DB_URL"]
logging.info(f'DB: {logging_url}')
Expand Down Expand Up @@ -506,7 +497,7 @@ def __init__(self):
)
self.refresh_service_task = None

kwargs = IceProdRestConfig(rest_config, statsd=statsd, database=self.db)
kwargs = IceProdRestConfig(rest_config, database=self.db)
kwargs['refresh_service'] = self.refresh_service
kwargs['rest_client'] = rest_client

Expand All @@ -522,6 +513,15 @@ def __init__(self):
self.server = server

async def start(self):
if self.prometheus_port:
logging.info("starting prometheus on {}", self.prometheus_port)
start_http_server(self.prometheus_port)
i = Info('iceprod', 'IceProd information')
i.info({
'version': version_string,
'type': 'credentials',
})

for collection in self.indexes:
existing = await self.db[collection].index_information()
for name in self.indexes[collection]:
Expand Down
30 changes: 15 additions & 15 deletions iceprod/materialization/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import uuid

from prometheus_client import Info, start_http_server
import pymongo
import pymongo.errors
import motor.motor_asyncio
Expand All @@ -18,9 +19,9 @@
from tornado.web import RequestHandler as TornadoRequestHandler
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from iceprod.rest.auth import authorization, attr_auth
from iceprod.rest.base_handler import IceProdRestConfig, APIBase
from iceprod.server.module import FakeStatsClient, StatsClientIgnoreErrors
from iceprod.server.util import nowstr, datetime2str
from .service import MaterializationService

Expand Down Expand Up @@ -279,8 +280,7 @@ def __init__(self):
'DB_URL': 'mongodb://localhost/datasets',
'DB_TIMEOUT': 60,
'DB_WRITE_CONCERN': 1,
'STATSD_ADDRESS': '',
'STATSD_PREFIX': 'rest_api',
'PROMETHEUS_PORT': 0,
'CI_TESTING': '',
}
config = from_environment(default_config)
Expand All @@ -305,17 +305,8 @@ def __init__(self):
else:
raise RuntimeError('OPENID_URL not specified, and CI_TESTING not enabled!')

statsd = FakeStatsClient()
if config['STATSD_ADDRESS']:
try:
addr = config['STATSD_ADDRESS']
port = 8125
if ':' in addr:
addr,port = addr.split(':')
port = int(port)
statsd = StatsClientIgnoreErrors(addr, port=port, prefix=config['STATSD_PREFIX'])
except Exception:
logger.warning('failed to connect to statsd: %r', config['STATSD_ADDRESS'], exc_info=True)
# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None

logging_url = config["DB_URL"].split('@')[-1] if '@' in config["DB_URL"] else config["DB_URL"]
logging.info(f'DB: {logging_url}')
Expand Down Expand Up @@ -351,7 +342,7 @@ def __init__(self):
self.materialization_service = MaterializationService(self.db, rest_client)
self.materialization_service_task = None

kwargs = IceProdRestConfig(rest_config, statsd=statsd, database=self.db)
kwargs = IceProdRestConfig(rest_config, database=self.db)
kwargs['materialization_service'] = self.materialization_service
kwargs['rest_client'] = rest_client

Expand All @@ -369,6 +360,15 @@ def __init__(self):
self.server = server

async def start(self):
if self.prometheus_port:
logging.info("starting prometheus on {}", self.prometheus_port)
start_http_server(self.prometheus_port)
i = Info('iceprod', 'IceProd information')
i.info({
'version': version_string,
'type': 'materialization',
})

for collection in self.indexes:
existing = await self.db[collection].index_information()
for name in self.indexes[collection]:
Expand Down
51 changes: 51 additions & 0 deletions iceprod/prom_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
Some Prometheus utilities.
"""

import time

from prometheus_client import Histogram


class HistogramBuckets:
    """Curated Prometheus histogram bucket boundaries (seconds).

    Each attribute is a monotonically increasing list of upper bounds,
    chosen for a particular latency regime. Pass one to a
    ``prometheus_client.Histogram`` via its ``buckets=`` argument.
    """

    # DEFAULT = [.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10]

    #: Database bucket centered around 5ms, with outliers up to 10s
    DB = [
        0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.007, 0.008, 0.009,
        0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10,
    ]

    #: API bucket centered around 50ms, up to 10s
    API = [
        0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09,
        0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10,
    ]

    #: Timer bucket up to 1 second
    SECOND = [
        0.0001, 0.0005, 0.001, 0.0025, 0.005, 0.0075,
        0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1,
    ]

    #: Timer bucket up to 10 seconds
    TENSECOND = [
        0.001, 0.0025, 0.005, 0.0075, 0.01, 0.025, 0.05, 0.075,
        0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 10,
    ]

    #: Timer bucket up to 1 minute
    MINUTE = [0.1, 0.5, 1, 2.5, 5, 7.5, 10, 15, 20, 25, 30, 45, 60]

    #: Timer bucket up to 10 minutes
    TENMINUTE = [
        1, 5, 10, 15, 20, 25, 30, 45, 60, 90,
        120, 150, 180, 240, 300, 360, 420, 480, 540, 600,
    ]

    #: Timer bucket up to 1 hour
    HOUR = [10, 60, 120, 300, 600, 1200, 1800, 2400, 3000, 3600]


class PromRequestMixin:
    """Tornado ``RequestHandler`` mixin recording per-request Prometheus metrics.

    Measures each request's wall-clock duration and observes it in the
    shared ``http_request_duration_seconds`` histogram, labeled by HTTP
    verb, request path, and response status code.
    """

    # Class attribute so every handler subclass shares a single histogram.
    # NOTE(review): labeling by raw request path assumes bounded path
    # cardinality -- confirm request paths do not embed unbounded IDs.
    PromHTTPHistogram = Histogram(
        'http_request_duration_seconds',
        'HTTP request duration in seconds',
        labelnames=('verb', 'path', 'status'),
        buckets=HistogramBuckets.API,
    )

    def prepare(self):
        """Record the request start time (monotonic clock, immune to NTP jumps)."""
        super().prepare()
        self._prom_start_time = time.monotonic()

    def on_finish(self):
        """Observe the request duration once the response has been sent."""
        super().on_finish()
        end_time = time.monotonic()
        # Tornado calls on_finish() even if prepare() raised partway through
        # the MRO before our prepare() ran, so the start time may be absent;
        # skip the observation rather than raise AttributeError.
        start_time = getattr(self, '_prom_start_time', None)
        if start_time is None:
            return
        self.PromHTTPHistogram.labels(
            verb=self.request.method,
            path=self.request.path,
            status=self.get_status(),
        ).observe(end_time - start_time)
25 changes: 7 additions & 18 deletions iceprod/rest/base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,33 @@

from rest_tools.server import RestHandlerSetup, RestHandler

import iceprod
from iceprod import __version__ as version_string
from iceprod.prom_utils import PromRequestMixin
from .auth import AttrAuthMixin

logger = logging.getLogger('rest')


def IceProdRestConfig(config=None, statsd=None, database=None, auth_database=None, s3conn=None):
config['server_header'] = 'IceProd/' + iceprod.__version__
def IceProdRestConfig(config=None, database=None, auth_database=None, s3conn=None):
config['server_header'] = 'IceProd/' + version_string
ret = RestHandlerSetup(config)
ret['statsd'] = statsd
ret['database'] = database
ret['auth_database'] = auth_database
ret['s3'] = s3conn
return ret


class APIBase(AttrAuthMixin, RestHandler):
class APIBase(AttrAuthMixin, PromRequestMixin, RestHandler):
"""Default REST handler"""
def initialize(self, database=None, auth_database=None, statsd=None, s3=None, **kwargs):
def initialize(self, database=None, auth_database=None, s3=None, **kwargs):
super().initialize(**kwargs)
self.db = database
self.auth_db = auth_database
self.statsd = statsd
self.s3 = s3

def prepare(self):
super().prepare()
if self.statsd:
self.statsd.incr(f'prepare.{self.__class__.__name__}.{self.request.method}')

def on_finish(self):
super().on_finish()
if self.statsd:
self.statsd.incr(f'finish.{self.__class__.__name__}.{self.request.method}.{self.get_status()}')

def get_template_namespace(self):
namespace = super().get_template_namespace()
namespace['version'] = iceprod.__version__
namespace['version'] = version_string
return namespace

def get_current_user(self):
Expand Down
4 changes: 0 additions & 4 deletions iceprod/rest/handlers/pilots.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,6 @@ async def patch(self, pilot_id):
if not ret:
self.send_error(404, reason="Pilot not found")
else:
if 'site' in ret and ret['site']:
self.module.statsd.incr('site.{}.pilot'.format(ret['site']))
self.write(ret)
self.finish()

Expand All @@ -194,6 +192,4 @@ async def delete(self, pilot_id):
if not ret:
self.send_error(404, reason="Pilot not found")
else:
if 'site' in ret and ret['site']:
self.module.statsd.incr('site.{}.pilot_delete'.format(ret['site']))
self.write({})
25 changes: 1 addition & 24 deletions iceprod/rest/handlers/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,6 @@ async def post(self):
logger.info('filter_query: %r', filter_query)
self.send_error(404, reason="Task not found")
else:
self.statsd.incr('site.{}.task_queued'.format(site))
self.write(ret)
self.finish()

Expand Down Expand Up @@ -903,25 +902,6 @@ async def post(self, task_id):
if 'site' in data:
site = data['site']
update_query['$set']['site'] = site
if self.statsd and 'reason' in data and data['reason']:
reason = 'other'
reasons = [
('Exception: failed to download', 'download_failure'),
('Exception: failed to upload', 'upload_failure'),
('Exception: module failed', 'module_failure'),
('Resource overusage for cpu', 'cpu_overuse'),
('Resource overusage for gpu', 'gpu_overuse'),
('Resource overusage for memory', 'memory_overuse'),
('Resource overusage for disk', 'disk_overuse'),
('Resource overusage for time', 'time_overuse'),
('pilot SIGTERM', 'sigterm'),
('killed', 'killed'),
]
for text,r in reasons:
if text in data['reason']:
reason = r
break
self.statsd.incr('site.{}.task_{}.{}'.format(site, self.final_status, reason))

ret = await self.db.tasks.find_one_and_update(
filter_query,
Expand Down Expand Up @@ -985,11 +965,8 @@ async def post(self, task_id):

if 'time_used' in data:
update_query['$set']['walltime'] = data['time_used']/3600.
site = 'unknown'
if 'site' in data:
site = data['site']
update_query['$set']['site'] = site
self.statsd.incr('site.{}.task_complete'.format(site))
update_query['$set']['site'] = data['site']

ret = await self.db.tasks.find_one_and_update(
filter_query,
Expand Down
30 changes: 15 additions & 15 deletions iceprod/rest/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
import pkgutil

import motor.motor_asyncio
from prometheus_client import Info, start_http_server
from rest_tools.server import RestServer
from tornado.web import RequestHandler, HTTPError
from wipac_dev_tools import from_environment

from iceprod import __version__ as version_string
from ..s3 import boto3, S3
from ..server.module import FakeStatsClient, StatsClientIgnoreErrors
from .base_handler import IceProdRestConfig

logger = logging.getLogger('rest-server')
Expand All @@ -41,8 +42,7 @@ def __init__(self, s3_override=None):
'DB_URL': 'mongodb://localhost/iceprod',
'DB_TIMEOUT': 60,
'DB_WRITE_CONCERN': 1,
'STATSD_ADDRESS': '',
'STATSD_PREFIX': 'rest_api',
'PROMETHEUS_PORT': 0,
'S3_ADDRESS': '',
'S3_ACCESS_KEY': '',
'S3_SECRET_KEY': '',
Expand Down Expand Up @@ -79,17 +79,8 @@ def __init__(self, s3_override=None):
else:
raise RuntimeError('OPENID_URL not specified, and CI_TESTING not enabled!')

statsd = FakeStatsClient()
if config['STATSD_ADDRESS']:
try:
addr = config['STATSD_ADDRESS']
port = 8125
if ':' in addr:
addr,port = addr.split(':')
port = int(port)
statsd = StatsClientIgnoreErrors(addr, port=port, prefix=config['STATSD_PREFIX'])
except Exception:
logger.warning('failed to connect to statsd: %r', config['STATSD_ADDRESS'], exc_info=True)
# enable monitoring
self.prometheus_port = config['PROMETHEUS_PORT'] if config['PROMETHEUS_PORT'] > 0 else None

s3conn = None
if s3_override:
Expand All @@ -111,7 +102,7 @@ def __init__(self, s3_override=None):
logging.info(f'DB name: {db_name}')
self.indexes = defaultdict(partial(defaultdict, dict))

kwargs = IceProdRestConfig(rest_config, statsd=statsd, database=self.db, s3conn=s3conn)
kwargs = IceProdRestConfig(rest_config, database=self.db, s3conn=s3conn)

server = RestServer(debug=config['DEBUG'], max_body_size=config['MAX_BODY_SIZE'])

Expand All @@ -136,6 +127,15 @@ def __init__(self, s3_override=None):
self.server = server

async def start(self):
if self.prometheus_port:
logging.info("starting prometheus on {}", self.prometheus_port)
start_http_server(self.prometheus_port)
i = Info('iceprod', 'IceProd information')
i.info({
'version': version_string,
'type': 'api',
})

for database in self.indexes:
db = self.db[database]
for collection in self.indexes[database]:
Expand Down
Loading