Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
build
pytest
pytest-mock
nanoid
omegaconf
ipython
Expand Down
6 changes: 5 additions & 1 deletion swanlab/core_python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@
2. http 客户端代码
"""

from .client import *
# FIXME 存在循环引用,我们需要更优雅的代码结构
# from . import auth
# from . import uploader
from .client import Client, create_client, reset_client, get_client, create_session
from .utils import timer
98 changes: 98 additions & 0 deletions swanlab/core_python/utils/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
@author: cunyue
@file: timer.py
@time: 2025/12/30 22:04
@description: SwanLab 封装的定时器工具,用于定时、循环执行若干函数。

采用 Daemon 线程 + Event 机制,支持手动 cancel 和 join,保证任务完整性。
"""

import threading
from typing import Callable, Union, Optional

from swanlab.log import swanlog


class Timer:
    """
    Run a task repeatedly on a background daemon thread.

    The worker thread sleeps on a ``threading.Event`` between runs, so a call
    to ``cancel()`` wakes it up immediately instead of waiting for the current
    interval to elapse. A task that is already executing is never interrupted;
    call ``cancel()`` followed by ``join()`` to wait for it to finish.
    """

    def __init__(self, task: Callable, *, interval: Union[int, float, Callable[[int], float]], immediate: bool = False):
        """
        Initialize the timer (the worker thread is not started until ``run()``).

        :param task: function to execute periodically; it is called without arguments
        :param interval: delay between runs in seconds. Either a fixed number, or a
            callable that receives the number of task executions so far and returns
            the next delay (dynamic interval)
        :param immediate: when True, execute the task once right after start,
            before the first sleep. Defaults to False
        """
        self._task = task
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._immediate = immediate
        # Number of task executions so far (failed executions are counted too)
        self._count = 0

    def run(self) -> "Timer":
        """
        Start the timer thread.

        If the timer is already running, a warning is logged and nothing else
        happens.

        :return: this timer, so that ``Timer(...).run()`` can be assigned directly
        """
        if self._thread is not None and self._thread.is_alive():
            swanlog.warning("Timer already running")
            # Always hand back the timer itself: returning the logger's result
            # here would give fluent callers None instead of a Timer.
            return self

        # Clear the stop signal so a cancelled timer can be started again
        self._stop_event.clear()

        # daemon=True keeps a forgotten cancel() from hanging the process on exit.
        # Callers that must not lose work should call cancel() + join() themselves.
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()
        return self

    def cancel(self):
        """
        Signal the timer to stop.

        This does not interrupt a task that is currently executing; it only
        prevents any further iteration from starting.
        """
        self._stop_event.set()

    def join(self, timeout: Optional[float] = None):
        """
        Wait for the timer thread to finish.

        Usually called after ``cancel()`` so that a task which is still
        executing can complete. Does nothing if the thread was never started
        or has already exited.

        :param timeout: maximum number of seconds to wait; None waits indefinitely
        """
        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout)

    def _loop(self):
        """
        Main loop of the worker thread.
        """
        # Optional first execution without any delay
        if self._immediate and not self._stop_event.is_set():
            self._safe_execute()

        while not self._stop_event.is_set():
            # wait() doubles as an interruptible sleep: it returns True as soon
            # as cancel() sets the event, and False when the interval elapsed.
            if self._stop_event.wait(self._sleep_time):
                break
            self._safe_execute()

    def _safe_execute(self):
        """
        Execute the task once, logging (instead of propagating) any exception
        so that a failing task does not kill the timer thread.
        """
        try:
            self._task()
        except Exception as e:
            swanlog.error(f"Error executing task: {e}")
        finally:
            # Count the execution whether it succeeded or failed
            self._count += 1

    @property
    def _sleep_time(self) -> float:
        """
        Resolve the delay before the next execution, in seconds.

        A callable interval is invoked with the current execution count; the
        result is coerced to float in both the static and the dynamic case.
        """
        if callable(self._interval):
            return float(self._interval(self._count))
        return float(self._interval)
61 changes: 9 additions & 52 deletions swanlab/data/run/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
@Description:
回调函数操作员,批量处理回调函数的调用
"""
import threading
from enum import Enum
from typing import List, Union, Dict, Any, Tuple, Callable, Optional
from typing import List, Union, Dict, Any, Tuple

from swanlab.data.run.webhook import try_send_webhook
from swanlab.log import swanlog
from swanlab.swanlab_settings import get_settings
from swanlab.toolkit import SwanKitCallback, MetricInfo, ColumnInfo, RuntimeInfo

OperatorReturnType = Dict[str, Any]
Expand Down Expand Up @@ -135,54 +132,14 @@ class SwanLabRunState(Enum):
RUNNING = 0


class MonitorCron:
def monitor_interval(count: int) -> float:
"""
用于定时采集系统信息
根据传入的参数(本质是调用次数)选择间隔
:param count: (调用)次数
"""

def __init__(self, monitor_func: Callable):
self.count = 0 # 计数器,执行次数
self.monitor_interval = get_settings().hardware_interval # 用户设置的采集间隔

def _():
monitor_func()
self.count += 1
self.timer = threading.Timer(self.sleep_time, _)
self.timer.daemon = True
self.timer.start()

# 立即执行
self.timer = threading.Timer(0, _)
self.timer.daemon = True
self.timer.start()

def cancel(self):
if self.timer is not None:
self.timer.cancel()
self.timer.join()

@property
def sleep_time(self):
if self.monitor_interval is not None:
return self.monitor_interval
# 采集10次以下,每次间隔10秒
# 采集10次到50次,每次间隔30秒
# 采集50次以上,每次间隔60秒
if self.count < 10:
return 10
elif self.count < 50:
return 30
else:
return 60


def check_log_level(log_level: Optional[str]) -> str:
"""检查日志等级是否合法"""
valid = ["debug", "info", "warning", "error", "critical"]
if log_level is None:
return "info"
elif log_level.lower() in valid:
return log_level.lower()
if count < 10:
return 10
elif count < 50:
return 30
else:
swanlog.warning(f"The log level you provided is not valid, it has been set to {log_level}.")
return "info"
return 60
12 changes: 7 additions & 5 deletions swanlab/data/run/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
from typing import Any, Dict, Optional, List, Tuple

from swanlab.core_python import timer
from swanlab.data.modules import DataWrapper, FloatConvertible, Line, Echarts, PyEchartsBase, PyEchartsTable
from swanlab.env import get_mode
from swanlab.formatter import check_key_format
Expand All @@ -18,7 +19,7 @@
from swanlab.toolkit import MediaType
from .config import SwanLabConfig
from .exp import SwanLabExp
from .helper import SwanLabRunOperator, RuntimeInfo, SwanLabRunState, MonitorCron
from .helper import SwanLabRunOperator, RuntimeInfo, SwanLabRunState, monitor_interval
from .metadata import get_requirements, get_conda, HardwareCollector
from .public import SwanLabPublicConfig
from ..store import get_run_store, reset_run_store
Expand Down Expand Up @@ -59,7 +60,7 @@ def __init__(
# 0. 下面的参数会在实验结束后进行副作用清理
self.__operator = operator
self.__state = SwanLabRunState.RUNNING
self.__monitor_cron: Optional[MonitorCron] = None
self.__monitor_timer: Optional[timer.Timer] = None
self.__config: Optional[SwanLabConfig] = None
# 1. 设置常规参数
self.__mode = get_mode()
Expand Down Expand Up @@ -122,15 +123,16 @@ def monitor_func():
section_type="SYSTEM",
)

self.__monitor_cron = MonitorCron(monitor_func)
self.__monitor_timer = timer.Timer(monitor_func, interval=monitor_interval, immediate=True).run()

def __cleanup(self, error: str = None, interrupt: bool = False):
"""
停止部分功能,内部清理时调用
"""
# 1. 停止硬件监控
if self.__monitor_cron is not None:
self.__monitor_cron.cancel()
if self.__monitor_timer is not None:
self.__monitor_timer.cancel()
self.__monitor_timer.join()
# 2. 更新状态
self.__state = SwanLabRunState.SUCCESS if error is None else SwanLabRunState.CRASHED
# 3. 触发回调
Expand Down
Loading