Skip to content

Commit

Permalink
Improve id mapping (#87)
Browse files Browse the repository at this point in the history
* Use TypeVar

* allowed_item_ids and per_user_item_ids

* Allowed_item_ids can be set uniformly

* pre-commit should run on every push

* improve test

* Run isort

Co-authored-by: tohtsky <[email protected]>
  • Loading branch information
tohtsky and tohtsky authored Nov 1, 2021
1 parent 2b3328e commit 33a9877
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 47 deletions.
1 change: 0 additions & 1 deletion .github/workflows/pre-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ name: pre-commit
on:
pull_request:
push:
branches: [master]
jobs:
pre-commit:
runs-on: ubuntu-latest
Expand Down
13 changes: 11 additions & 2 deletions cpp_source/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ retrieve_recommend_from_score(
check_arg(n_threads > 0, "n_threads must not be 0.");
check_arg(
(score.rows() == static_cast<int64_t>(allowed_indices.size())) ||
allowed_indices.empty(),
(allowed_indices.size() == 1u) || allowed_indices.empty(),
"allowed_indices, if not empty, must have a size equal to X.rows()");
std::vector<std::vector<score_and_index>> result(score.rows());
std::vector<std::future<void>> workers;
Expand All @@ -458,7 +458,16 @@ retrieve_recommend_from_score(

index_holder.clear();
if (!allowed_indices.empty()) {
for (auto item_index : allowed_indices.at(current)) {
std::vector<int64_t>::const_iterator begin, end;
if (allowed_indices.size() == 1u) {
begin = allowed_indices[0].cbegin();
end = allowed_indices[0].cend();
} else {
begin = allowed_indices.at(current).cbegin();
end = allowed_indices.at(current).cend();
}
for (; begin != end; begin++) {
auto item_index = *begin;
if ((item_index < n_items) && (item_index >= 0)) {
index_holder.emplace_back(item_index, score_ptr[item_index]);
}
Expand Down
140 changes: 97 additions & 43 deletions irspack/utils/id_mapping.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generic,
Iterable,
List,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)

Expand Down Expand Up @@ -43,7 +44,11 @@ def retrieve_recommend_from_score(
raise ValueError("Only float32 or float64 are allowed.")


class IDMappedRecommender:
UserIdType = TypeVar("UserIdType")
ItemIdType = TypeVar("ItemIdType")


class IDMappedRecommender(Generic[UserIdType, ItemIdType]):
"""A utility class that helps mapping user/item ids to index, retrieving recommendation score,
and making a recommendation.
Expand All @@ -63,7 +68,10 @@ class IDMappedRecommender:
"""

def __init__(
self, recommender: "BaseRecommender", user_ids: List[Any], item_ids: List[Any]
self,
recommender: "BaseRecommender",
user_ids: List[UserIdType],
item_ids: List[ItemIdType],
):

if (recommender.n_users != len(user_ids)) or (
Expand All @@ -79,11 +87,11 @@ def __init__(
self.user_id_to_index = {user_id: i for i, user_id in enumerate(user_ids)}
self.item_id_to_index = {item_id: i for i, item_id in enumerate(item_ids)}

def _item_id_list_to_index_list(self, ids: Iterable[Any]) -> List[int]:
def _item_id_list_to_index_list(self, ids: Iterable[ItemIdType]) -> List[int]:
return [self.item_id_to_index[id] for id in ids if id in self.item_id_to_index]

def _user_profile_to_data_col(
self, profile: Union[List[Any], Dict[Any, float]]
self, profile: Union[List[ItemIdType], Dict[ItemIdType, float]]
) -> Tuple[List[float], List[int]]:
data: List[float]
cols: List[int]
Expand All @@ -101,7 +109,7 @@ def _user_profile_to_data_col(
return data, cols

def _list_of_user_profile_to_matrix(
self, users_info: Sequence[Union[List[Any], Dict[Any, float]]]
self, users_info: Sequence[Union[List[ItemIdType], Dict[ItemIdType, float]]]
) -> sps.csr_matrix:
data: List[float] = []
indptr: List[int] = [0]
Expand All @@ -120,11 +128,11 @@ def _list_of_user_profile_to_matrix(

def get_recommendation_for_known_user_id(
self,
user_id: Any,
user_id: UserIdType,
cutoff: int = 20,
allowed_item_ids: Optional[List[Any]] = None,
forbidden_item_ids: Optional[List[Any]] = None,
) -> List[Tuple[Any, float]]:
allowed_item_ids: Optional[List[ItemIdType]] = None,
forbidden_item_ids: Optional[List[ItemIdType]] = None,
) -> List[Tuple[ItemIdType, float]]:
"""Retrieve recommendation result for a known user.
Args:
user_id:
Expand All @@ -151,7 +159,7 @@ def get_recommendation_for_known_user_id(
)

score = self.recommender.get_score_remove_seen(user_index)[0, :]
return self._score_to_recommended_items(
return self.score_to_recommended_items(
score,
cutoff=cutoff,
allowed_item_ids=allowed_item_ids,
Expand All @@ -160,12 +168,13 @@ def get_recommendation_for_known_user_id(

def get_recommendation_for_known_user_batch(
self,
user_ids: List[Any],
user_ids: List[UserIdType],
cutoff: int = 20,
allowed_item_ids: Optional[List[List[Any]]] = None,
forbidden_item_ids: Optional[List[List[Any]]] = None,
allowed_item_ids: Optional[List[ItemIdType]] = None,
per_user_allowed_item_ids: Optional[List[List[ItemIdType]]] = None,
forbidden_item_ids: Optional[List[List[ItemIdType]]] = None,
n_threads: Optional[int] = None,
) -> List[List[Tuple[Any, float]]]:
) -> List[List[Tuple[ItemIdType, float]]]:
"""Retrieve recommendation result for a list of known users.
Args:
Expand All @@ -174,13 +183,21 @@ def get_recommendation_for_known_user_batch(
cutoff:
Maximal number of recommendations allowed.
allowed_item_ids:
If not ``None``, defines "a list of recommendable item IDs".
Ignored if `per_user_allowed_item_ids` is set.
per_user_allowed_item_ids:
If not ``None``, defines "a list of list of recommendable item IDs"
and ``len(allowed_item_ids)`` must be equal to ``len(item_ids)``.
and ``len(per_user_allowed_item_ids)`` must be equal to ``score.shape[0]`` (the number of users in the batch).
Defaults to ``None``.
forbidden_item_ids:
If not ``None``, defines "a list of list of forbidden item IDs"
and ``len(forbidden_item_ids)`` must be equal to ``score.shape[0]`` (the number of users in the batch).
Defaults to ``None``.
n_threads:
Specifies the number of threads to use for the computation.
If ``None``, the environment variable ``"IRSPACK_NUM_THREADS_DEFAULT"`` will be looked up,
and if the variable is not set, it will be set to ``os.cpu_count()``. Defaults to None.
Returns:
A list of list of tuples consisting of ``(item_id, score)``.
Expand All @@ -191,21 +208,22 @@ def get_recommendation_for_known_user_batch(
)

score = self.recommender.get_score_remove_seen(user_indexes)
return self._score_to_recommended_items_batch(
return self.score_to_recommended_items_batch(
score,
cutoff=cutoff,
allowed_item_ids=allowed_item_ids,
per_user_allowed_item_ids=per_user_allowed_item_ids,
forbidden_item_ids=forbidden_item_ids,
n_threads=get_n_threads(n_threads=n_threads),
)

def get_recommendation_for_new_user(
self,
user_profile: Union[List[Any], Dict[Any, float]],
user_profile: Union[List[ItemIdType], Dict[ItemIdType, float]],
cutoff: int = 20,
allowed_item_ids: Optional[List[Any]] = None,
forbidden_item_ids: Optional[List[Any]] = None,
) -> List[Tuple[Any, float]]:
allowed_item_ids: Optional[List[ItemIdType]] = None,
forbidden_item_ids: Optional[List[ItemIdType]] = None,
) -> List[Tuple[ItemIdType, float]]:
"""Retrieve recommendation result for a previously unseen user using item ids with which he or she interacted.
Args:
Expand All @@ -229,7 +247,7 @@ def get_recommendation_for_new_user(
(data, cols, [0, len(cols)]), shape=(1, len(self.item_ids))
)
score = self.recommender.get_score_cold_user_remove_seen(X_input)[0]
return self._score_to_recommended_items(
return self.score_to_recommended_items(
score,
cutoff,
allowed_item_ids=allowed_item_ids,
Expand All @@ -238,12 +256,13 @@ def get_recommendation_for_new_user(

def get_recommendation_for_new_user_batch(
self,
user_profiles: Sequence[Union[List[Any], Dict[Any, float]]],
user_profiles: Sequence[Union[List[ItemIdType], Dict[ItemIdType, float]]],
cutoff: int = 20,
allowed_item_ids: Optional[List[List[Any]]] = None,
forbidden_item_ids: Optional[List[List[Any]]] = None,
allowed_item_ids: Optional[List[ItemIdType]] = None,
per_user_allowed_item_ids: Optional[List[List[ItemIdType]]] = None,
forbidden_item_ids: Optional[List[List[ItemIdType]]] = None,
n_threads: Optional[int] = None,
) -> List[List[Tuple[Any, float]]]:
) -> List[List[Tuple[ItemIdType, float]]]:
"""Retrieve recommendation results for previously unseen users using item ids with which they have interacted.
Args:
Expand All @@ -254,35 +273,43 @@ def get_recommendation_for_new_user_batch(
cutoff:
Maximal number of recommendations allowed.
allowed_item_ids:
If not ``None``, defines "a list of recommendable item IDs".
Ignored if `per_user_allowed_item_ids` is set.
per_user_allowed_item_ids:
If not ``None``, defines "a list of list of recommendable item IDs"
and ``len(allowed_item_ids)`` must be equal to ``len(item_ids)``.
and ``len(per_user_allowed_item_ids)`` must be equal to ``score.shape[0]`` (the number of users in the batch).
Defaults to ``None``.
forbidden_item_ids:
If not ``None``, defines "a list of list of forbidden item IDs"
and ``len(forbidden_item_ids)`` must be equal to ``score.shape[0]`` (the number of users in the batch).
Defaults to ``None``.
n_threads:
Specifies the number of threads to use for the computation.
If ``None``, the environment variable ``"IRSPACK_NUM_THREADS_DEFAULT"`` will be looked up,
and if the variable is not set, it will be set to ``os.cpu_count()``. Defaults to None.
Returns:
A list of list of tuples consisting of ``(item_id, score)``.
Each internal list corresponds to the recommender's recommendation output.
"""
X_input = self._list_of_user_profile_to_matrix(user_profiles)
score = self.recommender.get_score_cold_user_remove_seen(X_input)
return self._score_to_recommended_items_batch(
return self.score_to_recommended_items_batch(
score,
cutoff,
allowed_item_ids=allowed_item_ids,
per_user_allowed_item_ids=per_user_allowed_item_ids,
forbidden_item_ids=forbidden_item_ids,
n_threads=get_n_threads(n_threads=n_threads),
)

def _score_to_recommended_items(
def score_to_recommended_items(
self,
score: DenseScoreArray,
cutoff: int,
allowed_item_ids: Optional[List[Any]] = None,
forbidden_item_ids: Optional[List[Any]] = None,
) -> List[Tuple[Any, float]]:
allowed_item_ids: Optional[List[ItemIdType]] = None,
forbidden_item_ids: Optional[List[ItemIdType]] = None,
) -> List[Tuple[ItemIdType, float]]:
if allowed_item_ids is not None:
allowed_item_indices = np.asarray(
self._item_id_list_to_index_list(allowed_item_ids), dtype=np.int64
Expand All @@ -292,7 +319,7 @@ def _score_to_recommended_items(
]
else:
high_score_inds = score.argsort()[::-1]
recommendations: List[Tuple[Any, float]] = []
recommendations: List[Tuple[ItemIdType, float]] = []
for i in high_score_inds:
i_int = int(i)
score_this = score[i_int]
Expand All @@ -307,24 +334,51 @@ def _score_to_recommended_items(
break
return recommendations

def _score_to_recommended_items_batch(
def score_to_recommended_items_batch(
self,
score: DenseScoreArray,
cutoff: int,
allowed_item_ids: Optional[List[List[Any]]] = None,
forbidden_item_ids: Optional[List[List[Any]]] = None,
n_threads: int = 1,
) -> List[List[Tuple[Any, float]]]:
allowed_item_ids: Optional[List[ItemIdType]] = None,
per_user_allowed_item_ids: Optional[List[List[ItemIdType]]] = None,
forbidden_item_ids: Optional[List[List[ItemIdType]]] = None,
n_threads: Optional[int] = None,
) -> List[List[Tuple[ItemIdType, float]]]:
r"""Retrieve recommendation from score array.
Args:
score:
2d numpy ndarray of scores, with one row per user (``score.shape[0]`` users).
cutoff:
Maximal number of recommendations allowed.
allowed_item_ids:
If not ``None``, defines "a list of recommendable item IDs".
Ignored if `per_user_allowed_item_ids` is set.
per_user_allowed_item_ids:
If not ``None``, defines "a list of list of recommendable item IDs"
and ``len(per_user_allowed_item_ids)`` must be equal to ``score.shape[0]``.
Defaults to ``None``.
allowed_item_ids:
If not ``None``, defines "a list of list of recommendable item IDs"
and ``len(allowed_item_ids)`` must be equal to ``len(item_ids)``.
Defaults to ``None``.
forbidden_item_ids:
If not ``None``, defines "a list of list of forbidden item IDs"
and ``len(forbidden_item_ids)`` must be equal to ``score.shape[0]``.
Defaults to ``None``.
"""

if forbidden_item_ids is not None:
assert len(forbidden_item_ids) == score.shape[0]
if allowed_item_ids is not None:
assert len(allowed_item_ids) == score.shape[0]
if per_user_allowed_item_ids is not None:
assert len(per_user_allowed_item_ids) == score.shape[0]

allowed_item_indices: List[List[int]] = []
if allowed_item_ids is not None:
if per_user_allowed_item_ids is not None:
allowed_item_indices = [
self._item_id_list_to_index_list(_) for _ in allowed_item_ids
self._item_id_list_to_index_list(_) for _ in per_user_allowed_item_ids
]
elif allowed_item_ids is not None:
allowed_item_indices = [self._item_id_list_to_index_list(allowed_item_ids)]
if forbidden_item_ids is not None:
for u, forbidden_ids_per_user in enumerate(forbidden_item_ids):
score[
Expand All @@ -335,7 +389,7 @@ def _score_to_recommended_items_batch(
score,
allowed_item_indices,
cutoff,
n_threads=n_threads,
n_threads=get_n_threads(n_threads),
)
return [
[
Expand Down
18 changes: 17 additions & 1 deletion tests/utils/test_id_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def test_basic_usecase(dtype: str) -> None:
nonzero_batch,
cutoff=n_items,
forbidden_item_ids=forbidden_items_batch,
allowed_item_ids=allowed_items_batch,
per_user_allowed_item_ids=allowed_items_batch,
n_threads=1,
)
assert len(batch_result_masked_restricted) == n_users
Expand Down Expand Up @@ -256,3 +256,19 @@ def test_basic_usecase(dtype: str) -> None:
softmax_denom = X.shape[1] - nnz + np.exp(2) * nnz
for _, score in result:
assert score == pytest.approx(1 / softmax_denom)

allowed_items_uniform = [str(x) for x in RNS.choice(item_ids, size=2)]
batch_result_masked_uniform_allowed_ids = (
mapped_rec.get_recommendation_for_new_user_batch(
nonzero_batch,
cutoff=n_items,
allowed_item_ids=allowed_items_uniform,
n_threads=1,
)
)
cnt = 0
for x in batch_result_masked_uniform_allowed_ids:
for rec_id, score in x:
assert rec_id in allowed_items_uniform
cnt += 1
assert cnt > 0

0 comments on commit 33a9877

Please sign in to comment.