C++-parallelized implementation of SLIM. (#29)
* The first version of the C++ SLIM

* L2-only version.

* It basically works.

* Optimized performance

* Added a doc

* Fixed the doc

* Added a test for the C++ SLIM
tohtsky committed Jan 11, 2021
1 parent ce08bae commit 256d8f1
Showing 10 changed files with 330 additions and 97 deletions.
10 changes: 5 additions & 5 deletions CMakeLists.txt
@@ -12,8 +12,8 @@ set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
include(CPack)

add_subdirectory(pybind11)
pybind11_add_module(irspack.recommenders._ials cpp_source/als/wrapper.cpp)
pybind11_add_module(irspack.recommenders._knn cpp_source/knn/wrapper.cpp)
pybind11_add_module(irspack.utils._util_cpp cpp_source/util.cpp)
pybind11_add_module(irspack._evapuator cpp_source/evaluator.cpp)
pybind11_add_module(irspack._rwr cpp_source/rws.cpp)
#pybind11_add_module(irspack.recommenders._ials cpp_source/als/wrapper.cpp)
#pybind11_add_module(irspack.recommenders._knn cpp_source/knn/wrapper.cpp)
pybind11_add_module(_util_cpp cpp_source/util.cpp)
#pybind11_add_module(irspack._evapuator cpp_source/evaluator.cpp)
#pybind11_add_module(irspack._rwr cpp_source/rws.cpp)
8 changes: 8 additions & 0 deletions cpp_source/util.cpp
@@ -23,4 +23,12 @@ PYBIND11_MODULE(_util_cpp, m) {
py::arg("X"), py::arg("k1") = 1.2, py::arg("b") = 0.75);
m.def("tf_idf_weight", &sparse_util::tf_idf_weight<double>, py::arg("X"),
py::arg("smooth") = true);

m.def("slim_weight_allow_negative", &sparse_util::SLIM<float, false>,
py::arg("X"), py::arg("n_threads"), py::arg("n_iter"),
py::arg("l2_coeff"), py::arg("l1_coeff"));

m.def("slim_weight_positive_only", &sparse_util::SLIM<float, true>,
py::arg("X"), py::arg("n_threads"), py::arg("n_iter"),
py::arg("l2_coeff"), py::arg("l1_coeff"));
}
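The two bindings above expose the same C++ routine with the non-negativity constraint toggled at compile time. A minimal sketch of calling them directly (in normal use they are reached through SLIMRecommender below; the toy matrix here is made up, and the extension module must have been built first):

    import numpy as np
    import scipy.sparse as sps

    from irspack.utils._util_cpp import slim_weight_positive_only

    # Toy float32 interaction matrix; the binding takes CSR input.
    X = sps.random(100, 50, density=0.1, format="csr", dtype=np.float32, random_state=0)

    # Returns the learned item-item weight matrix as a sparse matrix.
    W = slim_weight_positive_only(X, n_threads=2, n_iter=10, l2_coeff=1.0, l1_coeff=0.1)
    print(W.shape)  # (50, 50)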
166 changes: 166 additions & 0 deletions cpp_source/util.hpp
@@ -1,12 +1,17 @@
#pragma once
#include <Eigen/Core>
#include <Eigen/Sparse>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <future>
#include <iostream>
#include <random>
#include <stdexcept>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <vector>

#include "argcheck.hpp"

@@ -16,9 +21,20 @@ namespace sparse_util {
template <typename Real>
using CSRMatrix = Eigen::SparseMatrix<Real, Eigen::RowMajor>;

template <typename Real>
using RowMajorMatrix =
Eigen::Matrix<Real, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>;

template <typename Real>
using ColMajorMatrix =
Eigen::Matrix<Real, Eigen::Dynamic, Eigen::Dynamic, Eigen::ColMajor>;

template <typename Real>
using DenseVector = Eigen::Matrix<Real, Eigen::Dynamic, 1>;

template <typename Real>
using DenseColVector = Eigen::Matrix<Real, 1, Eigen::Dynamic, Eigen::RowMajor>;

template <typename Real>
using CSCMatrix = Eigen::SparseMatrix<Real, Eigen::ColMajor>;

@@ -170,5 +186,155 @@ CSRMatrix<Real> remove_diagonal(const CSRMatrix<Real> &X) {
return result;
}

template <typename Real, bool positive_only = false,
int block_size = Eigen::internal::packet_traits<Real>::size>
inline CSCMatrix<Real> SLIM(const CSRMatrix<Real> &X, size_t n_threads,
size_t n_iter, Real l2_coeff, Real l1_coeff) {
check_arg(n_threads > 0, "n_threads must be > 0.");
check_arg(n_iter > 0, "n_iter must be > 0.");
check_arg(l2_coeff >= 0, "l2_coeff must be >= 0.");
check_arg(l1_coeff >= 0, "l1_coeff must be >= 0.");
using MatrixType =
Eigen::Matrix<Real, Eigen::Dynamic, block_size, Eigen::RowMajor>;
using VectorType = Eigen::Matrix<Real, block_size, 1>;

CSCMatrix<Real> X_csc(X);
X_csc.makeCompressed();
using TripletType = Eigen::Triplet<Real>;
using CSCIter = typename CSCMatrix<Real>::InnerIterator;
std::vector<std::future<std::vector<TripletType>>> workers;
std::atomic<int64_t> cursor(0);
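// Workers claim blocks of `block_size` target columns by atomically
// advancing `cursor`; each block is solved independently by cyclic
// coordinate descent and returned as triplets of nonzero coefficients.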
for (size_t th = 0; th < n_threads; th++) {
workers.emplace_back(std::async(std::launch::async, [th, &cursor, &X_csc,
l2_coeff, l1_coeff,
n_iter] {
const int64_t F = X_csc.cols();
MatrixType remnants(X_csc.rows(), block_size);
MatrixType coeffs(F, block_size);
VectorType linear(block_size);
VectorType linear_plus(block_size);
VectorType linear_minus(block_size);

std::vector<TripletType> local_results;
while (true) {
int64_t current_cursor = cursor.fetch_add(block_size);
if (current_cursor >= F) {
break;
}
int64_t block_begin = current_cursor;
int64_t block_end = std::min(block_begin + block_size, F);
int64_t valid_block_size = block_end - block_begin;
remnants.array() = 0;
coeffs.array() = 0;

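// Initialize each residual column: with coeffs = 0, the residual
// X * coeffs - X[:, target] equals -X[:, target].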
for (int64_t f_cursor = block_begin; f_cursor < block_end; f_cursor++) {
const int64_t internal_col_position = f_cursor - block_begin;
for (CSCIter iter(X_csc, f_cursor); iter; ++iter) {
remnants(iter.row(), internal_col_position) = -iter.value();
}
}

for (size_t cd_iteration = 0; cd_iteration < n_iter; cd_iteration++) {
for (int64_t feature_index = 0; feature_index < F; feature_index++) {
linear.array() = static_cast<Real>(0.0);
Real x2_sum = static_cast<Real>(0.0);
for (CSCIter nnz_iter(X_csc, feature_index); nnz_iter; ++nnz_iter) {
Real x = nnz_iter.value();

const int64_t row = nnz_iter.row();
x2_sum += x * x;
/*
  loss = \sum_u ( remnant_u - X_{uf} w^{old}_f + X_{uf} w^{new}_f )^2
       = CONST
         + ( \sum_u X_{uf}^2 ) (w^{new}_f)^2
         + 2 w^{new}_f \sum_u X_{uf} ( remnant_u - X_{uf} w^{old}_f ),
  so the linear coefficient is
  LINEAR_COEFF = \sum_u X_{uf} remnant_u - ( \sum_u X_{uf}^2 ) w^{old}_f,
  accumulated below on the residual with w^{old}_f's contribution removed.
*/
remnants.row(row).noalias() -= x * coeffs.row(feature_index);
linear.noalias() += x * remnants.row(row);
}

Real quadratic = x2_sum + l2_coeff;
linear_plus.array() = (-linear.array() - l1_coeff) / quadratic;
linear_minus.array() = (-linear.array() + l1_coeff) / quadratic;

Real *ptr_location = coeffs.data() + feature_index * block_size;
Real *lp_ptr = linear_plus.data();
Real *lm_ptr = linear_minus.data();

for (int64_t inner_cursor_position = 0;
inner_cursor_position < block_size; inner_cursor_position++) {
Real lplus = *(lp_ptr++);
Real lminus = *(lm_ptr++);
int64_t original_cursor_position =
inner_cursor_position + block_begin;
if (original_cursor_position == feature_index) {
*(ptr_location++) = 0.0;
continue;
}
if (positive_only) {
if (lplus > 0) {
*(ptr_location++) = lplus;
} else {
*(ptr_location++) = static_cast<Real>(0.0);
}

} else {
if (lplus > 0) {
*(ptr_location++) = lplus;
} else {
if (lminus < 0) {
*(ptr_location++) = lminus;
} else {
*(ptr_location++) = static_cast<Real>(0.0);
}
}
} // end allow-negative branch
}

for (CSCIter nnz_iter(X_csc, feature_index); nnz_iter; ++nnz_iter) {
Real x = nnz_iter.value();
const int64_t row = nnz_iter.row();
remnants.row(row).noalias() += x * coeffs.row(feature_index);
}
}
}

for (int64_t f = 0; f < F; f++) {
for (int64_t inner_cursor_position = 0;
inner_cursor_position < valid_block_size;
inner_cursor_position++) {
int64_t original_location = inner_cursor_position + block_begin;
Real c = coeffs(f, inner_cursor_position);
if (c != 0.0) {
local_results.emplace_back(f, original_location, c);
}
}
}
}
return local_results;
}));
}
std::vector<TripletType> nnzs;
for (auto &fres : workers) {
auto result = fres.get();
for (const auto &w : result) {
nnzs.emplace_back(w);
}
}

CSCMatrix<Real> result(X.cols(), X.cols());
result.setFromTriplets(nnzs.begin(), nnzs.end());
result.makeCompressed();
return result;
}

} // namespace sparse_util
} // namespace irspack
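For reference, the per-coordinate update above amounts to soft thresholding. A NumPy sketch for a single target column (dense X for readability; `slim_column` is a hypothetical helper whose variable names mirror the C++, not the bound implementation):

    import numpy as np

    def slim_column(X: np.ndarray, target: int, n_iter: int,
                    l2_coeff: float, l1_coeff: float,
                    positive_only: bool) -> np.ndarray:
        n_users, n_items = X.shape
        w = np.zeros(n_items)
        # remnant = X @ w - X[:, target], kept up to date across updates.
        remnant = -X[:, target].astype(np.float64)
        for _ in range(n_iter):
            for f in range(n_items):
                x = X[:, f]
                # Remove feature f's current contribution from the residual.
                remnant -= x * w[f]
                linear = x @ remnant
                quadratic = x @ x + l2_coeff
                w_plus = (-linear - l1_coeff) / quadratic
                w_minus = (-linear + l1_coeff) / quadratic
                if f == target:
                    w[f] = 0.0  # exclude the trivial self-similarity solution
                elif w_plus > 0:
                    w[f] = w_plus
                elif (not positive_only) and w_minus < 0:
                    w[f] = w_minus
                else:
                    w[f] = 0.0
                remnant += x * w[f]
        return w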
2 changes: 2 additions & 0 deletions docs/source/api_reference.rst
@@ -35,6 +35,7 @@ Recommenders
AsymmetricCosineKNNRecommender
JaccardKNNRecommender
TverskyIndexKNNRecommender
SLIMRecommender

A LightFM wrapper for BPR matrix factorization (requires a separate installation of `lightFM <https://github.com/lyst/lightfm>`_).

@@ -73,6 +74,7 @@ Optimizers
AsymmetricCosineKNNOptimizer
JaccardKNNOptimizer
TverskyIndexKNNOptimizer
SLIMOptimizer
MultVAEOptimizer

.. currentmodule:: irspack.split
6 changes: 2 additions & 4 deletions examples/movielens/movielens_20m_cold.py
@@ -88,7 +88,7 @@
dim_z=200, enc_hidden_dims=600, kl_anneal_goal=0.2
), # nothing to tune, use the parameters used in the paper.
),
# (SLIMOptimizer, 40),
# (SLIMOptimizer, 40, dict()), # Note: this is a heavy one.
]
for optimizer_class, n_trials, config in test_configs:
recommender_name = optimizer_class.recommender_class.__name__
@@ -98,9 +98,7 @@
metric="ndcg",
fixed_params=config,
)
(best_param, validation_result_df) = optimizer.optimize(
n_trials=n_trials, timeout=14400
)
(best_param, validation_result_df) = optimizer.optimize(n_trials=n_trials)
validation_result_df["recommender_name"] = recommender_name
validation_results.append(validation_result_df)
pd.concat(validation_results).to_csv(f"validation_scores.csv")
4 changes: 2 additions & 2 deletions irspack/optimizers/_optimizers.py
@@ -185,8 +185,8 @@ class RandomWalkWithRestartOptimizer(BaseOptimizer):

class SLIMOptimizer(BaseOptimizer):
default_tune_range = [
UniformSuggestion("alpha", 0, 1),
LogUniformSuggestion("l1_ratio", 1e-6, 1),
LogUniformSuggestion("alpha", 1e-5, 1),
UniformSuggestion("l1_ratio", 0, 1),
]
recommender_class = SLIMRecommender
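With this change, alpha is searched log-uniformly over [1e-5, 1] and l1_ratio uniformly over [0, 1]. A rough sketch of the equivalent raw optuna suggestions (illustrative only; irspack wraps this via its Suggestion classes, and the objective body is a placeholder):

    import optuna

    def objective(trial: optuna.Trial) -> float:
        alpha = trial.suggest_loguniform("alpha", 1e-5, 1.0)
        l1_ratio = trial.suggest_uniform("l1_ratio", 0.0, 1.0)
        # ... fit SLIMRecommender(X, alpha=alpha, l1_ratio=l1_ratio)
        # and return a validation metric (e.g., ndcg) here.
        return 0.0  # placeholder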

3 changes: 2 additions & 1 deletion irspack/recommenders/ials.py
@@ -99,7 +99,8 @@ class IALSRecommender(
Frequency of validation score measurement (if any). Defaults to 5.
score_degradation_max (int, optional):
Maximal number of allowed score degradation. Defaults to 5.
n_threads (Optional[int], optional): Specifies the number of threads to use for the computation.
n_threads (Optional[int], optional):
Specifies the number of threads to use for the computation.
If ``None``, the environment variable ``"IRSPACK_NUM_THREADS_DEFAULT"`` will be looked up,
and if there is no such environment variable, it will be set to 1. Defaults to None.
max_epoch (int, optional):
99 changes: 65 additions & 34 deletions irspack/recommenders/slim.py
@@ -1,50 +1,81 @@
from typing import Optional

from scipy import sparse as sps
from sklearn.linear_model import ElasticNet

from ..definitions import InteractionMatrix
from .base import BaseSimilarityRecommender


def slim_weight(X: InteractionMatrix, alpha: float, l1_ratio: float) -> sps.csr_matrix:
model = ElasticNet(
fit_intercept=False,
positive=True,
copy_X=False,
precompute=True,
selection="random",
max_iter=100,
tol=1e-4,
alpha=alpha,
l1_ratio=l1_ratio,
)
coeff_all = []
A: sps.csc_matrix = X.tocsc()
for i in range(X.shape[1]):
if i % 1000 == 0:
print(f"Slim Iteration: {i}")
start_pos = int(A.indptr[i])
end_pos = int(A.indptr[i + 1])
current_item_data_backup = A.data[start_pos:end_pos].copy()
target = A[:, i].toarray().ravel()
A.data[start_pos:end_pos] = 0.0
model.fit(A, target)
coeff_all.append(model.sparse_coef_)
A.data[start_pos:end_pos] = current_item_data_backup
return sps.vstack(coeff_all, format="csr")
from irspack.definitions import InteractionMatrix
from irspack.recommenders.base import BaseSimilarityRecommender
from irspack.utils import get_n_threads
from irspack.utils._util_cpp import (
slim_weight_allow_negative,
slim_weight_positive_only,
)


class SLIMRecommender(BaseSimilarityRecommender):
"""`SLIM <https://dl.acm.org/doi/10.1109/ICDM.2011.134>`_ with ElasticNet-type loss function:
.. math ::
\mathrm{loss} = \\frac{1}{2} ||X - XB|| ^2 _F + \\frac{\\alpha (1 - l_1) U}{2} ||B|| ^2 _FF + \\alpha l_1 U |B|
The implementation relies on a simple (parallelized) cyclic-coordinate descent method.
Currently, this does not support:
- shuffling of item indices
- elaborate convergence check
Args:
X_train_all:
Input interaction matrix.
alpha:
Determines the strength of L1/L2 regularization (see above). Defaults to 0.05.
l1_ratio:
Determines the strength of L1 regularization relative to alpha. Defaults to 0.01.
positive_only:
Whether we constrain the weight matrix to be non-negative. Defaults to True.
n_iter:
The number of coordinate-descent iterations. Defaults to 10.
n_threads:
Specifies the number of threads to use for the computation.
If ``None``, the environment variable ``"IRSPACK_NUM_THREADS_DEFAULT"`` will be looked up,
and if there is no such an environment variable, it will be set to 1. Defaults to None.
"""

def __init__(
self,
X_train_all: InteractionMatrix,
alpha: float = 0.05,
l1_ratio: float = 0.01,
positive_only: bool = True,
n_iter: int = 10,
n_threads: Optional[int] = None,
):
super(SLIMRecommender, self).__init__(X_train_all)
super().__init__(X_train_all)
self.alpha = alpha
self.l1_ratio = l1_ratio
self.positive_only = positive_only
self.n_threads = get_n_threads(n_threads)
self.n_iter = n_iter

def _learn(self) -> None:
self.W_ = slim_weight(
self.X_train_all, alpha=self.alpha, l1_ratio=self.l1_ratio
)
l2_coeff = self.n_users * self.alpha * (1 - self.l1_ratio)
l1_coeff = self.n_users * self.alpha * self.l1_ratio

if self.positive_only:
self.W_ = slim_weight_positive_only(
self.X_train_all,
n_threads=self.n_threads,
n_iter=self.n_iter,
l2_coeff=l2_coeff,
l1_coeff=l1_coeff,
)
else:
self.W_ = slim_weight_allow_negative(
self.X_train_all,
n_threads=self.n_threads,
n_iter=self.n_iter,
l2_coeff=l2_coeff,
l1_coeff=l1_coeff,
)
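As computed in _learn, with U users the user-facing parameters map onto the loss above as l2_coeff = U * alpha * (1 - l1_ratio) and l1_coeff = U * alpha * l1_ratio. A minimal usage sketch (the toy matrix is made up, and the usual BaseRecommender interface with learn() is assumed):

    import numpy as np
    import scipy.sparse as sps

    from irspack.recommenders import SLIMRecommender

    # Toy implicit-feedback matrix: 1000 users x 200 items.
    X = sps.random(1000, 200, density=0.02, format="csr", dtype=np.float32, random_state=42)

    rec = SLIMRecommender(
        X, alpha=0.05, l1_ratio=0.01, positive_only=True, n_iter=10, n_threads=2
    )
    rec.learn()
    # rec.W_ now holds the (200 x 200) sparse item-item weight matrix.
    print(rec.W_.shape)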