Source code for recbole.model.layers

# -*- coding: utf-8 -*-
# @Time   : 2020/6/27 16:40
# @Author : Shanlei Mu
# @Email  : slmu@ruc.edu.cn
# @File   : layers.py

# UPDATE:
# @Time   : 2020/8/24 14:58, 2020/9/16, 2020/9/21, 2020/10/9
# @Author : Yujie Lu, Xingyu Pan, Zhichao Feng, Hui Wang
# @Email  : yujielu1998@gmail.com, panxy@ruc.edu.cn, fzcbupt@gmail.com, hui.wang@ruc.edu.cn

"""
recbole.model.layers
#############################
Common Layers in recommender systems
"""

from logging import getLogger
import numpy as np
import copy
import math
import torch
import torch.nn as nn
import torch.nn.functional as fn
from torch.nn.init import normal_
from recbole.utils import ModelType, InputType, FeatureType


class MLPLayers(nn.Module):
    r""" MLPLayers

    Args:
        - layers(list): a list contains the size of each layer in mlp layers
        - dropout(float): probability of an element to be zeroed. Default: 0
        - activation(str): activation function after each layer in mlp layers. Default: 'relu'.
                           candidates: 'sigmoid', 'tanh', 'relu', 'leakyrelu', 'none'

    Shape:

        - Input: (:math:`N`, \*, :math:`H_{in}`) where \* means any number of additional dimensions
          :math:`H_{in}` must equal to the first value in `layers`
        - Output: (:math:`N`, \*, :math:`H_{out}`) where :math:`H_{out}` equals to the last value in `layers`

    Examples::

        >>> m = MLPLayers([64, 32, 16], 0.2, 'relu')
        >>> input = torch.randn(128, 64)
        >>> output = m(input)
        >>> print(output.size())
        >>> torch.Size([128, 16])
    """

    def __init__(self, layers, dropout=0, activation='relu', bn=False, init_method=None):
        super(MLPLayers, self).__init__()
        self.layers = layers
        self.dropout = dropout
        self.activation = activation
        self.use_bn = bn
        self.init_method = init_method

        mlp_modules = []
        for idx, (input_size, output_size) in enumerate(zip(self.layers[:-1], self.layers[1:])):
            mlp_modules.append(nn.Dropout(p=self.dropout))
            mlp_modules.append(nn.Linear(input_size, output_size))
            if self.use_bn:
                mlp_modules.append(nn.BatchNorm1d(num_features=output_size))
            activation_func = activation_layer(self.activation, output_size)
            if activation_func is not None:
                mlp_modules.append(activation_func)

        self.mlp_layers = nn.Sequential(*mlp_modules)
        if self.init_method is not None:
            self.apply(self.init_weights)

    def init_weights(self, module):
        # We just initialize the module with normal distribution as the paper said
        if isinstance(module, nn.Linear):
            if self.init_method == 'norm':
                normal_(module.weight.data, 0, 0.01)
            if module.bias is not None:
                module.bias.data.fill_(0.0)

    def forward(self, input_feature):
        return self.mlp_layers(input_feature)
def activation_layer(activation_name='relu', emb_dim=None):
    """Construct activation layers

    Args:
        activation_name: str, name of activation function
        emb_dim: int, used for Dice activation

    Return:
        activation: activation layer
    """
    if activation_name is None:
        activation = None
    elif isinstance(activation_name, str):
        if activation_name.lower() == 'sigmoid':
            activation = nn.Sigmoid()
        elif activation_name.lower() == 'tanh':
            activation = nn.Tanh()
        elif activation_name.lower() == 'relu':
            activation = nn.ReLU()
        elif activation_name.lower() == 'leakyrelu':
            activation = nn.LeakyReLU()
        elif activation_name.lower() == 'dice':
            activation = Dice(emb_dim)
        elif activation_name.lower() == 'none':
            activation = None
    elif issubclass(activation_name, nn.Module):
        activation = activation_name()
    else:
        raise NotImplementedError("activation function {} is not implemented".format(activation_name))

    return activation
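
# Usage sketch (illustrative, not part of the original file): most names map
# directly to a torch.nn activation; 'dice' additionally needs the embedding size.
#
#     >>> act = activation_layer('leakyrelu')          # -> nn.LeakyReLU()
#     >>> act = activation_layer('dice', emb_dim=64)   # -> Dice(64)
#     >>> activation_layer('none') is None
#     True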
class FMEmbedding(nn.Module):
    r""" Embedding for token fields.

    Args:
        field_dims: list, the number of tokens in each token fields
        offsets: list, the dimension offset of each token field
        embed_dim: int, the dimension of output embedding vectors

    Input:
        input_x: tensor, A 2D tensor with shape:``(batch_size,field_size)``.

    Return:
        output: tensor, A 3D tensor with shape: ``(batch_size,field_size,embed_dim)``.
    """

    def __init__(self, field_dims, offsets, embed_dim):
        super(FMEmbedding, self).__init__()
        self.embedding = nn.Embedding(sum(field_dims), embed_dim)
        self.offsets = offsets

    def forward(self, input_x):
        input_x = input_x + input_x.new_tensor(self.offsets).unsqueeze(0)
        output = self.embedding(input_x)
        return output
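
# Usage sketch (illustrative, not part of the original file): ``offsets`` is the
# cumulative start index of each field inside the shared embedding table, e.g.
#
#     >>> field_dims = [3, 5, 2]                                 # vocab size per token field
#     >>> offsets = np.array((0, *np.cumsum(field_dims)[:-1]))   # -> [0, 3, 8]
#     >>> emb = FMEmbedding(field_dims, offsets, embed_dim=4)
#     >>> emb(torch.LongTensor([[2, 4, 1]])).shape               # (batch_size, field_size, embed_dim)
#     torch.Size([1, 3, 4])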
class BaseFactorizationMachine(nn.Module):
    r"""Calculate FM result over the embeddings

    Args:
        reduce_sum: bool, whether to sum the result, default is True.

    Input:
        input_x: tensor, A 3D tensor with shape:``(batch_size,field_size,embed_dim)``.

    Output:
        output: tensor, A 2D tensor with shape: ``(batch_size,1)`` or ``(batch_size, embed_dim)``.
    """

    def __init__(self, reduce_sum=True):
        super(BaseFactorizationMachine, self).__init__()
        self.reduce_sum = reduce_sum

    def forward(self, input_x):
        square_of_sum = torch.sum(input_x, dim=1) ** 2
        sum_of_square = torch.sum(input_x ** 2, dim=1)
        output = square_of_sum - sum_of_square
        if self.reduce_sum:
            output = torch.sum(output, dim=1, keepdim=True)
        output = 0.5 * output
        return output
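
# Usage sketch (illustrative, not part of the original file): the forward pass
# computes 0.5 * ((sum_i v_i)^2 - sum_i v_i^2), i.e. the pairwise-interaction
# term of a factorization machine over the field embeddings.
#
#     >>> fm = BaseFactorizationMachine(reduce_sum=True)
#     >>> embeddings = torch.randn(32, 10, 16)   # (batch_size, field_size, embed_dim)
#     >>> fm(embeddings).shape
#     torch.Size([32, 1])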
class BiGNNLayer(nn.Module):
    r"""Propagate a layer of Bi-interaction GNN

    .. math::
        output = (L+I)EW_1 + LE \otimes EW_2
    """

    def __init__(self, in_dim, out_dim):
        super(BiGNNLayer, self).__init__()
        self.in_dim = in_dim
        self.out_dim = out_dim
        self.linear = torch.nn.Linear(in_features=in_dim, out_features=out_dim)
        self.interActTransform = torch.nn.Linear(in_features=in_dim, out_features=out_dim)

    def forward(self, lap_matrix, eye_matrix, features):
        # for GCF, the adjacency matrix is an (N+M) x (N+M) matrix
        # lap_matrix is the normalized Laplacian L = D^-1 A D^-1
        x = torch.sparse.mm(lap_matrix, features)

        inter_part1 = self.linear(features + x)
        inter_feature = torch.mul(x, features)
        inter_part2 = self.interActTransform(inter_feature)

        return inter_part1 + inter_part2
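
# Usage sketch (illustrative, not part of the original file): ``lap_matrix`` is a
# sparse (N+M) x (N+M) Laplacian and ``features`` the stacked user/item embeddings;
# ``eye_matrix`` is accepted for interface compatibility but unused here.
#
#     >>> layer = BiGNNLayer(in_dim=8, out_dim=8)
#     >>> indices = torch.LongTensor([[0, 1, 2], [1, 0, 2]])
#     >>> values = torch.FloatTensor([0.5, 0.5, 1.0])
#     >>> lap = torch.sparse_coo_tensor(indices, values, (3, 3))
#     >>> features = torch.randn(3, 8)
#     >>> layer(lap, None, features).shape
#     torch.Size([3, 8])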
class AttLayer(nn.Module):
    """Calculate the attention signal (weight) according to the input tensor.

    Args:
        infeatures (torch.FloatTensor): A 3D input tensor with shape of [batch_size, M, embed_dim].

    Returns:
        torch.FloatTensor: Attention weight of input. shape of [batch_size, M].
    """

    def __init__(self, in_dim, att_dim):
        super(AttLayer, self).__init__()
        self.in_dim = in_dim
        self.att_dim = att_dim
        self.w = torch.nn.Linear(in_features=in_dim, out_features=att_dim, bias=False)
        self.h = nn.Parameter(torch.randn(att_dim), requires_grad=True)

    def forward(self, infeatures):
        att_signal = self.w(infeatures)             # [batch_size, M, att_dim]
        att_signal = fn.relu(att_signal)            # [batch_size, M, att_dim]

        att_signal = torch.mul(att_signal, self.h)  # [batch_size, M, att_dim]
        att_signal = torch.sum(att_signal, dim=2)   # [batch_size, M]
        att_signal = fn.softmax(att_signal, dim=1)  # [batch_size, M]

        return att_signal
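
# Usage sketch (illustrative, not part of the original file): returns one softmax
# weight per field, e.g. for attention-based pooling over M field embeddings.
#
#     >>> att = AttLayer(in_dim=16, att_dim=8)
#     >>> weights = att(torch.randn(4, 5, 16))   # (batch_size, M, embed_dim)
#     >>> weights.shape
#     torch.Size([4, 5])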
class Dice(nn.Module):
    r"""Dice activation function

    .. math::
        f(s)=p(s) \cdot s+(1-p(s)) \cdot \alpha s

    .. math::
        p(s)=\frac{1} {1 + e^{-\frac{s-E[s]} {\sqrt {Var[s] + \epsilon}}}}
    """

    def __init__(self, emb_size):
        super(Dice, self).__init__()

        self.sigmoid = nn.Sigmoid()
        self.alpha = torch.zeros((emb_size,))

    def forward(self, score):
        self.alpha = self.alpha.to(score.device)
        score_p = self.sigmoid(score)

        return self.alpha * (1 - score_p) * score + score_p * score
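
# Usage sketch (illustrative, not part of the original file): Dice is typically
# obtained through ``activation_layer('dice', emb_dim)`` inside DIN-style MLPs.
#
#     >>> dice = Dice(emb_size=8)
#     >>> dice(torch.randn(4, 8)).shape
#     torch.Size([4, 8])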
class SequenceAttLayer(nn.Module):
    """Attention Layer. Get the representation of each user in the batch.

    Args:
        queries (torch.Tensor): candidate ads, [B, H], H means embedding_size * feat_num
        keys (torch.Tensor): user_hist, [B, T, H]
        keys_length (torch.Tensor): mask, [B]

    Returns:
        torch.Tensor: result
    """

    def __init__(self, mask_mat, att_hidden_size=(80, 40), activation='sigmoid', softmax_stag=False,
                 return_seq_weight=True):
        super(SequenceAttLayer, self).__init__()
        self.att_hidden_size = att_hidden_size
        self.activation = activation
        self.softmax_stag = softmax_stag
        self.return_seq_weight = return_seq_weight
        self.mask_mat = mask_mat
        self.att_mlp_layers = MLPLayers(self.att_hidden_size, activation='Sigmoid', bn=False)
        self.dense = nn.Linear(self.att_hidden_size[-1], 1)

    def forward(self, queries, keys, keys_length):
        embedding_size = queries.shape[-1]  # H
        hist_len = keys.shape[1]  # T
        queries = queries.repeat(1, hist_len)

        queries = queries.view(-1, hist_len, embedding_size)

        # MLP Layer
        input_tensor = torch.cat([queries, keys, queries - keys, queries * keys], dim=-1)
        output = self.att_mlp_layers(input_tensor)
        output = torch.transpose(self.dense(output), -1, -2)

        # get mask
        output = output.squeeze(1)
        mask = self.mask_mat.repeat(output.size(0), 1)
        mask = (mask >= keys_length.unsqueeze(1))

        # mask
        if self.softmax_stag:
            mask_value = -np.inf
        else:
            mask_value = 0.0

        output = output.masked_fill(mask=mask, value=torch.tensor(mask_value))
        output = output.unsqueeze(1)
        output = output / (embedding_size ** 0.5)

        # get the weight of each user's history list about the target item
        if self.softmax_stag:
            output = fn.softmax(output, dim=2)  # [B, 1, T]

        if not self.return_seq_weight:
            output = torch.matmul(output, keys)  # [B, 1, H]

        return output
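
# Usage sketch (illustrative, not part of the original file): ``mask_mat`` is a
# [1, T] position index used to mask padded history steps, and the first MLP size
# must equal 4 * H because query, key, difference and product are concatenated.
#
#     >>> B, T, H = 2, 5, 8
#     >>> mask_mat = torch.arange(T).view(1, -1)
#     >>> att = SequenceAttLayer(mask_mat, att_hidden_size=(4 * H, 16), softmax_stag=True)
#     >>> queries, keys = torch.randn(B, H), torch.randn(B, T, H)
#     >>> keys_length = torch.LongTensor([3, 5])
#     >>> att(queries, keys, keys_length).shape   # attention weights over the history
#     torch.Size([2, 1, 5])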
class VanillaAttention(nn.Module):
    """
    Vanilla attention layer, implemented by linear layers.

    Args:
        input_tensor (torch.Tensor): the input of the attention layer

    Returns:
        hidden_states (torch.Tensor): the outputs of the attention layer
        weights (torch.Tensor): the attention weights

    """

    def __init__(self, hidden_dim, attn_dim):
        super().__init__()
        self.projection = nn.Sequential(
            nn.Linear(hidden_dim, attn_dim),
            nn.ReLU(True),
            nn.Linear(attn_dim, 1)
        )

    def forward(self, input_tensor):
        # (B, Len, num, H) -> (B, Len, num, 1)
        energy = self.projection(input_tensor)
        weights = torch.softmax(energy.squeeze(-1), dim=-1)
        # (B, Len, num, H) * (B, Len, num, 1) -> (B, Len, H)
        hidden_states = (input_tensor * weights.unsqueeze(-1)).sum(dim=-2)
        return hidden_states, weights
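
# Usage sketch (illustrative, not part of the original file): pools ``num`` feature
# vectors per position into one vector, also returning the pooling weights.
#
#     >>> attn = VanillaAttention(hidden_dim=16, attn_dim=8)
#     >>> hidden, weights = attn(torch.randn(2, 10, 3, 16))   # (B, Len, num, H)
#     >>> hidden.shape, weights.shape
#     (torch.Size([2, 10, 16]), torch.Size([2, 10, 3]))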
class MultiHeadAttention(nn.Module):
    """
    Multi-head Self-attention layers; an attention score dropout layer is introduced.

    Args:
        input_tensor (torch.Tensor): the input of the multi-head self-attention layer
        attention_mask (torch.Tensor): the attention mask for input tensor

    Returns:
        hidden_states (torch.Tensor): the output of the multi-head self-attention layer

    """

    def __init__(self, n_heads, hidden_size, hidden_dropout_prob, attn_dropout_prob, layer_norm_eps):
        super(MultiHeadAttention, self).__init__()
        if hidden_size % n_heads != 0:
            raise ValueError(
                "The hidden size (%d) is not a multiple of the number of attention "
                "heads (%d)" % (hidden_size, n_heads))

        self.num_attention_heads = n_heads
        self.attention_head_size = int(hidden_size / n_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(hidden_size, self.all_head_size)
        self.key = nn.Linear(hidden_size, self.all_head_size)
        self.value = nn.Linear(hidden_size, self.all_head_size)

        self.attn_dropout = nn.Dropout(attn_dropout_prob)

        self.dense = nn.Linear(hidden_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size, eps=layer_norm_eps)
        self.out_dropout = nn.Dropout(hidden_dropout_prob)

    def transpose_for_scores(self, x):
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(self, input_tensor, attention_mask):
        mixed_query_layer = self.query(input_tensor)
        mixed_key_layer = self.key(input_tensor)
        mixed_value_layer = self.value(input_tensor)

        query_layer = self.transpose_for_scores(mixed_query_layer)
        key_layer = self.transpose_for_scores(mixed_key_layer)
        value_layer = self.transpose_for_scores(mixed_value_layer)

        # Take the dot product between "query" and "key" to get the raw attention scores.
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))

        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        # Apply the additive attention mask (precomputed once for all layers):
        # scores are [batch_size, heads, seq_len, seq_len], the mask is broadcast
        # from [batch_size, 1, 1, seq_len] (or [batch_size, 1, seq_len, seq_len]).
        attention_scores = attention_scores + attention_mask

        # Normalize the attention scores to probabilities.
        attention_probs = nn.Softmax(dim=-1)(attention_scores)
        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_probs = self.attn_dropout(attention_probs)

        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(*new_context_layer_shape)
        hidden_states = self.dense(context_layer)
        hidden_states = self.out_dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)

        return hidden_states
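
# Usage sketch (illustrative, not part of the original file): ``attention_mask`` is
# an additive mask, 0.0 for positions that may be attended to and a large negative
# value (e.g. -10000.0) for masked positions, broadcastable to the score shape.
#
#     >>> mha = MultiHeadAttention(n_heads=2, hidden_size=64, hidden_dropout_prob=0.5,
#     ...                          attn_dropout_prob=0.5, layer_norm_eps=1e-12)
#     >>> x = torch.randn(4, 20, 64)        # [batch_size, seq_len, hidden_size]
#     >>> mask = torch.zeros(4, 1, 1, 20)   # nothing masked out
#     >>> mha(x, mask).shape
#     torch.Size([4, 20, 64])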
class FeedForward(nn.Module):
    """
    Point-wise feed-forward layer is implemented by two dense layers.

    Args:
        input_tensor (torch.Tensor): the input of the point-wise feed-forward layer

    Returns:
        hidden_states (torch.Tensor): the output of the point-wise feed-forward layer

    """

    def __init__(self, hidden_size, inner_size, hidden_dropout_prob, hidden_act, layer_norm_eps):
        super(FeedForward, self).__init__()
        self.dense_1 = nn.Linear(hidden_size, inner_size)
        self.intermediate_act_fn = self.get_hidden_act(hidden_act)

        self.dense_2 = nn.Linear(inner_size, hidden_size)
        self.LayerNorm = nn.LayerNorm(hidden_size, eps=layer_norm_eps)
        self.dropout = nn.Dropout(hidden_dropout_prob)

    def get_hidden_act(self, act):
        ACT2FN = {
            "gelu": self.gelu,
            "relu": fn.relu,
            "swish": self.swish,
            "tanh": torch.tanh,
            "sigmoid": torch.sigmoid,
        }
        return ACT2FN[act]

    def gelu(self, x):
        """Implementation of the gelu activation function.

        For information: OpenAI GPT's gelu is slightly different
        (and gives slightly different results)::

            0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))

        Also see https://arxiv.org/abs/1606.08415
        """
        return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))

    def swish(self, x):
        return x * torch.sigmoid(x)

    def forward(self, input_tensor):
        hidden_states = self.dense_1(input_tensor)
        hidden_states = self.intermediate_act_fn(hidden_states)

        hidden_states = self.dense_2(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)

        return hidden_states
class TransformerLayer(nn.Module):
    """
    One transformer layer consists of a multi-head self-attention layer and a point-wise feed-forward layer.

    Args:
        hidden_states (torch.Tensor): the input of the multi-head self-attention sublayer
        attention_mask (torch.Tensor): the attention mask for the multi-head self-attention sublayer

    Returns:
        feedforward_output (torch.Tensor): the output of the point-wise feed-forward sublayer,
                                           which is the output of the transformer layer

    """

    def __init__(self, n_heads, hidden_size, intermediate_size, hidden_dropout_prob, attn_dropout_prob,
                 hidden_act, layer_norm_eps):
        super(TransformerLayer, self).__init__()
        self.multi_head_attention = MultiHeadAttention(n_heads, hidden_size, hidden_dropout_prob,
                                                       attn_dropout_prob, layer_norm_eps)
        self.feed_forward = FeedForward(hidden_size, intermediate_size, hidden_dropout_prob,
                                        hidden_act, layer_norm_eps)

    def forward(self, hidden_states, attention_mask):
        attention_output = self.multi_head_attention(hidden_states, attention_mask)
        feedforward_output = self.feed_forward(attention_output)
        return feedforward_output
class TransformerEncoder(nn.Module):
    r""" One TransformerEncoder consists of several TransformerLayers.

        - n_layers(num): num of transformer layers in transformer encoder. Default: 2
        - n_heads(num): num of attention heads for multi-head attention layer. Default: 2
        - hidden_size(num): the input and output hidden size. Default: 64
        - inner_size(num): the dimensionality in feed-forward layer. Default: 256
        - hidden_dropout_prob(float): probability of an element to be zeroed. Default: 0.5
        - attn_dropout_prob(float): probability of an attention score to be zeroed. Default: 0.5
        - hidden_act(str): activation function in feed-forward layer. Default: 'gelu'.
                           candidates: 'gelu', 'relu', 'swish', 'tanh', 'sigmoid'
        - layer_norm_eps(float): a value added to the denominator for numerical stability. Default: 1e-12

    """

    def __init__(self,
                 n_layers=2,
                 n_heads=2,
                 hidden_size=64,
                 inner_size=256,
                 hidden_dropout_prob=0.5,
                 attn_dropout_prob=0.5,
                 hidden_act='gelu',
                 layer_norm_eps=1e-12):
        super(TransformerEncoder, self).__init__()
        layer = TransformerLayer(n_heads, hidden_size, inner_size, hidden_dropout_prob,
                                 attn_dropout_prob, hidden_act, layer_norm_eps)
        self.layer = nn.ModuleList([copy.deepcopy(layer) for _ in range(n_layers)])

    def forward(self, hidden_states, attention_mask, output_all_encoded_layers=True):
        """
        Args:
            hidden_states (torch.Tensor): the input of the TransformerEncoder
            attention_mask (torch.Tensor): the attention mask for the input hidden_states
            output_all_encoded_layers (Bool): whether output all transformer layers' output

        Returns:
            all_encoder_layers (list): if output_all_encoded_layers is True, return a list consists of all
            transformer layers' output, otherwise return a list only consists of the output of last
            transformer layer.

        """
        all_encoder_layers = []
        for layer_module in self.layer:
            hidden_states = layer_module(hidden_states, attention_mask)
            if output_all_encoded_layers:
                all_encoder_layers.append(hidden_states)
        if not output_all_encoded_layers:
            all_encoder_layers.append(hidden_states)
        return all_encoder_layers
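
# Usage sketch (illustrative, not part of the original file): a left-to-right
# (causal) additive mask in the style used by SASRec-like models; 0.0 keeps a
# position visible, -10000.0 blocks it.
#
#     >>> encoder = TransformerEncoder(n_layers=2, n_heads=2, hidden_size=64, inner_size=256)
#     >>> seq_emb = torch.randn(4, 50, 64)          # [batch_size, seq_len, hidden_size]
#     >>> causal = torch.tril(torch.ones(50, 50))   # lower-triangular visibility
#     >>> mask = (1.0 - causal) * -10000.0
#     >>> mask = mask.unsqueeze(0).unsqueeze(1)     # [1, 1, seq_len, seq_len]
#     >>> outputs = encoder(seq_emb, mask, output_all_encoded_layers=False)
#     >>> outputs[-1].shape
#     torch.Size([4, 50, 64])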
class ContextSeqEmbAbstractLayer(nn.Module):
    """For Deep Interest Network and feature-rich sequential recommender systems,
    return the embedding matrices of features."""

    def __init__(self):
        super(ContextSeqEmbAbstractLayer, self).__init__()

    def get_fields_name_dim(self):
        """get user feature field and item feature field.
        """
        self.token_field_offsets = {}
        self.token_embedding_table = {}
        self.float_embedding_table = {}
        self.token_seq_embedding_table = {}
        self.token_field_names = {type: [] for type in self.types}
        self.token_field_dims = {type: [] for type in self.types}
        self.float_field_names = {type: [] for type in self.types}
        self.float_field_dims = {type: [] for type in self.types}
        self.token_seq_field_names = {type: [] for type in self.types}
        self.token_seq_field_dims = {type: [] for type in self.types}
        self.num_feature_field = {type: 0 for type in self.types}

        for type in self.types:
            for field_name in self.field_names[type]:
                if self.dataset.field2type[field_name] == FeatureType.TOKEN:
                    self.token_field_names[type].append(field_name)
                    self.token_field_dims[type].append(self.dataset.num(field_name))
                elif self.dataset.field2type[field_name] == FeatureType.TOKEN_SEQ:
                    self.token_seq_field_names[type].append(field_name)
                    self.token_seq_field_dims[type].append(self.dataset.num(field_name))
                else:
                    self.float_field_names[type].append(field_name)
                    self.float_field_dims[type].append(self.dataset.num(field_name))
                self.num_feature_field[type] += 1

    def get_embedding(self):
        """get embedding of all features.
        """
        for type in self.types:
            if len(self.token_field_dims[type]) > 0:
                self.token_field_offsets[type] = np.array((0, *np.cumsum(self.token_field_dims[type])[:-1]),
                                                          dtype=np.long)
                self.token_embedding_table[type] = FMEmbedding(self.token_field_dims[type],
                                                               self.token_field_offsets[type],
                                                               self.embedding_size).to(self.device)
            if len(self.float_field_dims[type]) > 0:
                self.float_embedding_table[type] = nn.Embedding(np.sum(self.float_field_dims[type], dtype=np.int32),
                                                                self.embedding_size).to(self.device)
            if len(self.token_seq_field_dims[type]) > 0:
                self.token_seq_embedding_table[type] = nn.ModuleList()
                for token_seq_field_dim in self.token_seq_field_dims[type]:
                    self.token_seq_embedding_table[type].append(
                        nn.Embedding(token_seq_field_dim, self.embedding_size).to(self.device))
    def embed_float_fields(self, float_fields, type, embed=True):
        """Get the embedding of float fields.
        In the following three functions ("embed_float_fields", "embed_token_fields", "embed_token_seq_fields"),
        when the type is user, [batch_size, max_item_length] should be recognised as [batch_size].

        Args:
            float_fields(torch.Tensor): [batch_size, max_item_length, num_float_field]
            type(str): user or item
            embed(bool): embed or not

        Returns:
            torch.Tensor: float fields embedding. [batch_size, max_item_length, num_float_field, embed_dim]

        """
        if not embed or float_fields is None:
            return float_fields

        num_float_field = float_fields.shape[-1]
        # [batch_size, max_item_length, num_float_field]
        index = torch.arange(0, num_float_field).unsqueeze(0).expand_as(float_fields).long().to(self.device)

        # [batch_size, max_item_length, num_float_field, embed_dim]
        float_embedding = self.float_embedding_table[type](index)
        float_embedding = torch.mul(float_embedding, float_fields.unsqueeze(-1))

        return float_embedding

    def embed_token_fields(self, token_fields, type):
        """Get the embedding of token fields.

        Args:
            token_fields(torch.Tensor): input, [batch_size, max_item_length, num_token_field]
            type(str): user or item

        Returns:
            torch.Tensor: token fields embedding, [batch_size, max_item_length, num_token_field, embed_dim]

        """
        if token_fields is None:
            return None
        # [batch_size, max_item_length, num_token_field, embed_dim]
        if type == 'item':
            embedding_shape = token_fields.shape + (-1,)
            token_fields = token_fields.reshape(-1, token_fields.shape[-1])
            token_embedding = self.token_embedding_table[type](token_fields)
            token_embedding = token_embedding.view(embedding_shape)
        else:
            token_embedding = self.token_embedding_table[type](token_fields)

        return token_embedding

    def embed_token_seq_fields(self, token_seq_fields, type):
        """Get the embedding of token_seq fields. The pooling over each sequence is
        selected by ``self.pooling_mode`` (mean/max/sum).

        Args:
            token_seq_fields(list): a list of torch.Tensor, each with shape [batch_size, max_item_length, seq_len]
            type(str): user or item

        Returns:
            torch.Tensor: result [batch_size, max_item_length, num_token_seq_field, embed_dim]

        """
        fields_result = []
        for i, token_seq_field in enumerate(token_seq_fields):
            embedding_table = self.token_seq_embedding_table[type][i]
            mask = token_seq_field != 0  # [batch_size, max_item_length, seq_len]
            mask = mask.float()
            value_cnt = torch.sum(mask, dim=-1, keepdim=True)  # [batch_size, max_item_length, 1]
            # [batch_size, max_item_length, seq_len, embed_dim]
            token_seq_embedding = embedding_table(token_seq_field)
            # [batch_size, max_item_length, seq_len, embed_dim]
            mask = mask.unsqueeze(-1).expand_as(token_seq_embedding)
            if self.pooling_mode == 'max':
                masked_token_seq_embedding = token_seq_embedding - (1 - mask) * 1e9
                # [batch_size, max_item_length, 1, embed_dim]
                result = torch.max(masked_token_seq_embedding, dim=-2, keepdim=True)
                result = result.values
            elif self.pooling_mode == 'sum':
                masked_token_seq_embedding = token_seq_embedding * mask.float()
                # [batch_size, max_item_length, 1, embed_dim]
                result = torch.sum(masked_token_seq_embedding, dim=-2, keepdim=True)
            else:
                masked_token_seq_embedding = token_seq_embedding * mask.float()
                result = torch.sum(masked_token_seq_embedding, dim=-2)  # [batch_size, max_item_length, embed_dim]
                eps = torch.FloatTensor([1e-8]).to(self.device)
                result = torch.div(result, value_cnt + eps)  # [batch_size, max_item_length, embed_dim]
                result = result.unsqueeze(-2)  # [batch_size, max_item_length, 1, embed_dim]
            fields_result.append(result)
        if len(fields_result) == 0:
            return None
        else:
            # [batch_size, max_item_length, num_token_seq_field, embed_dim]
            return torch.cat(fields_result, dim=-2)
    def embed_input_fields(self, user_idx, item_idx):
        """Get the embedding of user_idx and item_idx

        Args:
            user_idx(torch.Tensor): interaction['user_id']
            item_idx(torch.Tensor): interaction['item_id_list']

        Returns:
            dict: embedding of user feature and item feature

        """
        user_item_feat = {'user': self.user_feat, 'item': self.item_feat}
        user_item_idx = {'user': user_idx, 'item': item_idx}
        float_fields_embedding = {}
        token_fields_embedding = {}
        token_seq_fields_embedding = {}
        sparse_embedding = {}
        dense_embedding = {}

        for type in self.types:
            float_fields = []
            for field_name in self.float_field_names[type]:
                feature = user_item_feat[type][field_name][user_item_idx[type]]
                float_fields.append(feature
                                    if len(feature.shape) == (2 + (type == 'item'))
                                    else feature.unsqueeze(-1))
            if len(float_fields) > 0:
                float_fields = torch.cat(float_fields, dim=1)  # [batch_size, max_item_length, num_float_field]
            else:
                float_fields = None
            # [batch_size, max_item_length, num_float_field]
            # or [batch_size, max_item_length, num_float_field, embed_dim] or None
            float_fields_embedding[type] = self.embed_float_fields(float_fields, type)

            token_fields = []
            for field_name in self.token_field_names[type]:
                feature = user_item_feat[type][field_name][user_item_idx[type]]
                token_fields.append(feature.unsqueeze(-1))
            if len(token_fields) > 0:
                token_fields = torch.cat(token_fields, dim=-1)  # [batch_size, max_item_length, num_token_field]
            else:
                token_fields = None
            # [batch_size, max_item_length, num_token_field, embed_dim] or None
            token_fields_embedding[type] = self.embed_token_fields(token_fields, type)

            token_seq_fields = []
            for field_name in self.token_seq_field_names[type]:
                feature = user_item_feat[type][field_name][user_item_idx[type]]
                token_seq_fields.append(feature)
            # [batch_size, max_item_length, num_token_seq_field, embed_dim] or None
            token_seq_fields_embedding[type] = self.embed_token_seq_fields(token_seq_fields, type)

            if token_fields_embedding[type] is None:
                sparse_embedding[type] = token_seq_fields_embedding[type]
            else:
                if token_seq_fields_embedding[type] is None:
                    sparse_embedding[type] = token_fields_embedding[type]
                else:
                    sparse_embedding[type] = torch.cat([token_fields_embedding[type],
                                                        token_seq_fields_embedding[type]], dim=-2)
            dense_embedding[type] = float_fields_embedding[type]

        # sparse_embedding[type] shape: [batch_size, max_item_length, num_token_seq_field+num_token_field, embed_dim]
        # or None
        # dense_embedding[type] shape: [batch_size, max_item_length, num_float_field]
        # or [batch_size, max_item_length, num_float_field, embed_dim] or None
        return sparse_embedding, dense_embedding

    def forward(self, user_idx, item_idx):
        return self.embed_input_fields(user_idx, item_idx)
class ContextSeqEmbLayer(ContextSeqEmbAbstractLayer):
    """For Deep Interest Network, return all features (including user features and item features)
    embedding matrices."""

    def __init__(self, dataset, embedding_size, pooling_mode, device):
        super(ContextSeqEmbLayer, self).__init__()
        self.device = device
        self.embedding_size = embedding_size
        self.dataset = dataset
        self.user_feat = self.dataset.get_user_feature().to(self.device)
        self.item_feat = self.dataset.get_item_feature().to(self.device)

        self.field_names = {'user': list(self.user_feat.interaction.keys()),
                            'item': list(self.item_feat.interaction.keys())}

        self.types = ['user', 'item']
        self.pooling_mode = pooling_mode
        try:
            assert self.pooling_mode in ['mean', 'max', 'sum']
        except AssertionError:
            raise AssertionError("Make sure 'pooling_mode' in ['mean', 'max', 'sum']!")
        self.get_fields_name_dim()
        self.get_embedding()


class FeatureSeqEmbLayer(ContextSeqEmbAbstractLayer):
    """For feature-rich sequential recommenders, return item features embedding matrices according to
    selected features."""

    def __init__(self, dataset, embedding_size, selected_features, pooling_mode, device):
        super(FeatureSeqEmbLayer, self).__init__()
        self.device = device
        self.embedding_size = embedding_size
        self.dataset = dataset
        self.user_feat = None
        self.item_feat = self.dataset.get_item_feature().to(self.device)

        self.field_names = {'item': selected_features}

        self.types = ['item']
        self.pooling_mode = pooling_mode
        try:
            assert self.pooling_mode in ['mean', 'max', 'sum']
        except AssertionError:
            raise AssertionError("Make sure 'pooling_mode' in ['mean', 'max', 'sum']!")
        self.get_fields_name_dim()
        self.get_embedding()
class CNNLayers(nn.Module):
    r""" CNNLayers

    Args:
        - channels(list): a list contains the channels of each layer in cnn layers
        - kernels(list): a list contains the kernels of each layer in cnn layers
        - strides(list): a list contains the strides of each layer in cnn layers
        - activation(str): activation function after each layer in mlp layers. Default: 'relu'.
                           candidates: 'sigmoid', 'tanh', 'relu', 'leakyrelu', 'none'

    Shape:

    - Input: :math:`(N, C_{in}, H_{in}, W_{in})`
    - Output: :math:`(N, C_{out}, H_{out}, W_{out})` where

      .. math::
          H_{out} = \left\lfloor\frac{H_{in} + 2 \times \text{padding}[0] - \text{dilation}[0]
                    \times (\text{kernel\_size}[0] - 1) - 1}{\text{stride}[0]} + 1\right\rfloor

      .. math::
          W_{out} = \left\lfloor\frac{W_{in} + 2 \times \text{padding}[1] - \text{dilation}[1]
                    \times (\text{kernel\_size}[1] - 1) - 1}{\text{stride}[1]} + 1\right\rfloor

    Examples::

        >>> m = CNNLayers([1, 32, 32], [2, 2], [2, 2], 'relu')
        >>> input = torch.randn(128, 1, 64, 64)
        >>> output = m(input)
        >>> print(output.size())
        >>> torch.Size([128, 32, 16, 16])
    """

    def __init__(self, channels, kernels, strides, activation='relu', init_method=None):
        super(CNNLayers, self).__init__()
        self.channels = channels
        self.kernels = kernels
        self.strides = strides
        self.activation = activation
        self.init_method = init_method
        self.num_of_nets = len(self.channels) - 1

        if len(kernels) != len(strides) or self.num_of_nets != len(kernels):
            raise RuntimeError('channels, kernels and strides don\'t match\n')

        cnn_modules = []

        for i in range(self.num_of_nets):
            cnn_modules.append(nn.Conv2d(self.channels[i], self.channels[i + 1], self.kernels[i],
                                         stride=self.strides[i]))
            if self.activation.lower() == 'sigmoid':
                cnn_modules.append(nn.Sigmoid())
            elif self.activation.lower() == 'tanh':
                cnn_modules.append(nn.Tanh())
            elif self.activation.lower() == 'relu':
                cnn_modules.append(nn.ReLU())
            elif self.activation.lower() == 'leakyrelu':
                cnn_modules.append(nn.LeakyReLU())
            elif self.activation.lower() == 'none':
                pass

        self.cnn_layers = nn.Sequential(*cnn_modules)

        if self.init_method is not None:
            self.apply(self.init_weights)

    def init_weights(self, module):
        # We just initialize the module with normal distribution as the paper said
        if isinstance(module, nn.Conv2d):
            if self.init_method == 'norm':
                normal_(module.weight.data, 0, 0.01)
            if module.bias is not None:
                module.bias.data.fill_(0.0)

    def forward(self, input_feature):
        return self.cnn_layers(input_feature)
class FMFirstOrderLinear(nn.Module):
    """Calculate the first order score of the input features.
    This class is a member of ContextRecommender; you can call it easily when inheriting ContextRecommender.

    """

    def __init__(self, config, dataset, output_dim=1):
        super(FMFirstOrderLinear, self).__init__()
        self.field_names = dataset.fields()
        self.LABEL = config['LABEL_FIELD']
        self.device = config['device']
        self.token_field_names = []
        self.token_field_dims = []
        self.float_field_names = []
        self.float_field_dims = []
        self.token_seq_field_names = []
        self.token_seq_field_dims = []
        for field_name in self.field_names:
            if field_name == self.LABEL:
                continue
            if dataset.field2type[field_name] == FeatureType.TOKEN:
                self.token_field_names.append(field_name)
                self.token_field_dims.append(dataset.num(field_name))
            elif dataset.field2type[field_name] == FeatureType.TOKEN_SEQ:
                self.token_seq_field_names.append(field_name)
                self.token_seq_field_dims.append(dataset.num(field_name))
            else:
                self.float_field_names.append(field_name)
                self.float_field_dims.append(dataset.num(field_name))
        if len(self.token_field_dims) > 0:
            self.token_field_offsets = np.array((0, *np.cumsum(self.token_field_dims)[:-1]), dtype=np.long)
            self.token_embedding_table = FMEmbedding(self.token_field_dims, self.token_field_offsets, output_dim)
        if len(self.float_field_dims) > 0:
            self.float_embedding_table = nn.Embedding(np.sum(self.float_field_dims, dtype=np.int32), output_dim)
        if len(self.token_seq_field_dims) > 0:
            self.token_seq_embedding_table = nn.ModuleList()
            for token_seq_field_dim in self.token_seq_field_dims:
                self.token_seq_embedding_table.append(nn.Embedding(token_seq_field_dim, output_dim))

        self.bias = nn.Parameter(torch.zeros((output_dim,)), requires_grad=True)
    def embed_float_fields(self, float_fields, embed=True):
        """Calculate the first order score of float feature columns

        Args:
            float_fields (torch.FloatTensor): The input tensor. shape of [batch_size, num_float_field]

        Returns:
            torch.FloatTensor: The first order score of float feature columns
        """
        # input Tensor shape : [batch_size, num_float_field]
        if not embed or float_fields is None:
            return float_fields

        num_float_field = float_fields.shape[1]
        # [batch_size, num_float_field]
        index = torch.arange(0, num_float_field).unsqueeze(0).expand_as(float_fields).long().to(self.device)

        # [batch_size, num_float_field, output_dim]
        float_embedding = self.float_embedding_table(index)
        float_embedding = torch.mul(float_embedding, float_fields.unsqueeze(2))

        # [batch_size, 1, output_dim]
        float_embedding = torch.sum(float_embedding, dim=1, keepdim=True)

        return float_embedding

    def embed_token_fields(self, token_fields):
        """Calculate the first order score of token feature columns

        Args:
            token_fields (torch.LongTensor): The input tensor. shape of [batch_size, num_token_field]

        Returns:
            torch.FloatTensor: The first order score of token feature columns
        """
        # input Tensor shape : [batch_size, num_token_field]
        if token_fields is None:
            return None
        # [batch_size, num_token_field, embed_dim]
        token_embedding = self.token_embedding_table(token_fields)
        # [batch_size, 1, output_dim]
        token_embedding = torch.sum(token_embedding, dim=1, keepdim=True)

        return token_embedding

    def embed_token_seq_fields(self, token_seq_fields):
        """Calculate the first order score of token sequence feature columns

        Args:
            token_seq_fields (list): a list of torch.LongTensor, each with shape of [batch_size, seq_len]

        Returns:
            torch.FloatTensor: The first order score of token sequence feature columns
        """
        # input is a list of Tensor shape of [batch_size, seq_len]
        fields_result = []
        for i, token_seq_field in enumerate(token_seq_fields):
            embedding_table = self.token_seq_embedding_table[i]
            mask = token_seq_field != 0  # [batch_size, seq_len]
            mask = mask.float()
            value_cnt = torch.sum(mask, dim=1, keepdim=True)  # [batch_size, 1]

            token_seq_embedding = embedding_table(token_seq_field)  # [batch_size, seq_len, output_dim]

            mask = mask.unsqueeze(2).expand_as(token_seq_embedding)  # [batch_size, seq_len, output_dim]
            masked_token_seq_embedding = token_seq_embedding * mask.float()
            result = torch.sum(masked_token_seq_embedding, dim=1, keepdim=True)  # [batch_size, 1, output_dim]

            fields_result.append(result)
        if len(fields_result) == 0:
            return None
        else:
            return torch.sum(torch.cat(fields_result, dim=1), dim=1, keepdim=True)  # [batch_size, 1, output_dim]
    def forward(self, interaction):
        total_fields_embedding = []
        float_fields = []
        for field_name in self.float_field_names:
            float_fields.append(interaction[field_name]
                                if len(interaction[field_name].shape) == 2
                                else interaction[field_name].unsqueeze(1))

        if len(float_fields) > 0:
            float_fields = torch.cat(float_fields, dim=1)  # [batch_size, num_float_field]
        else:
            float_fields = None

        # [batch_size, 1, output_dim] or None
        float_fields_embedding = self.embed_float_fields(float_fields, embed=True)

        if float_fields_embedding is not None:
            total_fields_embedding.append(float_fields_embedding)

        token_fields = []
        for field_name in self.token_field_names:
            token_fields.append(interaction[field_name].unsqueeze(1))
        if len(token_fields) > 0:
            token_fields = torch.cat(token_fields, dim=1)  # [batch_size, num_token_field]
        else:
            token_fields = None
        # [batch_size, 1, output_dim] or None
        token_fields_embedding = self.embed_token_fields(token_fields)
        if token_fields_embedding is not None:
            total_fields_embedding.append(token_fields_embedding)

        token_seq_fields = []
        for field_name in self.token_seq_field_names:
            token_seq_fields.append(interaction[field_name])
        # [batch_size, 1, output_dim] or None
        token_seq_fields_embedding = self.embed_token_seq_fields(token_seq_fields)
        if token_seq_fields_embedding is not None:
            total_fields_embedding.append(token_seq_fields_embedding)

        return torch.sum(torch.cat(total_fields_embedding, dim=1), dim=1) + self.bias  # [batch_size, output_dim]