Source code for recbole.model.abstract_recommender

# @Time   : 2020/6/25
# @Author : Shanlei Mu
# @Email  : slmu@ruc.edu.cn

# UPDATE:
# @Time   : 2022/7/16, 2020/8/6, 2020/8/25, 2023/4/24
# @Author : Zhen Tian, Shanlei Mu, Yupeng Hou, Chenglong Ma
# @Email  : chenyuwuxinn@gmail.com, slmu@ruc.edu.cn, houyupeng@ruc.edu.cn, chenglong.m@outlook.com

"""
recbole.model.abstract_recommender
##################################
"""

from logging import getLogger

import numpy as np
import torch
import torch.nn as nn

from recbole.model.layers import FMEmbedding, FMFirstOrderLinear, FLEmbedding
from recbole.utils import ModelType, InputType, FeatureSource, FeatureType, set_color


class AbstractRecommender(nn.Module):
    r"""Base class for all models"""

    def __init__(self):
        self.logger = getLogger()
        super(AbstractRecommender, self).__init__()

    def calculate_loss(self, interaction):
        r"""Calculate the training loss for a batch data.

        Args:
            interaction (Interaction): Interaction class of the batch.

        Returns:
            torch.Tensor: Training loss, shape: []
        """
        raise NotImplementedError

    def predict(self, interaction):
        r"""Predict the scores between users and items.

        Args:
            interaction (Interaction): Interaction class of the batch.

        Returns:
            torch.Tensor: Predicted scores for given users and items, shape: [batch_size]
        """
        raise NotImplementedError

    def full_sort_predict(self, interaction):
        r"""Full sort prediction function. Given users, calculate the scores between users
        and all candidate items.

        Args:
            interaction (Interaction): Interaction class of the batch.

        Returns:
            torch.Tensor: Predicted scores for given users and all candidate items,
                shape: [n_batch_users * n_candidate_items]
        """
        raise NotImplementedError

    def other_parameter(self):
        if hasattr(self, "other_parameter_name"):
            return {key: getattr(self, key) for key in self.other_parameter_name}
        return dict()

    def load_other_parameter(self, para):
        if para is None:
            return
        for key, value in para.items():
            setattr(self, key, value)

    def __str__(self):
        """Model prints with number of trainable parameters"""
        model_parameters = filter(lambda p: p.requires_grad, self.parameters())
        params = sum([np.prod(p.size()) for p in model_parameters])
        return (
            super().__str__()
            + set_color("\nTrainable parameters", "blue")
            + f": {params}"
        )

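The parameter count reported by `__str__` is simply a sum over all tensors with `requires_grad`; a minimal sketch of the same computation on a hypothetical stand-in module::

    import numpy as np
    import torch.nn as nn

    toy = nn.Sequential(nn.Linear(8, 4), nn.Linear(4, 1))  # stand-in model
    trainable = filter(lambda p: p.requires_grad, toy.parameters())
    print(sum(np.prod(p.size()) for p in trainable))  # (8*4 + 4) + (4*1 + 1) = 41
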
class GeneralRecommender(AbstractRecommender):
    """This is an abstract general recommender. All general models should implement
    this class. The base general recommender class provides the basic dataset and
    parameters information.
    """

    type = ModelType.GENERAL

    def __init__(self, config, dataset):
        super(GeneralRecommender, self).__init__()

        # load dataset info
        self.USER_ID = config["USER_ID_FIELD"]
        self.ITEM_ID = config["ITEM_ID_FIELD"]
        self.NEG_ITEM_ID = config["NEG_PREFIX"] + self.ITEM_ID
        self.n_users = dataset.num(self.USER_ID)
        self.n_items = dataset.num(self.ITEM_ID)

        # load parameters info
        self.device = config["device"]

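A custom general model only needs the fields and counts loaded above plus the three abstract methods. A minimal matrix-factorization sketch, assuming the usual RecBole `config` and `dataset` objects and an `embedding_size` entry in the config (the class `MFSketch` is hypothetical, not part of RecBole)::

    import torch
    import torch.nn as nn

    class MFSketch(GeneralRecommender):
        input_type = InputType.PAIRWISE

        def __init__(self, config, dataset):
            super(MFSketch, self).__init__(config, dataset)
            self.user_emb = nn.Embedding(self.n_users, config["embedding_size"])
            self.item_emb = nn.Embedding(self.n_items, config["embedding_size"])

        def forward(self, user, item):
            return (self.user_emb(user) * self.item_emb(item)).sum(dim=-1)

        def calculate_loss(self, interaction):  # BPR loss on sampled negatives
            pos = self.forward(interaction[self.USER_ID], interaction[self.ITEM_ID])
            neg = self.forward(interaction[self.USER_ID], interaction[self.NEG_ITEM_ID])
            return -torch.log(torch.sigmoid(pos - neg) + 1e-10).mean()

        def predict(self, interaction):
            return self.forward(interaction[self.USER_ID], interaction[self.ITEM_ID])

        def full_sort_predict(self, interaction):
            scores = self.user_emb(interaction[self.USER_ID]) @ self.item_emb.weight.t()
            return scores.view(-1)  # flattened: [n_batch_users * n_items]
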
class AutoEncoderMixin(object):
    """This is a common part of auto-encoders. All auto-encoder models should inherit
    this class, including CDAE, MacridVAE, MultiDAE, MultiVAE, RaCT and RecVAE.
    The base AutoEncoderMixin class provides basic dataset information and a
    rating-matrix function.
    """

    def build_histroy_items(self, dataset):
        (
            self.history_item_id,
            self.history_item_value,
            _,
        ) = dataset.history_item_matrix()
        self.history_item_id = self.history_item_id.to(self.device)
        self.history_item_value = self.history_item_value.to(self.device)

    def get_rating_matrix(self, user):
        r"""Get a batch of users' features from their ids and the history interaction matrix.

        Args:
            user (torch.LongTensor): The input tensor that contains users' ids, shape: [batch_size, ]

        Returns:
            torch.FloatTensor: The features of a batch of users, shape: [batch_size, n_items]
        """
        # The following lines scatter the [B, H] history tensors into a [B, n_items] matrix
        col_indices = self.history_item_id[user].flatten()
        row_indices = torch.arange(user.shape[0]).repeat_interleave(
            self.history_item_id.shape[1], dim=0
        )
        rating_matrix = torch.zeros(1, device=self.device).repeat(
            user.shape[0], self.n_items
        )
        rating_matrix.index_put_(
            (row_indices, col_indices), self.history_item_value[user].flatten()
        )
        return rating_matrix

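The scatter above can be traced on toy tensors (hypothetical numbers; item id 0 is the padding id, so its writes land harmlessly in column 0)::

    import torch

    history_item_id = torch.tensor([[1, 3, 0], [2, 0, 0]])          # [B=2, H=3]
    history_item_value = torch.tensor([[1., 1., 0.], [1., 0., 0.]])
    user, n_items = torch.tensor([0, 1]), 5

    col = history_item_id[user].flatten()
    row = torch.arange(user.shape[0]).repeat_interleave(history_item_id.shape[1])
    rating_matrix = torch.zeros(user.shape[0], n_items)
    rating_matrix.index_put_((row, col), history_item_value[user].flatten())
    print(rating_matrix)
    # tensor([[0., 1., 0., 1., 0.],
    #         [0., 0., 1., 0., 0.]])
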
class SequentialRecommender(AbstractRecommender):
    """
    This is an abstract sequential recommender. All sequential models should implement this class.
    """

    type = ModelType.SEQUENTIAL

    def __init__(self, config, dataset):
        super(SequentialRecommender, self).__init__()

        # load dataset info
        self.USER_ID = config["USER_ID_FIELD"]
        self.ITEM_ID = config["ITEM_ID_FIELD"]
        self.ITEM_SEQ = self.ITEM_ID + config["LIST_SUFFIX"]
        self.ITEM_SEQ_LEN = config["ITEM_LIST_LENGTH_FIELD"]
        self.POS_ITEM_ID = self.ITEM_ID
        self.NEG_ITEM_ID = config["NEG_PREFIX"] + self.ITEM_ID
        self.max_seq_length = config["MAX_ITEM_LIST_LENGTH"]
        self.n_items = dataset.num(self.ITEM_ID)

        # load parameters info
        self.device = config["device"]

    def gather_indexes(self, output, gather_index):
        """Gathers the vectors at the specific positions over a minibatch"""
        gather_index = gather_index.view(-1, 1, 1).expand(-1, -1, output.shape[-1])
        output_tensor = output.gather(dim=1, index=gather_index)
        return output_tensor.squeeze(1)

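For example, with a [2, 3, 2] output and per-sequence positions [2, 0], the method picks one hidden state per sequence::

    import torch

    output = torch.arange(12.).view(2, 3, 2)   # [batch, seq_len, hidden]
    gather_index = torch.tensor([2, 0])        # position to keep per sequence
    idx = gather_index.view(-1, 1, 1).expand(-1, -1, output.shape[-1])
    print(output.gather(dim=1, index=idx).squeeze(1))
    # tensor([[4., 5.],
    #         [6., 7.]])
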
    def get_attention_mask(self, item_seq, bidirectional=False):
        """Generate a left-to-right uni-directional or a bidirectional attention mask for multi-head attention."""
        attention_mask = item_seq != 0
        extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)  # torch.bool
        if not bidirectional:
            extended_attention_mask = torch.tril(
                extended_attention_mask.expand((-1, -1, item_seq.size(-1), -1))
            )
        extended_attention_mask = torch.where(extended_attention_mask, 0.0, -10000.0)
        return extended_attention_mask

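On a toy padded sequence, the resulting additive mask keeps a key position with 0 and effectively removes it with -10000 after the softmax; future positions and the padding column are blocked for every query::

    import torch

    item_seq = torch.tensor([[5, 9, 0]])  # one sequence; position 2 is padding
    mask = (item_seq != 0).unsqueeze(1).unsqueeze(2)
    mask = torch.tril(mask.expand((-1, -1, item_seq.size(-1), -1)))
    print(torch.where(mask, 0.0, -10000.0).squeeze())
    # tensor([[     0., -10000., -10000.],
    #         [     0.,      0., -10000.],
    #         [     0.,      0., -10000.]])
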
class KnowledgeRecommender(AbstractRecommender):
    """This is an abstract knowledge-based recommender. All knowledge-based models
    should implement this class. The base knowledge-based recommender class provides
    the basic dataset and parameters information.
    """

    type = ModelType.KNOWLEDGE

    def __init__(self, config, dataset):
        super(KnowledgeRecommender, self).__init__()

        # load dataset info
        self.USER_ID = config["USER_ID_FIELD"]
        self.ITEM_ID = config["ITEM_ID_FIELD"]
        self.NEG_ITEM_ID = config["NEG_PREFIX"] + self.ITEM_ID
        self.ENTITY_ID = config["ENTITY_ID_FIELD"]
        self.RELATION_ID = config["RELATION_ID_FIELD"]
        self.HEAD_ENTITY_ID = config["HEAD_ENTITY_ID_FIELD"]
        self.TAIL_ENTITY_ID = config["TAIL_ENTITY_ID_FIELD"]
        self.NEG_TAIL_ENTITY_ID = config["NEG_PREFIX"] + self.TAIL_ENTITY_ID
        self.n_users = dataset.num(self.USER_ID)
        self.n_items = dataset.num(self.ITEM_ID)
        self.n_entities = dataset.num(self.ENTITY_ID)
        self.n_relations = dataset.num(self.RELATION_ID)

        # load parameters info
        self.device = config["device"]

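Subclasses score knowledge-graph triples with the fields loaded above. A minimal TransE-style loss sketch, written as a method body of a hypothetical subclass (`self.entity_emb` and `self.relation_emb` are assumed embeddings defined by that subclass, not by this base class)::

    def calculate_loss(self, interaction):  # inside a KnowledgeRecommender subclass
        h = self.entity_emb(interaction[self.HEAD_ENTITY_ID])
        r = self.relation_emb(interaction[self.RELATION_ID])
        t = self.entity_emb(interaction[self.TAIL_ENTITY_ID])
        neg_t = self.entity_emb(interaction[self.NEG_TAIL_ENTITY_ID])
        pos_score = -(h + r - t).norm(p=2, dim=-1)
        neg_score = -(h + r - neg_t).norm(p=2, dim=-1)
        return -torch.log(torch.sigmoid(pos_score - neg_score) + 1e-10).mean()
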
class ContextRecommender(AbstractRecommender):
    """This is an abstract context-aware recommender. All context-aware models should
    implement this class. The base context-aware recommender class provides the basic
    embedding function of the feature fields, and also contains a first-order part of
    the feature fields.
    """

    type = ModelType.CONTEXT
    input_type = InputType.POINTWISE

    def __init__(self, config, dataset):
        super(ContextRecommender, self).__init__()

        self.field_names = dataset.fields(
            source=[
                FeatureSource.INTERACTION,
                FeatureSource.USER,
                FeatureSource.USER_ID,
                FeatureSource.ITEM,
                FeatureSource.ITEM_ID,
            ]
        )
        self.LABEL = config["LABEL_FIELD"]
        self.embedding_size = config["embedding_size"]
        self.device = config["device"]
        self.double_tower = config["double_tower"]
        self.numerical_features = config["numerical_features"]
        if self.double_tower is None:
            self.double_tower = False
        self.token_field_names = []
        self.token_field_dims = []
        self.float_field_names = []
        self.float_field_dims = []
        self.token_seq_field_names = []
        self.token_seq_field_dims = []
        self.float_seq_field_names = []
        self.float_seq_field_dims = []
        self.num_feature_field = 0

        if self.double_tower:
            self.user_field_names = dataset.fields(
                source=[FeatureSource.USER, FeatureSource.USER_ID]
            )
            self.item_field_names = dataset.fields(
                source=[FeatureSource.ITEM, FeatureSource.ITEM_ID]
            )
            self.field_names = self.user_field_names + self.item_field_names
            self.user_token_field_num = 0
            self.user_float_field_num = 0
            self.user_token_seq_field_num = 0
            for field_name in self.user_field_names:
                if dataset.field2type[field_name] == FeatureType.TOKEN:
                    self.user_token_field_num += 1
                elif dataset.field2type[field_name] == FeatureType.TOKEN_SEQ:
                    self.user_token_seq_field_num += 1
                else:
                    self.user_float_field_num += 1
            self.item_token_field_num = 0
            self.item_float_field_num = 0
            self.item_token_seq_field_num = 0
            for field_name in self.item_field_names:
                if dataset.field2type[field_name] == FeatureType.TOKEN:
                    self.item_token_field_num += 1
                elif dataset.field2type[field_name] == FeatureType.TOKEN_SEQ:
                    self.item_token_seq_field_num += 1
                else:
                    self.item_float_field_num += 1

        for field_name in self.field_names:
            if field_name == self.LABEL:
                continue
            if dataset.field2type[field_name] == FeatureType.TOKEN:
                self.token_field_names.append(field_name)
                self.token_field_dims.append(dataset.num(field_name))
            elif dataset.field2type[field_name] == FeatureType.TOKEN_SEQ:
                self.token_seq_field_names.append(field_name)
                self.token_seq_field_dims.append(dataset.num(field_name))
            elif (
                dataset.field2type[field_name] == FeatureType.FLOAT
                and field_name in self.numerical_features
            ):
                self.float_field_names.append(field_name)
                self.float_field_dims.append(dataset.num(field_name))
            elif (
                dataset.field2type[field_name] == FeatureType.FLOAT_SEQ
                and field_name in self.numerical_features
            ):
                self.float_seq_field_names.append(field_name)
                self.float_seq_field_dims.append(dataset.num(field_name))
            else:
                continue
            self.num_feature_field += 1

        if len(self.token_field_dims) > 0:
            # np.long was removed in NumPy >= 1.24; np.int64 is the portable spelling
            self.token_field_offsets = np.array(
                (0, *np.cumsum(self.token_field_dims)[:-1]), dtype=np.int64
            )
            self.token_embedding_table = FMEmbedding(
                self.token_field_dims, self.token_field_offsets, self.embedding_size
            )
        if len(self.float_field_dims) > 0:
            self.float_field_offsets = np.array(
                (0, *np.cumsum(self.float_field_dims)[:-1]), dtype=np.int64
            )
            self.float_embedding_table = FLEmbedding(
                self.float_field_dims, self.float_field_offsets, self.embedding_size
            )
        if len(self.token_seq_field_dims) > 0:
            self.token_seq_embedding_table = nn.ModuleList()
            for token_seq_field_dim in self.token_seq_field_dims:
                self.token_seq_embedding_table.append(
                    nn.Embedding(token_seq_field_dim, self.embedding_size)
                )
        if len(self.float_seq_field_dims) > 0:
            self.float_seq_embedding_table = nn.ModuleList()
            for float_seq_field_dim in self.float_seq_field_dims:
                self.float_seq_embedding_table.append(
                    nn.Embedding(float_seq_field_dim, self.embedding_size)
                )

        self.first_order_linear = FMFirstOrderLinear(config, dataset)

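The offsets computed for `FMEmbedding` shift each field's local token ids into one shared embedding table; a toy example with hypothetical vocabulary sizes, mirroring the `np.cumsum` above::

    import numpy as np
    import torch

    token_field_dims = [3, 5, 2]  # vocabulary size per token field
    offsets = np.array((0, *np.cumsum(token_field_dims)[:-1]), dtype=np.int64)
    print(offsets)  # [0 3 8]

    token_fields = torch.tensor([[2, 4, 1]])     # one sample, one id per field
    print(token_fields + torch.tensor(offsets))  # tensor([[2, 7, 9]])
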
    def embed_float_fields(self, float_fields):
        """Embed the float feature columns.

        Args:
            float_fields (torch.FloatTensor): The input dense tensor,
                shape of [batch_size, num_float_field, 2].

        Returns:
            torch.FloatTensor: The result embedding tensor of the float columns.
        """
        if float_fields is None:
            return None
        # [batch_size, num_float_field, embed_dim]
        float_embedding = self.float_embedding_table(float_fields)
        return float_embedding

    def embed_float_seq_fields(self, float_seq_fields, mode="mean"):
        """Embed the float sequence feature columns.

        Args:
            float_seq_fields (list of torch.FloatTensor): The input tensors, each with
                shape of [batch_size, seq_len, 2], storing (value, index) pairs.
            mode (str): How to aggregate the embeddings of the features in each field. default=mean

        Returns:
            torch.FloatTensor: The result embedding tensor of the float sequence columns.
        """
        # input is a list of Tensors of shape [batch_size, seq_len, 2]
        fields_result = []
        for i, float_seq_field in enumerate(float_seq_fields):
            embedding_table = self.float_seq_embedding_table[i]
            base, index = torch.split(float_seq_field, [1, 1], dim=-1)
            index = index.squeeze(-1)
            mask = index != 0  # [batch_size, seq_len]
            mask = mask.float()
            value_cnt = torch.sum(mask, dim=1, keepdim=True)  # [batch_size, 1]
            float_seq_embedding = base * embedding_table(
                index.long()
            )  # [batch_size, seq_len, embed_dim]
            mask = mask.unsqueeze(2).expand_as(
                float_seq_embedding
            )  # [batch_size, seq_len, embed_dim]
            if mode == "max":
                masked_float_seq_embedding = (
                    float_seq_embedding - (1 - mask) * 1e9
                )  # [batch_size, seq_len, embed_dim]
                # torch.max with dim returns (values, indices); keep the values
                result = torch.max(
                    masked_float_seq_embedding, dim=1, keepdim=True
                ).values  # [batch_size, 1, embed_dim]
            elif mode == "sum":
                masked_float_seq_embedding = float_seq_embedding * mask.float()
                result = torch.sum(
                    masked_float_seq_embedding, dim=1, keepdim=True
                )  # [batch_size, 1, embed_dim]
            else:
                masked_float_seq_embedding = float_seq_embedding * mask.float()
                result = torch.sum(
                    masked_float_seq_embedding, dim=1
                )  # [batch_size, embed_dim]
                eps = torch.FloatTensor([1e-8]).to(self.device)
                result = torch.div(result, value_cnt + eps)  # [batch_size, embed_dim]
                result = result.unsqueeze(1)  # [batch_size, 1, embed_dim]
            fields_result.append(result)
        if len(fields_result) == 0:
            return None
        else:
            return torch.cat(
                fields_result, dim=1
            )  # [batch_size, num_float_seq_field, embed_dim]

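A condensed sketch of the default mean branch for a single float-sequence field (toy numbers; index 0 is padding, and `emb` stands in for one entry of `float_seq_embedding_table`)::

    import torch
    import torch.nn as nn

    emb = nn.Embedding(4, 2)
    field = torch.tensor([[[0.5, 1.], [2., 3.], [0., 0.]]])  # [B=1, S=3, (value, index)]
    base, index = torch.split(field, [1, 1], dim=-1)
    index = index.squeeze(-1)
    mask = (index != 0).float()                              # [1, 3]
    scaled = base * emb(index.long())                        # value-scaled embeddings
    pooled = (scaled * mask.unsqueeze(2)).sum(1) / (mask.sum(1, keepdim=True) + 1e-8)
    print(pooled.shape)  # torch.Size([1, 2]), unsqueezed to [1, 1, 2] above
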
    def embed_token_fields(self, token_fields):
        """Embed the token feature columns.

        Args:
            token_fields (torch.LongTensor): The input tensor, shape of [batch_size, num_token_field].

        Returns:
            torch.FloatTensor: The result embedding tensor of the token columns.
        """
        if token_fields is None:
            return None
        # [batch_size, num_token_field, embed_dim]
        token_embedding = self.token_embedding_table(token_fields)
        return token_embedding

    def embed_token_seq_fields(self, token_seq_fields, mode="mean"):
        """Embed the token sequence feature columns.

        Args:
            token_seq_fields (list of torch.LongTensor): The input tensors, each with
                shape of [batch_size, seq_len].
            mode (str): How to aggregate the embeddings of the features in each field. default=mean

        Returns:
            torch.FloatTensor: The result embedding tensor of the token sequence columns.
        """
        # input is a list of Tensors of shape [batch_size, seq_len]
        fields_result = []
        for i, token_seq_field in enumerate(token_seq_fields):
            embedding_table = self.token_seq_embedding_table[i]
            mask = token_seq_field != 0  # [batch_size, seq_len]
            mask = mask.float()
            value_cnt = torch.sum(mask, dim=1, keepdim=True)  # [batch_size, 1]
            token_seq_embedding = embedding_table(
                token_seq_field
            )  # [batch_size, seq_len, embed_dim]
            mask = mask.unsqueeze(2).expand_as(
                token_seq_embedding
            )  # [batch_size, seq_len, embed_dim]
            if mode == "max":
                masked_token_seq_embedding = (
                    token_seq_embedding - (1 - mask) * 1e9
                )  # [batch_size, seq_len, embed_dim]
                # torch.max with dim returns (values, indices); keep the values
                result = torch.max(
                    masked_token_seq_embedding, dim=1, keepdim=True
                ).values  # [batch_size, 1, embed_dim]
            elif mode == "sum":
                masked_token_seq_embedding = token_seq_embedding * mask.float()
                result = torch.sum(
                    masked_token_seq_embedding, dim=1, keepdim=True
                )  # [batch_size, 1, embed_dim]
            else:
                masked_token_seq_embedding = token_seq_embedding * mask.float()
                result = torch.sum(
                    masked_token_seq_embedding, dim=1
                )  # [batch_size, embed_dim]
                eps = torch.FloatTensor([1e-8]).to(self.device)
                result = torch.div(result, value_cnt + eps)  # [batch_size, embed_dim]
                result = result.unsqueeze(1)  # [batch_size, 1, embed_dim]
            fields_result.append(result)
        if len(fields_result) == 0:
            return None
        else:
            return torch.cat(
                fields_result, dim=1
            )  # [batch_size, num_token_seq_field, embed_dim]

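The max branch works by pushing padded positions to a huge negative value before the reduction, so padding can never win; a sketch on one token-sequence field (toy ids; `emb` stands in for one entry of `token_seq_embedding_table`)::

    import torch
    import torch.nn as nn

    emb = nn.Embedding(6, 2)
    token_seq_field = torch.tensor([[3, 5, 0, 0]])       # [B=1, S=4]; 0 = padding
    mask = (token_seq_field != 0).float().unsqueeze(2)   # [1, 4, 1]
    masked = emb(token_seq_field) - (1 - mask) * 1e9
    max_pooled = torch.max(masked, dim=1, keepdim=True).values
    print(max_pooled.shape)  # torch.Size([1, 1, 2])
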
    def double_tower_embed_input_fields(self, interaction):
        """Embed the whole feature columns in a double-tower way.

        Args:
            interaction (Interaction): The input data collection.

        Returns:
            torch.FloatTensor: The embedding tensor of token sequence columns in the first part.
            torch.FloatTensor: The embedding tensor of float sequence columns in the first part.
            torch.FloatTensor: The embedding tensor of token sequence columns in the second part.
            torch.FloatTensor: The embedding tensor of float sequence columns in the second part.
        """
        if not self.double_tower:
            raise RuntimeError(
                "Please check your model hyperparameters and set 'double_tower' to True"
            )
        sparse_embedding, dense_embedding = self.embed_input_fields(interaction)
        if dense_embedding is not None:
            first_dense_embedding, second_dense_embedding = torch.split(
                dense_embedding,
                [self.user_float_field_num, self.item_float_field_num],
                dim=1,
            )
        else:
            first_dense_embedding, second_dense_embedding = None, None

        if sparse_embedding is not None:
            sizes = [
                self.user_token_seq_field_num,
                self.item_token_seq_field_num,
                self.user_token_field_num,
                self.item_token_field_num,
            ]
            (
                first_token_seq_embedding,
                second_token_seq_embedding,
                first_token_embedding,
                second_token_embedding,
            ) = torch.split(sparse_embedding, sizes, dim=1)
            first_sparse_embedding = torch.cat(
                [first_token_seq_embedding, first_token_embedding], dim=1
            )
            second_sparse_embedding = torch.cat(
                [second_token_seq_embedding, second_token_embedding], dim=1
            )
        else:
            first_sparse_embedding, second_sparse_embedding = None, None

        return (
            first_sparse_embedding,
            first_dense_embedding,
            second_sparse_embedding,
            second_dense_embedding,
        )

    def concat_embed_input_fields(self, interaction):
        sparse_embedding, dense_embedding = self.embed_input_fields(interaction)
        all_embeddings = []
        if sparse_embedding is not None:
            all_embeddings.append(sparse_embedding)
        if dense_embedding is not None and len(dense_embedding.shape) == 3:
            all_embeddings.append(dense_embedding)
        return torch.cat(all_embeddings, dim=1)  # [batch_size, num_field, embed_dim]

    def embed_input_fields(self, interaction):
        """Embed the whole feature columns.

        Args:
            interaction (Interaction): The input data collection.

        Returns:
            torch.FloatTensor: The embedding tensor of token sequence columns.
            torch.FloatTensor: The embedding tensor of float sequence columns.
        """
        float_fields = []
        for field_name in self.float_field_names:
            if len(interaction[field_name].shape) == 3:
                float_fields.append(interaction[field_name])
            else:
                float_fields.append(interaction[field_name].unsqueeze(1))
        if len(float_fields) > 0:
            float_fields = torch.cat(
                float_fields, dim=1
            )  # [batch_size, num_float_field, 2]
        else:
            float_fields = None
        # [batch_size, num_float_field, embed_dim] or None
        float_fields_embedding = self.embed_float_fields(float_fields)

        float_seq_fields = []
        for field_name in self.float_seq_field_names:
            float_seq_fields.append(interaction[field_name])
        float_seq_fields_embedding = self.embed_float_seq_fields(float_seq_fields)

        if float_fields_embedding is None:
            dense_embedding = float_seq_fields_embedding
        else:
            if float_seq_fields_embedding is None:
                dense_embedding = float_fields_embedding
            else:
                dense_embedding = torch.cat(
                    [float_seq_fields_embedding, float_fields_embedding], dim=1
                )

        token_fields = []
        for field_name in self.token_field_names:
            token_fields.append(interaction[field_name].unsqueeze(1))
        if len(token_fields) > 0:
            token_fields = torch.cat(
                token_fields, dim=1
            )  # [batch_size, num_token_field]
        else:
            token_fields = None
        # [batch_size, num_token_field, embed_dim] or None
        token_fields_embedding = self.embed_token_fields(token_fields)

        token_seq_fields = []
        for field_name in self.token_seq_field_names:
            token_seq_fields.append(interaction[field_name])
        # [batch_size, num_token_seq_field, embed_dim] or None
        token_seq_fields_embedding = self.embed_token_seq_fields(token_seq_fields)

        if token_fields_embedding is None:
            sparse_embedding = token_seq_fields_embedding
        else:
            if token_seq_fields_embedding is None:
                sparse_embedding = token_fields_embedding
            else:
                sparse_embedding = torch.cat(
                    [token_seq_fields_embedding, token_fields_embedding], dim=1
                )

        # sparse_embedding shape: [batch_size, num_token_seq_field + num_token_field, embed_dim] or None
        # dense_embedding shape: [batch_size, num_float_field, 2] or
        #                        [batch_size, num_float_field, embed_dim] or None
        return sparse_embedding, dense_embedding

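A typical subclass starts its forward pass from these embeddings; a DNN-style sketch (the class `DNNSketch` is hypothetical, and `first_order_linear` is assumed to return a [batch_size, 1] score, as the first-order FM part above)::

    import torch.nn as nn

    class DNNSketch(ContextRecommender):
        def __init__(self, config, dataset):
            super(DNNSketch, self).__init__(config, dataset)
            self.mlp = nn.Linear(self.num_feature_field * self.embedding_size, 1)

        def forward(self, interaction):
            emb = self.concat_embed_input_fields(interaction)  # [B, num_field, E]
            deep = self.mlp(emb.flatten(1))                    # [B, 1]
            return (deep + self.first_order_linear(interaction)).squeeze(-1)
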