diff --git a/docs/api/index.rst b/docs/api/index.rst index bde3dcc24..2ca4feec7 100644 --- a/docs/api/index.rst +++ b/docs/api/index.rst @@ -12,6 +12,7 @@ Core Abstractions lenskit.pipeline lenskit.diagnostics lenskit.operations + lenskit.training .. toctree:: :caption: Core @@ -21,6 +22,7 @@ Core Abstractions pipeline operations diagnostics + training Components and Models ~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/api/training.rst b/docs/api/training.rst new file mode 100644 index 000000000..40b22cf71 --- /dev/null +++ b/docs/api/training.rst @@ -0,0 +1,4 @@ +Model Training +============== + +.. automodule:: lenskit.training diff --git a/docs/guide/conventions.rst b/docs/guide/conventions.rst index fa02eedfa..48930f12a 100644 --- a/docs/guide/conventions.rst +++ b/docs/guide/conventions.rst @@ -24,19 +24,32 @@ Random Seeds .. _SPEC 7: https://scientific-python.org/specs/spec-0007/ -LensKit components follow `SPEC 7`_ for specifying random number seeds. -Components that use randomization (either at runtime, or to set initial -conditions for training) have a constructor parameter `rng` that takes either a -:class:`~numpy.random.Generator` or seed material. If you want reproducible -stochastic pipelines, configure the random seeds for your components. - -This convention is also followed for other LensKit code, such as the `data -splitting support <./splitting>`_. +LensKit components follow `SPEC 7`_ for specifying random number seeds. If you +want reproducible stochastic pipelines, configure the random seeds for your +components and/or training process. + +Components that use randomization at **inference time** take either seed +material or a :class:`~numpy.random.Generator` as an ``rng`` constructor +parameter; if seed material is supplied, that seed should be considered part of +the configuration (see the source code in :mod:`lenskit.basic.random` for +examples). + +Components that use randomization at **training time** (e.g. to shuffle data or +to initialize parameter values) should obtain their generator or seed from the +:attr:`~lenskit.training.TrainingOptions`. This makes it easy to configure a +seed for the training process without needing to configure each component. For +consistent configurability, it's best for components using other frameworks such +as PyTorch to use NumPy to initialize the parameter values and then convert the +initial values to the appropriate compute backend. + +Other LensKit code, such as the `data splitting support <./splitting>`_, follow +SPEC 7 directly by accepting an ``rng`` keyword parameter. .. important:: - If you specify random seeds, we strongly recommend specifying seeds instead of - generators, so that the seed can be included in serialized configurations. + If you specify random seeds for component configurations, we strongly + recommend specifying seeds instead of generators, so that the seed can be + included in serialized configurations. .. versionchanged:: 2025.1 @@ -46,19 +59,10 @@ splitting support <./splitting>`_. LensKit extends SPEC 7 with a global RNG that components can use as a fallback, to make it easier to configure system-wide generation for things like tests. -This is configured with :func:`~lenskit.random.set_global_rng`. - -When implementing a component that uses randomness in its training, we recommend -deferring conversion of the provided RNG into an actual generator until -model-training time, so that serializing an untrained pipeline or its -configuration includes the original seed instead of the resulting generator. -When using the RNG to create initial state for e.g. training a model with -PyTorch, it can be useful to create that state in NumPy and then convert to a -tensor, so that components are consistent in their random number generation -behavior instead of having variation between NumPy and other backends. -Components can use the :func:`~lenskit.random_generator` function to -convert seed material or a generator into a NumPy generator, falling back to the -global RNG if one is specified. +This is configured with :func:`~lenskit.random.set_global_rng`. Components can +use the :func:`~lenskit.random_generator` function to convert seed material or a +generator into a NumPy generator, falling back to the global RNG if one is +specified. Derived Seeds ------------- diff --git a/docs/guide/pipeline.rst b/docs/guide/pipeline.rst index 7ccf035dd..a9106a21e 100644 --- a/docs/guide/pipeline.rst +++ b/docs/guide/pipeline.rst @@ -342,8 +342,9 @@ a component that requires no training or configuration can simply be a Python function. Most components will extend the :class:`Component` base class to expose -configuration capabilities, and implement the :class:`Trainable` protocol if -they contain a model that needs to be trained. +configuration capabilities, and implement the +:class:`lenskit.training.Trainable` protocol if they contain a model that needs +to be trained. Components also must be pickleable, as LensKit uses pickling for shared memory parallelism in its batch-inference code. diff --git a/lenskit-funksvd/lenskit/funksvd.py b/lenskit-funksvd/lenskit/funksvd.py index cb42f016e..c587df868 100644 --- a/lenskit-funksvd/lenskit/funksvd.py +++ b/lenskit-funksvd/lenskit/funksvd.py @@ -20,8 +20,8 @@ from lenskit import util from lenskit.basic import BiasModel, Damping from lenskit.data import Dataset, ItemList, QueryInput, RecQuery, Vocabulary -from lenskit.pipeline import Component, Trainable -from lenskit.random import ConfiguredSeed, random_generator +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions _logger = logging.getLogger(__name__) @@ -53,10 +53,6 @@ class FunkSVDConfig(BaseModel): """ Min/max range of ratings to clamp output. """ - rng: ConfiguredSeed = None - """ - RNG seed. - """ @jitclass( @@ -257,18 +253,17 @@ class FunkSVDScorer(Trainable, Component[ItemList]): items_: Vocabulary item_features_: np.ndarray[tuple[int, int], np.dtype[np.float64]] - @property - def is_trained(self) -> bool: - return hasattr(self, "item_features_") - @override - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): """ Train a FunkSVD model. Args: ratings: the ratings data frame. """ + if hasattr(self, "item_features_") and not options.retrain: + return + timer = util.Stopwatch() rate_df = data.interaction_matrix(format="pandas", layout="coo", field="rating") @@ -278,7 +273,7 @@ def train(self, data: Dataset): _logger.info("[%s] preparing rating data for %d samples", timer, len(rate_df)) _logger.debug("shuffling rating data") shuf = np.arange(len(rate_df), dtype=np.int_) - rng = random_generator(self.config.rng) + rng = options.random_generator() rng.shuffle(shuf) rate_df = rate_df.iloc[shuf, :] diff --git a/lenskit-hpf/lenskit/hpf.py b/lenskit-hpf/lenskit/hpf.py index b84bb6809..6e90814b0 100644 --- a/lenskit-hpf/lenskit/hpf.py +++ b/lenskit-hpf/lenskit/hpf.py @@ -12,7 +12,8 @@ from typing_extensions import override from lenskit.data import Dataset, ItemList, QueryInput, RecQuery, Vocabulary -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions _logger = logging.getLogger(__name__) @@ -47,12 +48,11 @@ class HPFScorer(Component[ItemList], Trainable): items_: Vocabulary item_features_: np.ndarray[tuple[int, int], np.dtype[np.float64]] - @property - def is_trained(self) -> bool: - return hasattr(self, "item_features_") - @override - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): + if hasattr(self, "item_features_") and not options.retrain: + return + log = data.interaction_matrix("pandas", field="rating") log = log.rename( columns={ diff --git a/lenskit-hpf/tests/test_hpf.py b/lenskit-hpf/tests/test_hpf.py index 43af6dd68..32ca97804 100644 --- a/lenskit-hpf/tests/test_hpf.py +++ b/lenskit-hpf/tests/test_hpf.py @@ -16,6 +16,7 @@ from lenskit.metrics import quick_measure_model from lenskit.pipeline import topn_pipeline from lenskit.testing import BasicComponentTests, ScorerTests +from lenskit.training import TrainingOptions hpf = importorskip("lenskit.hpf") @@ -47,7 +48,7 @@ def test_hpf_train_large(tmp_path, ml_ratings): assert np.all(a2.item_features_ == algo.item_features_) pipe = topn_pipeline(algo) - pipe.train(ds, retrain=False) + pipe.train(ds, TrainingOptions(retrain=False)) for u in np.random.choice(ratings.user_id.unique(), size=50, replace=False): recs = pipe.run("recommender", query=u, n=50) @@ -76,7 +77,7 @@ def test_hpf_train_binary(tmp_path, ml_ratings): assert np.all(a2.item_features_ == algo.item_features_) pipe = topn_pipeline(algo) - pipe.train(ds, retrain=False) + pipe.train(ds, TrainingOptions(retrain=False)) for u in np.random.choice(ratings.user_id.unique(), size=50, replace=False): recs = pipe.run("recommender", query=u, n=50) diff --git a/lenskit-implicit/lenskit/implicit.py b/lenskit-implicit/lenskit/implicit.py index c661fded5..d3503ef5d 100644 --- a/lenskit-implicit/lenskit/implicit.py +++ b/lenskit-implicit/lenskit/implicit.py @@ -15,7 +15,8 @@ from typing_extensions import override from lenskit.data import Dataset, ItemList, QueryInput, RecQuery, Vocabulary -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions _logger = logging.getLogger(__name__) @@ -62,12 +63,11 @@ class BaseRec(Component[ItemList], Trainable): The item ID mapping from training. """ - @property - def is_trained(self): - return hasattr(self, "matrix_") - @override - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): + if hasattr(self, "delegate") and not options.retrain: + return + matrix = data.interaction_matrix("scipy", layout="csr", legacy=True) uir = matrix * self.weight @@ -100,8 +100,8 @@ def __call__(self, query: QueryInput, items: ItemList) -> ItemList: mask = inos >= 0 good_inos = inos[mask] - ifs = self.delegate.item_factors[good_inos] - uf = self.delegate.user_factors[user_num] + ifs = self.delegate.item_factors[good_inos] # type: ignore + uf = self.delegate.user_factors[user_num] # type: ignore # convert back if these are on CUDA if hasattr(ifs, "to_numpy"): diff --git a/lenskit-sklearn/lenskit/sklearn/svd.py b/lenskit-sklearn/lenskit/sklearn/svd.py index d59143997..0214353a5 100644 --- a/lenskit-sklearn/lenskit/sklearn/svd.py +++ b/lenskit-sklearn/lenskit/sklearn/svd.py @@ -16,7 +16,8 @@ from lenskit.basic import BiasModel, Damping from lenskit.data import Dataset, ItemList, QueryInput, RecQuery from lenskit.data.vocab import Vocabulary -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions from lenskit.util import Stopwatch try: @@ -61,12 +62,11 @@ class BiasedSVDScorer(Component[ItemList], Trainable): items_: Vocabulary user_components_: NDArray[np.float64] - @property - def is_trained(self): - return hasattr(self, "factorization_") - @override - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): + if hasattr(self, "factorization_") and not options.retrain: + return + timer = Stopwatch() _log.info("[%s] computing bias", timer) self.bias_ = BiasModel.learn(data, self.config.damping) @@ -90,7 +90,7 @@ def train(self, data: Dataset): self.config.features, algorithm=self.config.algorithm, n_iter=self.config.n_iter ) _log.info("[%s] training SVD (k=%d)", timer, self.factorization_.n_components) # type: ignore - Xt = self.factorization_.fit_transform(r_mat) + Xt = self.factorization_.fit_transform(r_mat) # type: ignore self.user_components_ = Xt self.users_ = data.users.copy() self.items_ = data.items.copy() diff --git a/lenskit/lenskit/als/_common.py b/lenskit/lenskit/als/_common.py index 17e834b77..954adf181 100644 --- a/lenskit/lenskit/als/_common.py +++ b/lenskit/lenskit/als/_common.py @@ -7,7 +7,7 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Any, Literal, TypeAlias +from typing import Literal, TypeAlias import numpy as np import structlog @@ -20,8 +20,8 @@ from lenskit.data.types import UIPair from lenskit.logging import item_progress from lenskit.parallel.config import ensure_parallel_init -from lenskit.pipeline import Component, Trainable -from lenskit.random import ConfiguredSeed, RNGInput, RNGLike, random_generator +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions EntityClass: TypeAlias = Literal["user", "item"] @@ -43,10 +43,6 @@ class ALSConfig(BaseModel): """ L2 regularization strength. """ - rng: ConfiguredSeed = None - """ - Random number seed. - """ save_user_features: bool = True """ Whether to retain user feature values after training. @@ -139,7 +135,6 @@ class ALSBase(ABC, Component[ItemList], Trainable): """ config: ALSConfig - rng: RNGLike | None = None users_: Vocabulary | None items_: Vocabulary @@ -148,30 +143,24 @@ class ALSBase(ABC, Component[ItemList], Trainable): logger: structlog.stdlib.BoundLogger - def __init__(self, config: ALSConfig | None = None, *, rng: RNGInput = None, **kwargs: Any): - # hadle non-configurable RNG - if isinstance(rng, (np.random.Generator, np.random.BitGenerator)): - self.rng = rng - elif rng is not None: - kwargs = kwargs | {"rng": rng} - super().__init__(config, **kwargs) - - @property - def is_trained(self) -> bool: - return hasattr(self, "item_features_") - @override - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()) -> bool: """ Run ALS to train a model. Args: ratings: the ratings data frame. + + Returns: + ``True`` if the model was trained. """ + if hasattr(self, "item_features_") and not options.retrain: + return False + ensure_parallel_init() timer = util.Stopwatch() - for algo in self.fit_iters(data): + for algo in self.fit_iters(data, options): pass # we just need to do the iterations if self.user_features_ is not None: @@ -190,7 +179,9 @@ def train(self, data: Dataset): features=self.config.features, ) - def fit_iters(self, data: Dataset) -> Iterator[Self]: + return True + + def fit_iters(self, data: Dataset, options: TrainingOptions) -> Iterator[Self]: """ Run ALS to train a model, yielding after each iteration. @@ -199,12 +190,13 @@ def fit_iters(self, data: Dataset) -> Iterator[Self]: """ log = self.logger = self.logger.bind(features=self.config.features) + rng = options.random_generator() train = self.prepare_data(data) self.users_ = train.users self.items_ = train.items - self.initialize_params(train) + self.initialize_params(train, rng) assert self.user_features_ is not None assert self.item_features_ is not None @@ -250,11 +242,10 @@ def prepare_data(self, data: Dataset) -> TrainingData: # pragma: no cover """ ... - def initialize_params(self, data: TrainingData): + def initialize_params(self, data: TrainingData, rng: np.random.Generator): """ Initialize the model parameters at the beginning of training. """ - rng = random_generator(self.rng or self.config.rng) self.logger.debug("initializing item matrix") self.item_features_ = self.initial_params(data.n_items, self.config.features, rng) self.logger.debug("|Q|: %f", torch.norm(self.item_features_, "fro")) diff --git a/lenskit/lenskit/als/_implicit.py b/lenskit/lenskit/als/_implicit.py index 3fd615fba..c183618f4 100644 --- a/lenskit/lenskit/als/_implicit.py +++ b/lenskit/lenskit/als/_implicit.py @@ -17,6 +17,7 @@ from lenskit.logging.progress import item_progress_handle, pbh_update from lenskit.math.solve import solve_cholesky from lenskit.parallel.chunking import WorkChunks +from lenskit.training import TrainingOptions from ._common import ALSBase, ALSConfig, TrainContext, TrainingData @@ -71,12 +72,12 @@ class ImplicitMFScorer(ALSBase): OtOr_: torch.Tensor @override - def train(self, data: Dataset): - super().train(data) - - # compute OtOr and save it on the model - reg = self.config.user_reg - self.OtOr_ = _implicit_otor(self.item_features_, reg) + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): + if super().train(data, options): + # compute OtOr and save it on the model + reg = self.config.user_reg + self.OtOr_ = _implicit_otor(self.item_features_, reg) + return True @override def prepare_data(self, data: Dataset) -> TrainingData: diff --git a/lenskit/lenskit/basic/bias.py b/lenskit/lenskit/basic/bias.py index 8af2ca1c0..01e923219 100644 --- a/lenskit/lenskit/basic/bias.py +++ b/lenskit/lenskit/basic/bias.py @@ -21,6 +21,7 @@ from lenskit.data import ID, Dataset, ItemList, QueryInput, RecQuery, Vocabulary from lenskit.pipeline.components import Component +from lenskit.training import Trainable, TrainingOptions _logger = logging.getLogger(__name__) BiasEntity: TypeAlias = Literal["user", "item"] @@ -272,7 +273,7 @@ def entity_damping(self, entity: Literal["user", "item"]) -> float: return entity_damping(self.damping, entity) -class BiasScorer(Component[ItemList]): +class BiasScorer(Component[ItemList], Trainable): """ A user-item bias rating prediction model. This component uses :class:`BiasModel` to predict ratings for users and items. @@ -288,11 +289,7 @@ class BiasScorer(Component[ItemList]): config: BiasConfig model_: BiasModel - @property - def is_trained(self) -> bool: - return hasattr(self, "model_") - - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): """ Train the bias model on some rating data. @@ -303,6 +300,9 @@ def train(self, data: Dataset): Returns: The trained bias object. """ + if hasattr(self, "model_") and not options.retrain: + return + self.model_ = BiasModel.learn(data, self.config.damping, entities=self.config.entities) def __call__(self, query: QueryInput, items: ItemList) -> ItemList: diff --git a/lenskit/lenskit/basic/candidates.py b/lenskit/lenskit/basic/candidates.py index 0359e288e..dbdcb2d59 100644 --- a/lenskit/lenskit/basic/candidates.py +++ b/lenskit/lenskit/basic/candidates.py @@ -6,7 +6,8 @@ from typing_extensions import override from lenskit.data import Dataset, ItemList, QueryInput, RecQuery, Vocabulary -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions _logger = logging.getLogger(__name__) @@ -21,17 +22,16 @@ class TrainingCandidateSelectorBase(Component[ItemList], Trainable): config: None items_: Vocabulary - - @property - def is_trained(self) -> bool: - return hasattr(self, "items_") + """ + List of known items from the training data. + """ @override - def train(self, data: Dataset): - self.items_ = data.items.copy() + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): + if hasattr(self, "items_") and not options.retrain: + return - def __str__(self): - return self.__class__.__name__ + self.items_ = data.items.copy() class AllTrainingItemsCandidateSelector(TrainingCandidateSelectorBase): diff --git a/lenskit/lenskit/basic/history.py b/lenskit/lenskit/basic/history.py index 9f4408070..962e190d7 100644 --- a/lenskit/lenskit/basic/history.py +++ b/lenskit/lenskit/basic/history.py @@ -15,7 +15,8 @@ from lenskit.data import Dataset, ItemList, QueryInput, RecQuery from lenskit.data.matrix import CSRStructure from lenskit.data.vocab import Vocabulary -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions _logger = logging.getLogger(__name__) @@ -30,13 +31,12 @@ class UserTrainingHistoryLookup(Component[ItemList], Trainable): training_data_: Dataset - @property - def is_trained(self) -> bool: - return hasattr(self, "training_data_") - @override - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): # TODO: find a better data structure for this + if hasattr(self, "training_data_") and not options.retrain: + return + self.training_data_ = data def __call__(self, query: QueryInput) -> RecQuery: @@ -53,9 +53,6 @@ def __call__(self, query: QueryInput) -> RecQuery: return query - def __str__(self): - return self.__class__.__name__ - class KnownRatingScorer(Component[ItemList], Trainable): """ @@ -90,12 +87,11 @@ def __init__( self.score = score self.source = source - @property - def is_trained(self) -> bool: - return hasattr(self, "matrix_") - @override - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): + if hasattr(self, "matrix_") and not options.retrain: + return + if self.source == "query": return @@ -125,6 +121,7 @@ def __call__(self, query: QueryInput, items: ItemList) -> ItemList: assert self.score != "indicator" # get the user's row as a sparse array uarr = self.matrix_[[urow]] + assert isinstance(uarr, csr_array) # create a series scores = pd.Series(uarr.data, index=self.items_.ids(uarr.indices)) elif isinstance(self.matrix_, CSRStructure): @@ -136,7 +133,4 @@ def __call__(self, query: QueryInput, items: ItemList) -> ItemList: scores = scores.reindex( items.ids(), fill_value=0.0 if self.score == "indicator" else np.nan ) - return ItemList(items, scores=scores.values) - - def __str__(self): - return self.__class__.__name__ + return ItemList(items, scores=scores.values) # type: ignore diff --git a/lenskit/lenskit/basic/popularity.py b/lenskit/lenskit/basic/popularity.py index 229b4f5c0..ecb3279d7 100644 --- a/lenskit/lenskit/basic/popularity.py +++ b/lenskit/lenskit/basic/popularity.py @@ -10,7 +10,8 @@ from typing_extensions import override from lenskit.data import Dataset, ItemList, Vocabulary -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions _log = logging.getLogger(__name__) @@ -45,12 +46,11 @@ class PopScorer(Component[ItemList], Trainable): items_: Vocabulary item_scores_: np.ndarray[int, np.dtype[np.float32]] - @property - def is_trained(self) -> bool: - return hasattr(self, "item_scores_") - @override - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): + if hasattr(self, "item_scores_") and not options.retrain: + return + _log.info("counting item popularity") self.items_ = data.items.copy() stats = data.item_stats() @@ -106,15 +106,17 @@ class TimeBoundedPopScore(PopScorer): config: TimeBoundedPopConfig @override - def train(self, data: Dataset, **kwargs): - _log.info("counting time-bounded item popularity") + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): + if hasattr(self, "item_scores_") and not options.retrain: + return + _log.info("counting time-bounded item popularity") log = data.interaction_log("numpy") item_scores = None if log.timestamps is None: _log.warning("no timestamps in interaction log; falling back to PopScorer") - super().train(data, **kwargs) + super().train(data, options) return else: counts = np.zeros(data.item_count, dtype=np.int32) @@ -124,7 +126,6 @@ def train(self, data: Dataset, **kwargs): item_scores = super()._train_internal( pd.Series(counts, index=data.items.index), - **kwargs, ) self.items_ = data.items.copy() diff --git a/lenskit/lenskit/knn/item.py b/lenskit/lenskit/knn/item.py index 4616c2b60..4174bcc77 100644 --- a/lenskit/lenskit/knn/item.py +++ b/lenskit/lenskit/knn/item.py @@ -26,7 +26,8 @@ from lenskit.logging.progress import item_progress_handle, pbh_update from lenskit.math.sparse import normalize_sparse_rows, safe_spmv from lenskit.parallel import ensure_parallel_init -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions from lenskit.util.torch import inference_mode _log = get_logger(__name__) @@ -111,13 +112,9 @@ class ItemKNNScorer(Component[ItemList], Trainable): users_: Vocabulary "Vocabulary of user IDs." - @property - def is_trained(self) -> bool: - return hasattr(self, "items_") - @override @inference_mode - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): """ Train a model. @@ -128,6 +125,9 @@ def train(self, data: Dataset): ratings: (user,item,rating) data for computing item similarities. """ + if hasattr(self, "items_") and not options.retrain: + return + ensure_parallel_init() log = _log.bind(n_items=data.item_count, feedback=self.config.feedback) # Training proceeds in 2 steps: diff --git a/lenskit/lenskit/knn/user.py b/lenskit/lenskit/knn/user.py index 5f344c671..6bf053e49 100644 --- a/lenskit/lenskit/knn/user.py +++ b/lenskit/lenskit/knn/user.py @@ -19,7 +19,7 @@ import torch from pydantic import BaseModel, PositiveFloat, PositiveInt, field_validator from scipy.sparse import csc_array -from typing_extensions import NamedTuple, Optional, Self, override +from typing_extensions import NamedTuple, Optional, override from lenskit import util from lenskit.data import Dataset, FeedbackType, ItemList, QueryInput, RecQuery @@ -28,7 +28,8 @@ from lenskit.logging import get_logger from lenskit.math.sparse import normalize_sparse_rows, safe_spmv, torch_sparse_to_scipy from lenskit.parallel.config import ensure_parallel_init -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions _log = get_logger(__name__) @@ -98,12 +99,8 @@ class UserKNNScorer(Component[ItemList], Trainable): user_ratings_: csc_array "Centered but un-normalized rating matrix (COO) to find neighbor ratings." - @property - def is_trained(self) -> bool: - return hasattr(self, "users_") - @override - def train(self, data: Dataset) -> Self: + def train(self, data: Dataset, options: TrainingOptions = TrainingOptions()): """ "Train" a user-user CF model. This memorizes the rating data in a format that is usable for future computations. @@ -111,6 +108,9 @@ def train(self, data: Dataset) -> Self: Args: ratings(pandas.DataFrame): (user, item, rating) data for collaborative filtering. """ + if hasattr(self, "user_ratings_") and not options.retrain: + return + ensure_parallel_init() rmat = data.interaction_matrix("torch", field="rating" if self.config.explicit else None) assert rmat.is_sparse_csr @@ -134,8 +134,6 @@ def train(self, data: Dataset) -> Self: self.user_means_ = means self.items_ = data.items.copy() - return self - @override def __call__(self, query: QueryInput, items: ItemList) -> ItemList: """ diff --git a/lenskit/lenskit/pipeline/__init__.py b/lenskit/lenskit/pipeline/__init__.py index af39710f3..6902a95a7 100644 --- a/lenskit/lenskit/pipeline/__init__.py +++ b/lenskit/lenskit/pipeline/__init__.py @@ -17,7 +17,6 @@ from .components import ( Component, PipelineFunction, - Trainable, ) from .config import PipelineConfig from .nodes import Node @@ -32,7 +31,6 @@ "PipelineState", "Node", "PipelineFunction", - "Trainable", "PipelineConfig", "Lazy", "Component", diff --git a/lenskit/lenskit/pipeline/_impl.py b/lenskit/lenskit/pipeline/_impl.py index 360366308..f2ec38a4f 100644 --- a/lenskit/lenskit/pipeline/_impl.py +++ b/lenskit/lenskit/pipeline/_impl.py @@ -3,20 +3,22 @@ import typing import warnings +from dataclasses import replace from types import FunctionType, UnionType from uuid import NAMESPACE_URL, uuid4, uuid5 +from numpy.random import BitGenerator, Generator, SeedSequence from typing_extensions import Any, Literal, Self, TypeAlias, TypeVar, cast, overload from lenskit.data import Dataset from lenskit.diagnostics import PipelineError, PipelineWarning from lenskit.logging import get_logger +from lenskit.training import Trainable, TrainingOptions from . import config from .components import ( # type: ignore # noqa: F401 Component, PipelineFunction, - Trainable, fallback_on_none, instantiate_component, ) @@ -49,6 +51,9 @@ class Pipeline: If you have a scoring model and just want to generate recommenations with a default setup and minimal configuration, see :func:`topn_pipeline`. + Pipelines are also :class:`~lenskit.training.Trainable`, and train all + trainable components. + Args: name: A name for the pipeline. @@ -555,27 +560,47 @@ def from_config(cls, config: object) -> Self: return pipe - def train(self, data: Dataset, *, retrain: bool = True) -> None: + def train(self, data: Dataset, options: TrainingOptions | None = None) -> None: """ Trains the pipeline's trainable components (those implementing the :class:`TrainableComponent` interface) on some training data. + .. admonition:: Random Number Generation + :class: note + + If :attr:`TrainingOptions.rng` is set and is not a generator or bit + generator (i.e. it is a seed), then this method wraps the seed in a + :class:`~numpy.random.SeedSequence` and calls + :class:`~numpy.random.SeedSequence.spawn()` to generate a distinct + seed for each component in the pipeline. + Args: data: The dataset to train on. - retrain: - Whether to re-train components that have already been trained. - """ - for comp in self._components.values(): - if hasattr(comp, "train"): - comp = cast(Trainable, comp) - if comp.is_trained and not retrain: - _log.debug("component %s already trained", comp) - continue - _log.info("training %s", comp) - comp.train(data) + options: + The training options. If ``None``, default options are used. + """ + log = _log.bind(pipeline=self.name) + if options is None: + options = TrainingOptions() + + if isinstance(options.rng, SeedSequence): + seed = options.rng + elif options.rng is None or isinstance(options.rng, (Generator, BitGenerator)): + seed = None + else: + seed = SeedSequence(options.rng) + + log.info("training pipeline components") + for name, comp in self._components.items(): + clog = log.bind(name=name, component=comp) + if isinstance(comp, Trainable): + # spawn new seed if needed + c_opts = options if seed is None else replace(options, rng=seed.spawn(1)[0]) + clog.info("training component") + comp.train(data, c_opts) else: - _log.debug("component %s does not need training", comp) + clog.debug("training not required") @overload def run(self, /, **kwargs: object) -> object: ... diff --git a/lenskit/lenskit/pipeline/components.py b/lenskit/lenskit/pipeline/components.py index 8d5d3bdb7..5983cff58 100644 --- a/lenskit/lenskit/pipeline/components.py +++ b/lenskit/lenskit/pipeline/components.py @@ -30,8 +30,6 @@ from pydantic import JsonValue, TypeAdapter -from lenskit.data.dataset import Dataset - from .types import Lazy P = ParamSpec("P") @@ -41,49 +39,6 @@ PipelineFunction: TypeAlias = Callable[..., COut] -@runtime_checkable -class Trainable(Protocol): # pragma: nocover - """ - Interface for components that can learn parameters from training data. It - supports trainingand checking if a component has already been trained. - Trained components need to be picklable. - - .. note:: - - Trainable components must also implement ``__call__``. - - .. note:: - - A future LensKit version will add support for extracting model - parameters a la Pytorch's ``state_dict``, but this capability was not - ready for 2025.1. - - Stability: - Full - """ - - @property - def is_trained(self) -> bool: - """ - Check if this model has already been trained. - """ - raise NotImplementedError() - - def train(self, data: Dataset) -> None: - """ - Train the pipeline component to learn its parameters from a training - dataset. - - Args: - data: - The training dataset. - retrain: - If ``True``, retrain the model even if it has already been - trained. - """ - raise NotImplementedError() - - @runtime_checkable class ParameterContainer(Protocol): # pragma: nocover """ diff --git a/lenskit/lenskit/testing/_components.py b/lenskit/lenskit/testing/_components.py index d1e411ef0..546c2535f 100644 --- a/lenskit/lenskit/testing/_components.py +++ b/lenskit/lenskit/testing/_components.py @@ -1,6 +1,7 @@ import inspect import os import pickle +from time import perf_counter from typing import ClassVar, Literal import numpy as np @@ -8,7 +9,8 @@ from pytest import approx, fixture, skip from lenskit.data import Dataset, ItemList, MatrixDataset, RecQuery -from lenskit.pipeline import Component, Trainable +from lenskit.pipeline import Component +from lenskit.training import Trainable, TrainingOptions from ._markers import jit_enabled @@ -78,13 +80,27 @@ def trained_model(self, ml_ds: Dataset): model = self.component() if isinstance(model, Trainable): - model.train(ml_ds) + model.train(ml_ds, TrainingOptions()) yield model - def test_basic_trained(self, ml_ds: Dataset, trained_model: Component): - assert isinstance(trained_model, self.component) - if isinstance(trained_model, Trainable): - assert trained_model.is_trained + def test_skip_retrain(self, ml_ds: Dataset): + self.maybe_skip_nojit() + model = self.component() + if not isinstance(model, Trainable): + skip(f"component {model.__class__.__name__} is not trainable") + + model.train(ml_ds, TrainingOptions()) + v1_data = pickle.dumps(model) + + # train again + t1 = perf_counter() + model.train(ml_ds, TrainingOptions(retrain=False)) + t2 = perf_counter() + # that should be very fast, let's say 10ms + assert t2 - t1 < 0.01 + # the model shouldn't have changed + v2_data = pickle.dumps(model) + assert v2_data == v1_data class ScorerTests(TrainingTests): @@ -239,7 +255,7 @@ def test_train_score_items_missing_data(self, rng: np.random.Generator, ml_ds: D model = self.component() assert isinstance(model, Trainable) - model.train(ds) + model.train(ds, TrainingOptions()) good_u = rng.choice(ml_ds.users.ids(), 10, replace=False) for u in set(good_u) | set(drop_u): diff --git a/lenskit/lenskit/training.py b/lenskit/lenskit/training.py new file mode 100644 index 000000000..717b0ad6b --- /dev/null +++ b/lenskit/lenskit/training.py @@ -0,0 +1,96 @@ +# This file is part of LensKit. +# Copyright (C) 2018-2023 Boise State University +# Copyright (C) 2023-2024 Drexel University +# Licensed under the MIT license, see LICENSE.md for details. +# SPDX-License-Identifier: MIT +""" +Interfaces and support for model training. +""" + +# pyright: strict +from __future__ import annotations + +from dataclasses import dataclass +from typing import ( + Protocol, + runtime_checkable, +) + +import numpy as np + +from lenskit.data.dataset import Dataset +from lenskit.random import RNGInput, random_generator + + +@dataclass(frozen=True) +class TrainingOptions: + """ + Options and context settings that govern model training. + """ + + retrain: bool = True + """ + Whether the model should retrain if it is already trained. If ``False``, + the model should cleanly skip training if it is already trained. + """ + + device: str | None = None + """ + The device on which to train (e.g. ``'cuda'``). May be ignored if the model + does not support the specified device. + """ + + rng: RNGInput = None + """ + Random number generator to use for any randomness in the training process. + This option contains any `SPEC 7`_-compatible random number generator + specification; the :func:`~lenskit.random.random_generator` will convert + that into a NumPy :class:`~numpy.random.Generator`. + """ + + def random_generator(self) -> np.random.Generator: + """ + Obtain a random generator from the configured RNG or seed. + + .. note:: + + Each call to this method will return a fresh generator from the same + seed. Components should call it once at the beginning of their + training procesess. + """ + return random_generator(self.rng) + + +@runtime_checkable +class Trainable(Protocol): # pragma: nocover + """ + Interface for components and objects that can learn parameters from training + data. It supports training and checking if a component has already been + trained. The resulting model should be pickleable. Trainable objects are + usually also components. + + .. note:: + + Trainable components must also implement ``__call__``. + + .. note:: + + A future LensKit version will add support for extracting model + parameters a la Pytorch's ``state_dict``, but this capability was not + ready for 2025.1. + + Stability: + Full + """ + + def train(self, data: Dataset, options: TrainingOptions) -> None: + """ + Train the model to learn its parameters from a training dataset. + + Args: + data: + The training dataset. + options: + The training options. + """ + raise NotImplementedError() diff --git a/lenskit/tests/pipeline/test_train.py b/lenskit/tests/pipeline/test_train.py index d45be476c..07cb304a3 100644 --- a/lenskit/tests/pipeline/test_train.py +++ b/lenskit/tests/pipeline/test_train.py @@ -9,7 +9,7 @@ from lenskit.data.dataset import Dataset from lenskit.data.vocab import Vocabulary from lenskit.pipeline import Pipeline -from lenskit.pipeline.components import Trainable +from lenskit.training import Trainable, TrainingOptions def test_train(ml_ds: Dataset): @@ -33,11 +33,7 @@ class TestComponent: def __call__(self, *, item: int) -> bool: return self.items.number(item, "none") is not None - @property - def is_trained(self) -> bool: - return hasattr(self, "items") - - def train(self, data: Dataset): + def train(self, data: Dataset, options: TrainingOptions): # we just memorize the items self.items = data.items