Skip to content

Commit

Permalink
Merge pull request #548 from mdekstrand/feature/measure-run
Browse files Browse the repository at this point in the history
Add resource measurement for “tasks”
  • Loading branch information
mdekstrand authored Dec 13, 2024
2 parents 19f03d5 + a3bd64d commit 114b5da
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 23 deletions.
15 changes: 14 additions & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import os
import warnings

import structlog
import torch
from numpy.random import Generator, default_rng

Expand All @@ -20,12 +21,24 @@

logging.getLogger("numba").setLevel(logging.INFO)

_log = logging.getLogger("lenskit.tests")
_log = structlog.stdlib.get_logger("lenskit.tests")
RNG_SEED = 42
if "LK_TEST_FREE_RNG" in os.environ:
warnings.warn("using nondeterministic RNG initialization")
RNG_SEED = None

structlog.configure(
[
structlog.stdlib.filter_by_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.MaybeTimeStamper(fmt="iso"),
structlog.processors.KeyValueRenderer(key_order=["timestamp", "event"]),
],
wrapper_class=structlog.stdlib.BoundLogger,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)


@fixture
def rng() -> Generator:
Expand Down
15 changes: 14 additions & 1 deletion lenskit/lenskit/logging/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import structlog
import zmq

from .tasks import Task

SIGNAL_ADDR = "inproc://lenskit-monitor-signal"

_log = structlog.stdlib.get_logger(__name__)
Expand Down Expand Up @@ -167,7 +169,10 @@ def _pump_message(self):
self._handle_signal()

if self.log_sock in ready:
self._handle_log_message()
try:
self._handle_log_message()
except Exception as e:
_log.error("error handling message: %s", e)

if self.state == MonitorState.DRAINING and not ready:
self.state = MonitorState.SHUTDOWN
Expand Down Expand Up @@ -211,5 +216,13 @@ def _handle_log_message(self):
data = json.loads(data)
method = getattr(logger, data["method"])
method(**data["event"])
elif engine == "lenskit.logging.tasks":
task = Task.model_validate_json(data)
_log.debug("received subtask", task_id=str(task.task_id))
current = Task.current()
if current:
current.add_subtask(task)
else:
_log.debug("no active task for subtask reporting")
else:
_log.error("invalid log backend")
86 changes: 86 additions & 0 deletions lenskit/lenskit/logging/resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""
Measure resource consumption.
"""

# pyright: strict
from __future__ import annotations

import os
import time
from dataclasses import dataclass
from pathlib import Path

import structlog
import torch

_log = structlog.get_logger(__name__)


@dataclass
class ResourceMeasurement:
    """
    Single measurement of resources. Two measurements can be subtracted to
    compute the time resources consumed in an interval (memory resources are
    left unchanged).
    """

    # wall-clock time in seconds since the epoch (time.time)
    wall_time: float
    # high-resolution monotonic clock (time.perf_counter), for interval timing
    perf_time: float
    user_time: float
    system_time: float

    # peak resident set size; NOTE(review): units of ru_maxrss are
    # platform-dependent (KiB on Linux, bytes on macOS) — not normalized here.
    max_rss: int | None = None
    # peak CUDA memory allocated (bytes), when CUDA is available
    max_gpu: int | None = None

    @classmethod
    def current(cls) -> ResourceMeasurement:
        """
        Get the current resource measurements.
        """
        wall = time.time()
        perf = time.perf_counter()
        rss = None
        gpu = None
        try:
            # the resource module is POSIX-only; fall back to os.times() elsewhere
            import resource

            ru = resource.getrusage(resource.RUSAGE_SELF)
            user = ru.ru_utime
            system = ru.ru_stime
            rss = ru.ru_maxrss
        except ImportError:
            ts = os.times()
            user = ts.user
            system = ts.system

        if torch.cuda.is_available():
            gpu = torch.cuda.max_memory_allocated()

        return cls(wall, perf, user, system, rss, gpu)

    @property
    def cpu_time(self) -> float:
        "Total CPU time (user + system)."
        return self.user_time + self.system_time

    def __sub__(self, other: ResourceMeasurement) -> ResourceMeasurement:
        # Time fields become interval deltas; memory high-water marks are
        # carried over unchanged, per the class docstring.
        return ResourceMeasurement(
            self.wall_time - other.wall_time,
            self.perf_time - other.perf_time,
            self.user_time - other.user_time,
            self.system_time - other.system_time,
            self.max_rss,
            # FIX: max_gpu was previously omitted here, silently resetting the
            # GPU high-water mark to None on subtraction.
            self.max_gpu,
        )


def reset_linux_hwm():
    """
    Reset the process's memory high-water marks so subsequent measurements
    reflect only upcoming work.

    On Linux, writing ``"5"`` to ``/proc/<pid>/clear_refs`` resets the peak
    RSS counter (see proc(5)); on platforms without that file this is a
    silent no-op.  If CUDA is available, the CUDA peak-memory statistics are
    reset as well.  Failures to write are logged, not raised (best-effort).
    """
    pid = os.getpid()
    reset_file = Path(f"/proc/{pid}/clear_refs")
    if reset_file.exists():
        try:
            reset_file.write_text("5")
        except OSError:  # IOError is a legacy alias of OSError
            # use warning(); warn() is the deprecated alias
            _log.warning("cannot clear refs", pid=pid)

    if torch.cuda.is_available():
        torch.cuda.reset_peak_memory_stats()
Loading

0 comments on commit 114b5da

Please sign in to comment.