Source code for recbole.model.abstract_recommender

# @Time   : 2020/6/25
# @Author : Shanlei Mu
# @Email  : slmu@ruc.edu.cn

# UPDATE:
# @Time   : 2020/8/6, 2020/8/25
# @Author : Shanlei Mu, Yupeng Hou
# @Email  : slmu@ruc.edu.cn, houyupeng@ruc.edu.cn

"""
recbole.model.abstract_recommender
##################################
"""

from logging import getLogger

import numpy as np
import torch
import torch.nn as nn

from recbole.model.layers import FMEmbedding, FMFirstOrderLinear
from recbole.utils import ModelType, InputType, FeatureSource, FeatureType, set_color


[docs]class AbstractRecommender(nn.Module): r"""Base class for all models """ def __init__(self): self.logger = getLogger() super(AbstractRecommender, self).__init__()
[docs] def calculate_loss(self, interaction): r"""Calculate the training loss for a batch data. Args: interaction (Interaction): Interaction class of the batch. Returns: torch.Tensor: Training loss, shape: [] """ raise NotImplementedError
[docs] def predict(self, interaction): r"""Predict the scores between users and items. Args: interaction (Interaction): Interaction class of the batch. Returns: torch.Tensor: Predicted scores for given users and items, shape: [batch_size] """ raise NotImplementedError
[docs] def full_sort_predict(self, interaction): r"""full sort prediction function. Given users, calculate the scores between users and all candidate items. Args: interaction (Interaction): Interaction class of the batch. Returns: torch.Tensor: Predicted scores for given users and all candidate items, shape: [n_batch_users * n_candidate_items] """ raise NotImplementedError
[docs] def other_parameter(self): if hasattr(self, 'other_parameter_name'): return {key: getattr(self, key) for key in self.other_parameter_name} return dict()
[docs] def load_other_parameter(self, para): if para is None: return for key, value in para.items(): setattr(self, key, value)
def __str__(self): """ Model prints with number of trainable parameters """ model_parameters = filter(lambda p: p.requires_grad, self.parameters()) params = sum([np.prod(p.size()) for p in model_parameters]) return super().__str__() + set_color('\nTrainable parameters', 'blue') + f': {params}'
[docs]class GeneralRecommender(AbstractRecommender): """This is a abstract general recommender. All the general model should implement this class. The base general recommender class provide the basic dataset and parameters information. """ type = ModelType.GENERAL def __init__(self, config, dataset): super(GeneralRecommender, self).__init__() # load dataset info self.USER_ID = config['USER_ID_FIELD'] self.ITEM_ID = config['ITEM_ID_FIELD'] self.NEG_ITEM_ID = config['NEG_PREFIX'] + self.ITEM_ID self.n_users = dataset.num(self.USER_ID) self.n_items = dataset.num(self.ITEM_ID) # load parameters info self.device = config['device']
[docs]class SequentialRecommender(AbstractRecommender): """ This is a abstract sequential recommender. All the sequential model should implement This class. """ type = ModelType.SEQUENTIAL def __init__(self, config, dataset): super(SequentialRecommender, self).__init__() # load dataset info self.USER_ID = config['USER_ID_FIELD'] self.ITEM_ID = config['ITEM_ID_FIELD'] self.ITEM_SEQ = self.ITEM_ID + config['LIST_SUFFIX'] self.ITEM_SEQ_LEN = config['ITEM_LIST_LENGTH_FIELD'] self.POS_ITEM_ID = self.ITEM_ID self.NEG_ITEM_ID = config['NEG_PREFIX'] + self.ITEM_ID self.max_seq_length = config['MAX_ITEM_LIST_LENGTH'] self.n_items = dataset.num(self.ITEM_ID)
[docs] def gather_indexes(self, output, gather_index): """Gathers the vectors at the specific positions over a minibatch""" gather_index = gather_index.view(-1, 1, 1).expand(-1, -1, output.shape[-1]) output_tensor = output.gather(dim=1, index=gather_index) return output_tensor.squeeze(1)
[docs]class KnowledgeRecommender(AbstractRecommender): """This is a abstract knowledge-based recommender. All the knowledge-based model should implement this class. The base knowledge-based recommender class provide the basic dataset and parameters information. """ type = ModelType.KNOWLEDGE def __init__(self, config, dataset): super(KnowledgeRecommender, self).__init__() # load dataset info self.USER_ID = config['USER_ID_FIELD'] self.ITEM_ID = config['ITEM_ID_FIELD'] self.NEG_ITEM_ID = config['NEG_PREFIX'] + self.ITEM_ID self.ENTITY_ID = config['ENTITY_ID_FIELD'] self.RELATION_ID = config['RELATION_ID_FIELD'] self.HEAD_ENTITY_ID = config['HEAD_ENTITY_ID_FIELD'] self.TAIL_ENTITY_ID = config['TAIL_ENTITY_ID_FIELD'] self.NEG_TAIL_ENTITY_ID = config['NEG_PREFIX'] + self.TAIL_ENTITY_ID self.n_users = dataset.num(self.USER_ID) self.n_items = dataset.num(self.ITEM_ID) self.n_entities = dataset.num(self.ENTITY_ID) self.n_relations = dataset.num(self.RELATION_ID) # load parameters info self.device = config['device']
[docs]class ContextRecommender(AbstractRecommender): """This is a abstract context-aware recommender. All the context-aware model should implement this class. The base context-aware recommender class provide the basic embedding function of feature fields which also contains a first-order part of feature fields. """ type = ModelType.CONTEXT input_type = InputType.POINTWISE def __init__(self, config, dataset): super(ContextRecommender, self).__init__() self.field_names = dataset.fields( source=[ FeatureSource.INTERACTION, FeatureSource.USER, FeatureSource.USER_ID, FeatureSource.ITEM, FeatureSource.ITEM_ID, ] ) self.LABEL = config['LABEL_FIELD'] self.embedding_size = config['embedding_size'] self.device = config['device'] self.double_tower = config['double_tower'] if self.double_tower is None: self.double_tower = False self.token_field_names = [] self.token_field_dims = [] self.float_field_names = [] self.float_field_dims = [] self.token_seq_field_names = [] self.token_seq_field_dims = [] self.num_feature_field = 0 if self.double_tower: self.user_field_names = dataset.fields(source=[FeatureSource.USER, FeatureSource.USER_ID]) self.item_field_names = dataset.fields(source=[FeatureSource.ITEM, FeatureSource.ITEM_ID]) self.field_names = self.user_field_names + self.item_field_names self.user_token_field_num = 0 self.user_float_field_num = 0 self.user_token_seq_field_num = 0 for field_name in self.user_field_names: if dataset.field2type[field_name] == FeatureType.TOKEN: self.user_token_field_num += 1 elif dataset.field2type[field_name] == FeatureType.TOKEN_SEQ: self.user_token_seq_field_num += 1 else: self.user_float_field_num += dataset.num(field_name) self.item_token_field_num = 0 self.item_float_field_num = 0 self.item_token_seq_field_num = 0 for field_name in self.item_field_names: if dataset.field2type[field_name] == FeatureType.TOKEN: self.item_token_field_num += 1 elif dataset.field2type[field_name] == FeatureType.TOKEN_SEQ: self.item_token_seq_field_num += 1 else: self.item_float_field_num += dataset.num(field_name) for field_name in self.field_names: if field_name == self.LABEL: continue if dataset.field2type[field_name] == FeatureType.TOKEN: self.token_field_names.append(field_name) self.token_field_dims.append(dataset.num(field_name)) elif dataset.field2type[field_name] == FeatureType.TOKEN_SEQ: self.token_seq_field_names.append(field_name) self.token_seq_field_dims.append(dataset.num(field_name)) else: self.float_field_names.append(field_name) self.float_field_dims.append(dataset.num(field_name)) self.num_feature_field += 1 if len(self.token_field_dims) > 0: self.token_field_offsets = np.array((0, *np.cumsum(self.token_field_dims)[:-1]), dtype=np.long) self.token_embedding_table = FMEmbedding( self.token_field_dims, self.token_field_offsets, self.embedding_size ) if len(self.float_field_dims) > 0: self.float_embedding_table = nn.Embedding( np.sum(self.float_field_dims, dtype=np.int32), self.embedding_size ) if len(self.token_seq_field_dims) > 0: self.token_seq_embedding_table = nn.ModuleList() for token_seq_field_dim in self.token_seq_field_dims: self.token_seq_embedding_table.append(nn.Embedding(token_seq_field_dim, self.embedding_size)) self.first_order_linear = FMFirstOrderLinear(config, dataset)
[docs] def embed_float_fields(self, float_fields, embed=True): """Embed the float feature columns Args: float_fields (torch.FloatTensor): The input dense tensor. shape of [batch_size, num_float_field] embed (bool): Return the embedding of columns or just the columns itself. Defaults to ``True``. Returns: torch.FloatTensor: The result embedding tensor of float columns. """ # input Tensor shape : [batch_size, num_float_field] if not embed or float_fields is None: return float_fields num_float_field = float_fields.shape[1] # [batch_size, num_float_field] index = torch.arange(0, num_float_field).unsqueeze(0).expand_as(float_fields).long().to(self.device) # [batch_size, num_float_field, embed_dim] float_embedding = self.float_embedding_table(index) float_embedding = torch.mul(float_embedding, float_fields.unsqueeze(2)) return float_embedding
[docs] def embed_token_fields(self, token_fields): """Embed the token feature columns Args: token_fields (torch.LongTensor): The input tensor. shape of [batch_size, num_token_field] Returns: torch.FloatTensor: The result embedding tensor of token columns. """ # input Tensor shape : [batch_size, num_token_field] if token_fields is None: return None # [batch_size, num_token_field, embed_dim] token_embedding = self.token_embedding_table(token_fields) return token_embedding
[docs] def embed_token_seq_fields(self, token_seq_fields, mode='mean'): """Embed the token feature columns Args: token_seq_fields (torch.LongTensor): The input tensor. shape of [batch_size, seq_len] mode (str): How to aggregate the embedding of feature in this field. default=mean Returns: torch.FloatTensor: The result embedding tensor of token sequence columns. """ # input is a list of Tensor shape of [batch_size, seq_len] fields_result = [] for i, token_seq_field in enumerate(token_seq_fields): embedding_table = self.token_seq_embedding_table[i] mask = token_seq_field != 0 # [batch_size, seq_len] mask = mask.float() value_cnt = torch.sum(mask, dim=1, keepdim=True) # [batch_size, 1] token_seq_embedding = embedding_table(token_seq_field) # [batch_size, seq_len, embed_dim] mask = mask.unsqueeze(2).expand_as(token_seq_embedding) # [batch_size, seq_len, embed_dim] if mode == 'max': masked_token_seq_embedding = token_seq_embedding - (1 - mask) * 1e9 # [batch_size, seq_len, embed_dim] result = torch.max(masked_token_seq_embedding, dim=1, keepdim=True) # [batch_size, 1, embed_dim] elif mode == 'sum': masked_token_seq_embedding = token_seq_embedding * mask.float() result = torch.sum(masked_token_seq_embedding, dim=1, keepdim=True) # [batch_size, 1, embed_dim] else: masked_token_seq_embedding = token_seq_embedding * mask.float() result = torch.sum(masked_token_seq_embedding, dim=1) # [batch_size, embed_dim] eps = torch.FloatTensor([1e-8]).to(self.device) result = torch.div(result, value_cnt + eps) # [batch_size, embed_dim] result = result.unsqueeze(1) # [batch_size, 1, embed_dim] fields_result.append(result) if len(fields_result) == 0: return None else: return torch.cat(fields_result, dim=1) # [batch_size, num_token_seq_field, embed_dim]
[docs] def double_tower_embed_input_fields(self, interaction): """Embed the whole feature columns in a double tower way. Args: interaction (Interaction): The input data collection. Returns: torch.FloatTensor: The embedding tensor of token sequence columns in the first part. torch.FloatTensor: The embedding tensor of float sequence columns in the first part. torch.FloatTensor: The embedding tensor of token sequence columns in the second part. torch.FloatTensor: The embedding tensor of float sequence columns in the second part. """ if not self.double_tower: raise RuntimeError('Please check your model hyper parameters and set \'double tower\' as True') sparse_embedding, dense_embedding = self.embed_input_fields(interaction) if dense_embedding is not None: first_dense_embedding, second_dense_embedding = \ torch.split(dense_embedding, [self.user_float_field_num, self.item_float_field_num], dim=1) else: first_dense_embedding, second_dense_embedding = None, None if sparse_embedding is not None: sizes = [ self.user_token_seq_field_num, self.item_token_seq_field_num, self.user_token_field_num, self.item_token_field_num ] first_token_seq_embedding, second_token_seq_embedding, first_token_embedding, second_token_embedding = \ torch.split(sparse_embedding, sizes, dim=1) first_sparse_embedding = torch.cat([first_token_seq_embedding, first_token_embedding], dim=1) second_sparse_embedding = torch.cat([second_token_seq_embedding, second_token_embedding], dim=1) else: first_sparse_embedding, second_sparse_embedding = None, None return first_sparse_embedding, first_dense_embedding, second_sparse_embedding, second_dense_embedding
[docs] def concat_embed_input_fields(self, interaction): sparse_embedding, dense_embedding = self.embed_input_fields(interaction) all_embeddings = [] if sparse_embedding is not None: all_embeddings.append(sparse_embedding) if dense_embedding is not None and len(dense_embedding.shape) == 3: all_embeddings.append(dense_embedding) return torch.cat(all_embeddings, dim=1) # [batch_size, num_field, embed_dim]
[docs] def embed_input_fields(self, interaction): """Embed the whole feature columns. Args: interaction (Interaction): The input data collection. Returns: torch.FloatTensor: The embedding tensor of token sequence columns. torch.FloatTensor: The embedding tensor of float sequence columns. """ float_fields = [] for field_name in self.float_field_names: if len(interaction[field_name].shape) == 2: float_fields.append(interaction[field_name]) else: float_fields.append(interaction[field_name].unsqueeze(1)) if len(float_fields) > 0: float_fields = torch.cat(float_fields, dim=1) # [batch_size, num_float_field] else: float_fields = None # [batch_size, num_float_field] or [batch_size, num_float_field, embed_dim] or None float_fields_embedding = self.embed_float_fields(float_fields) token_fields = [] for field_name in self.token_field_names: token_fields.append(interaction[field_name].unsqueeze(1)) if len(token_fields) > 0: token_fields = torch.cat(token_fields, dim=1) # [batch_size, num_token_field] else: token_fields = None # [batch_size, num_token_field, embed_dim] or None token_fields_embedding = self.embed_token_fields(token_fields) token_seq_fields = [] for field_name in self.token_seq_field_names: token_seq_fields.append(interaction[field_name]) # [batch_size, num_token_seq_field, embed_dim] or None token_seq_fields_embedding = self.embed_token_seq_fields(token_seq_fields) if token_fields_embedding is None: sparse_embedding = token_seq_fields_embedding else: if token_seq_fields_embedding is None: sparse_embedding = token_fields_embedding else: sparse_embedding = torch.cat([token_fields_embedding, token_seq_fields_embedding], dim=1) dense_embedding = float_fields_embedding # sparse_embedding shape: [batch_size, num_token_seq_field+num_token_field, embed_dim] or None # dense_embedding shape: [batch_size, num_float_field] or [batch_size, num_float_field, embed_dim] or None return sparse_embedding, dense_embedding