Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
build
pytest
pytest-mock
nanoid
omegaconf
ipython
Expand Down
6 changes: 5 additions & 1 deletion swanlab/core_python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@
2. http 客户端代码
"""

from .client import *
# FIXME 存在循环引用,我们需要更优雅的代码结构
# from . import auth
# from . import uploader
from .client import Client, create_client, reset_client, get_client, create_session
from .utils import timer
102 changes: 102 additions & 0 deletions swanlab/core_python/utils/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""
@author: cunyue
@file: timer.py
@time: 2025/12/30 22:04
@description: SwanLab 封装的定时器工具,用于定时、循环执行若干函数。

采用 Daemon 线程 + Event 机制,支持手动 cancel 和 join,保证任务完整性。
"""

import threading
from typing import Callable, Union, Optional

from swanlab.log import swanlog


class Timer:
def __init__(self, task: Callable, *, interval: Union[int, float, Callable[[int], float]], immediate: bool = False):
"""
初始化定时器

:param task: 需要定时执行的任务函数
:param interval: 执行间隔(秒)。可以是固定的数字,也可以是返回数字的函数(用于动态间隔),输入为task调用、执行次数
:param immediate 是否立即执行,默认否
"""
self._task = task
self._interval = interval
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._immediate = immediate

# 互斥锁,用于标记“任务正在运行中”
self._run_lock = threading.Lock()
# 标记调用次数
self._count = 0

def run(self) -> "Timer":
"""
启动定时器
"""
if self._thread is not None and self._thread.is_alive():
return swanlog.warning("Timer already running")

# 重置停止信号,允许重启
self._stop_event.clear()

# daemon=True 防止忘记 cancel 导致进程挂死
# 如果用户需要保证数据不丢失,应手动调用 cancel() + join()
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
return self

def cancel(self):
"""
发出停止信号。
注意:这不会强制中断正在执行的任务,而是等待当前任务执行完毕后不再进行下一次循环。
"""
self._stop_event.set()

def join(self, timeout=None):
"""
等待定时器线程结束。
通常在调用 cancel() 后调用此方法,以确保最后一次任务完整执行。
"""
if self._thread is not None and self._thread.is_alive():
self._thread.join(timeout)

def _loop(self):
"""
线程主循环
"""
# 第一次立即执行
if self._immediate and not self._stop_event.is_set():
self._safe_execute()

while not self._stop_event.is_set():
# wait 既起到 sleep 的作用,又能响应 set() 事件
# 如果在 sleep 期间调用了 cancel(),这里会立即唤醒并返回 True,从而跳出循环
self._stop_event.wait(self._sleep_time)

if not self._stop_event.is_set():
self._safe_execute()

def _safe_execute(self):
"""
执行任务,并使用锁保护
"""
with self._run_lock:
try:
self._task()
except Exception as e:
swanlog.error(f"Error executing task: {e}")
finally:
self._count += 1

@property
def _sleep_time(self) -> float:
"""
解析间隔时间,支持动态策略
"""
if callable(self._interval):
return self._interval(self._count)
return float(self._interval)
61 changes: 9 additions & 52 deletions swanlab/data/run/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
@Description:
回调函数操作员,批量处理回调函数的调用
"""
import threading
from enum import Enum
from typing import List, Union, Dict, Any, Tuple, Callable, Optional
from typing import List, Union, Dict, Any, Tuple

from swanlab.data.run.webhook import try_send_webhook
from swanlab.log import swanlog
from swanlab.swanlab_settings import get_settings
from swanlab.toolkit import SwanKitCallback, MetricInfo, ColumnInfo, RuntimeInfo

OperatorReturnType = Dict[str, Any]
Expand Down Expand Up @@ -135,54 +132,14 @@ class SwanLabRunState(Enum):
RUNNING = 0


class MonitorCron:
def monitor_interval(count: int) -> float:
"""
用于定时采集系统信息
根据传入的参数(本质是调用次数)选择间隔
:param count: (调用)次数
"""

def __init__(self, monitor_func: Callable):
self.count = 0 # 计数器,执行次数
self.monitor_interval = get_settings().hardware_interval # 用户设置的采集间隔

def _():
monitor_func()
self.count += 1
self.timer = threading.Timer(self.sleep_time, _)
self.timer.daemon = True
self.timer.start()

# 立即执行
self.timer = threading.Timer(0, _)
self.timer.daemon = True
self.timer.start()

def cancel(self):
if self.timer is not None:
self.timer.cancel()
self.timer.join()

@property
def sleep_time(self):
if self.monitor_interval is not None:
return self.monitor_interval
# 采集10次以下,每次间隔10秒
# 采集10次到50次,每次间隔30秒
# 采集50次以上,每次间隔60秒
if self.count < 10:
return 10
elif self.count < 50:
return 30
else:
return 60


def check_log_level(log_level: Optional[str]) -> str:
"""检查日志等级是否合法"""
valid = ["debug", "info", "warning", "error", "critical"]
if log_level is None:
return "info"
elif log_level.lower() in valid:
return log_level.lower()
if count < 10:
return 10
elif count < 50:
return 30
else:
swanlog.warning(f"The log level you provided is not valid, it has been set to {log_level}.")
return "info"
return 60
12 changes: 7 additions & 5 deletions swanlab/data/run/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import os
from typing import Any, Dict, Optional, List, Tuple

from swanlab.core_python import timer
from swanlab.data.modules import DataWrapper, FloatConvertible, Line, Echarts, PyEchartsBase, PyEchartsTable
from swanlab.env import get_mode
from swanlab.formatter import check_key_format
Expand All @@ -18,7 +19,7 @@
from swanlab.toolkit import MediaType
from .config import SwanLabConfig
from .exp import SwanLabExp
from .helper import SwanLabRunOperator, RuntimeInfo, SwanLabRunState, MonitorCron
from .helper import SwanLabRunOperator, RuntimeInfo, SwanLabRunState, monitor_interval
from .metadata import get_requirements, get_conda, HardwareCollector
from .public import SwanLabPublicConfig
from ..store import get_run_store, reset_run_store
Expand Down Expand Up @@ -59,7 +60,7 @@ def __init__(
# 0. 下面的参数会在实验结束后进行副作用清理
self.__operator = operator
self.__state = SwanLabRunState.RUNNING
self.__monitor_cron: Optional[MonitorCron] = None
self.__monitor_timer: Optional[timer.Timer] = None
self.__config: Optional[SwanLabConfig] = None
# 1. 设置常规参数
self.__mode = get_mode()
Expand Down Expand Up @@ -122,15 +123,16 @@ def monitor_func():
section_type="SYSTEM",
)

self.__monitor_cron = MonitorCron(monitor_func)
self.__monitor_timer = timer.Timer(monitor_func, interval=monitor_interval, immediate=True).run()

def __cleanup(self, error: str = None, interrupt: bool = False):
"""
停止部分功能,内部清理时调用
"""
# 1. 停止硬件监控
if self.__monitor_cron is not None:
self.__monitor_cron.cancel()
if self.__monitor_timer is not None:
self.__monitor_timer.cancel()
self.__monitor_timer.join()
# 2. 更新状态
self.__state = SwanLabRunState.SUCCESS if error is None else SwanLabRunState.CRASHED
# 3. 触发回调
Expand Down
Loading