# -*- coding: utf-8 -*-
# @Time : 2020/11/20 22:33
# @Author : Shao Weiqi
# @Reviewer : Lin Kun
# @Email : shaoweiqi@ruc.edu.cn
r"""
SHAN
################################################
Reference:
Ying, H. et al. "Sequential Recommender System based on Hierarchical Attention Networks." In IJCAI 2018.
"""
import numpy as np
import torch
import torch.nn as nn
from torch.nn.init import normal_, uniform_
from recbole.model.abstract_recommender import SequentialRecommender
from recbole.model.loss import BPRLoss
[docs]class SHAN(SequentialRecommender):
r"""
SHAN exploits a Hierarchical Attention Network to obtain the user's long- and short-term preference.
It first derives the long-term preference, then fuses it with the recent items to get the long-short term preference.
"""
def __init__(self, config, dataset):
    """
    Build the SHAN embeddings, attention projections and loss function.

    Args:
        config: mapping-like configuration. The keys read here are
            ``device``, ``embedding_size``, ``short_item_length``,
            ``reg_weight`` and ``loss_type``.
        dataset: dataset object; only ``dataset.num(self.USER_ID)`` is
            called here.

    Raises:
        AssertionError: if ``short_item_length`` exceeds ``self.max_seq_length``.
        NotImplementedError: if ``loss_type`` is neither ``'BPR'`` nor ``'CE'``.
    """
    super(SHAN, self).__init__(config, dataset)

    # load the dataset information
    # NOTE(review): USER_ID, n_items and max_seq_length are presumably set by
    # the parent constructor; they are not defined in this class.
    self.n_users = dataset.num(self.USER_ID)
    self.device = config['device']

    # load the parameter information
    self.embedding_size = config["embedding_size"]
    self.short_item_length = config["short_item_length"]  # the length of the short session items
    assert self.short_item_length <= self.max_seq_length, "short_item_length can't longer than the max_seq_length"
    self.reg_weight = config["reg_weight"]

    # Shared bound of the uniform initialisation used for both attention biases.
    bound = np.sqrt(3 / self.embedding_size)

    # define layers and loss
    self.item_embedding = nn.Embedding(self.n_items, self.embedding_size, padding_idx=0)
    self.user_embedding = nn.Embedding(self.n_users, self.embedding_size)

    self.long_w = nn.Linear(self.embedding_size, self.embedding_size)
    # Move the initialised tensor to the target device BEFORE wrapping it in
    # nn.Parameter. Calling ``.to(device)`` on the Parameter itself returns a
    # plain (non-leaf) tensor whenever a copy is made, so the bias would not
    # be registered on the module and would never be optimised or saved.
    self.long_b = nn.Parameter(
        uniform_(
            tensor=torch.zeros(self.embedding_size),
            a=-bound,
            b=bound
        ).to(self.device),
        requires_grad=True
    )

    self.long_short_w = nn.Linear(self.embedding_size, self.embedding_size)
    self.long_short_b = nn.Parameter(
        uniform_(
            tensor=torch.zeros(self.embedding_size),
            a=-bound,
            b=bound
        ).to(self.device),
        requires_grad=True
    )

    self.relu = nn.ReLU()

    self.loss_type = config['loss_type']
    if self.loss_type == 'BPR':
        self.loss_fct = BPRLoss()
    elif self.loss_type == 'CE':
        self.loss_fct = nn.CrossEntropyLoss()
    else:
        raise NotImplementedError("Make sure 'loss_type' in ['BPR', 'CE']!")

    # init the parameter of the model
    self.apply(self.init_weights)
def reg_loss(self, user_embedding, item_embedding):
    """
    Compute the L2-norm regularisation term added to the training loss.

    ``self.reg_weight`` is unpacked into two coefficients: the first scales
    the norms of the two attention projection matrices, the second scales
    the norms of the supplied user and item embeddings.

    Args:
        user_embedding: tensor of user embeddings to penalise.
        item_embedding: tensor of item embeddings to penalise.

    Returns:
        Scalar tensor holding the weighted sum of the four L2 norms.
    """
    weight_coef, embed_coef = self.reg_weight

    long_proj = self.long_w.weight
    fused_proj = self.long_short_w.weight
    projection_penalty = (
        weight_coef * torch.norm(long_proj, p=2)
        + weight_coef * torch.norm(fused_proj, p=2)
    )
    embedding_penalty = (
        embed_coef * torch.norm(user_embedding, p=2)
        + embed_coef * torch.norm(item_embedding, p=2)
    )
    return projection_penalty + embedding_penalty
def inverse_seq_item(self, seq_item, seq_item_len):
    """
    Move the trailing part of every sequence to the front, like this

    [1,2,3,0,0,0,0] -- after inverse -->> [0,0,0,0,1,2,3]

    Each row is cyclically shifted so that its first ``seq_item_len`` entries
    become right-aligned and whatever followed them is placed in front. The
    shift is computed with tensor ops on the input's own device, so there is
    no host round-trip and no per-row Python loop.

    Args:
        seq_item: 2-D integer tensor of item ids, one row per sequence
            (assumed to be right-padded with 0 -- confirm against the caller).
        seq_item_len: 1-D integer tensor with the number of leading valid
            items in each row.

    Returns:
        ``torch.long`` tensor on ``self.device`` with the same shape as
        ``seq_item`` (an empty batch keeps its shape).
    """
    if seq_item.numel() == 0:
        # Nothing to shift; preserve the (empty) shape instead of collapsing it.
        return seq_item.to(device=self.device, dtype=torch.long)

    width = seq_item.size(1)
    # A length larger than the row width means "row already full": no shift.
    lengths = seq_item_len.to(device=seq_item.device, dtype=torch.long).clamp(max=width)
    positions = torch.arange(width, device=seq_item.device).unsqueeze(0)
    # Output slot j reads input slot (j + length) mod width, which yields
    # items[length:] followed by items[:length] for every row.
    source = (positions + lengths.unsqueeze(1)) % width
    shifted = torch.gather(seq_item, 1, source)
    return shifted.to(device=self.device, dtype=torch.long)
def init_weights(self, module):
    """
    Initialise the weights of a single module; intended for ``self.apply``.

    * ``nn.Embedding``: weights drawn from a normal distribution (mean 0, std 0.01).
    * ``nn.Linear``: weights drawn uniformly from ``[-sqrt(3/d), sqrt(3/d)]``
      with ``d = self.embedding_size``; the bias is left untouched.
    * ``nn.Parameter``: same uniform range. ``nn.Module.apply`` only visits
      modules, so this branch is reached only when the method is called
      directly with a parameter.

    Any other object is left unchanged.

    Args:
        module: the module (or parameter) to initialise in place.
    """
    bound = np.sqrt(3 / self.embedding_size)
    if isinstance(module, nn.Embedding):
        normal_(module.weight.data, 0., 0.01)
    elif isinstance(module, nn.Linear):
        uniform_(module.weight.data, -bound, bound)
    elif isinstance(module, nn.Parameter):
        # The debug print that used to follow this call was removed.
        uniform_(module.data, -bound, bound)
def forward(self, seq_item, user, seq_item_len):
    """
    Encode a batch of item sequences into one preference vector per row.

    The sequences are first re-aligned with ``inverse_seq_item``. A long-term
    vector is pooled over the whole sequence, then pooled again together with
    the last ``self.short_item_length`` item embeddings.

    Args:
        seq_item: tensor of item-id sequences (id 0 is treated as padding).
        user: tensor of user ids.
        seq_item_len: tensor with the number of valid items per sequence.

    Returns:
        Tensor produced by the long-and-short-term pooling layer
        (batch_size * embedding_size).
    """
    history = self.inverse_seq_item(seq_item, seq_item_len)
    history_emb = self.item_embedding(history)
    user_vec = self.user_embedding(user)

    # True wherever the position holds the padding id 0
    pad_mask = history.eq(0)

    # batch_size * 1 * embedding_size
    long_term = self.long_term_attention_based_pooling_layer(history_emb, user_vec, pad_mask)

    # keep only the trailing window of recent items
    # batch_size * short_item_length * embedding_size
    window = self.short_item_length
    recent_emb = history_emb[:, -window:, :]
    recent_mask = pad_mask[:, -window:]

    # The long-term vector is prepended as an extra slot that is never masked.
    long_term_slot = torch.zeros(recent_mask.size(0), 1, dtype=torch.bool, device=self.device)
    fused_mask = torch.cat([long_term_slot, recent_mask], dim=1)

    # batch_size * 1_plus_short_item_length * embedding_size
    fused_emb = torch.cat([long_term, recent_emb], dim=1)

    # batch_size * embedding_size
    return self.long_and_short_term_attention_based_pooling_layer(fused_emb, user_vec, fused_mask)
def calculate_loss(self, interaction):
    """
    Compute the training loss for one batch.

    The ranking part is either a pairwise loss on positive/negative item
    scores (``loss_type == 'BPR'``) or a cross-entropy over all item
    embeddings (any other ``loss_type``). The result of ``reg_loss`` on the
    user and positive-item embeddings is added in both cases.

    Args:
        interaction: batch indexed by ``ITEM_SEQ``, ``ITEM_SEQ_LEN``,
            ``USER_ID``, ``POS_ITEM_ID`` and, for BPR, ``NEG_ITEM_ID``.

    Returns:
        Scalar loss tensor.
    """
    history = interaction[self.ITEM_SEQ]
    history_len = interaction[self.ITEM_SEQ_LEN]
    users = interaction[self.USER_ID]

    user_vec = self.user_embedding(users)
    seq_output = self.forward(history, users, history_len)

    targets = interaction[self.POS_ITEM_ID]
    target_vec = self.item_embedding(targets)

    if self.loss_type == 'BPR':
        negatives = interaction[self.NEG_ITEM_ID]
        negative_vec = self.item_embedding(negatives)
        pos_score = torch.sum(seq_output * target_vec, dim=-1)
        neg_score = torch.sum(seq_output * negative_vec, dim=-1)
        ranking_loss = self.loss_fct(pos_score, neg_score)
    else:  # self.loss_type = 'CE'
        all_item_vec = self.item_embedding.weight
        logits = torch.matmul(seq_output, all_item_vec.transpose(0, 1))
        ranking_loss = self.loss_fct(logits, targets)

    return ranking_loss + self.reg_loss(user_vec, target_vec)
def predict(self, interaction):
    """
    Score one candidate item per row of the batch.

    Args:
        interaction: batch indexed by ``ITEM_SEQ``, ``ITEM_ID``,
            ``ITEM_SEQ_LEN`` and ``USER_ID``.

    Returns:
        Tensor of dot products between each sequence representation and the
        embedding of its candidate item.
    """
    history = interaction[self.ITEM_SEQ]
    candidates = interaction[self.ITEM_ID]
    history_len = interaction[self.ITEM_SEQ_LEN]
    users = interaction[self.USER_ID]

    seq_output = self.forward(history, users, history_len)
    candidate_vec = self.item_embedding(candidates)
    return (seq_output * candidate_vec).sum(dim=1)
def full_sort_predict(self, interaction):
    """
    Score every item in the embedding table for each row of the batch.

    Args:
        interaction: batch indexed by ``ITEM_SEQ``, ``ITEM_SEQ_LEN`` and
            ``USER_ID``.

    Returns:
        Matrix of dot products between each sequence representation and all
        rows of ``self.item_embedding.weight``.
    """
    history = interaction[self.ITEM_SEQ]
    history_len = interaction[self.ITEM_SEQ_LEN]
    users = interaction[self.USER_ID]

    seq_output = self.forward(history, users, history_len)
    all_item_vec = self.item_embedding.weight
    return torch.matmul(seq_output, all_item_vec.transpose(0, 1))
def long_and_short_term_attention_based_pooling_layer(self, long_short_item_embedding, user_embedding, mask=None):
    """
    fusing the long term purpose with the short-term preference

    Every slot is projected with ``long_short_w`` / ``long_short_b`` and a
    ReLU, scored against the user embedding, and the softmax of those scores
    weights a sum over the original (unprojected) slot embeddings.

    Args:
        long_short_item_embedding: batch_size * seq_len * embedding_size tensor.
        user_embedding: batch_size * embedding_size tensor.
        mask: optional boolean batch_size * seq_len tensor; ``True`` slots
            are given a score of -1e9 before the softmax.

    Returns:
        batch_size * embedding_size tensor.
    """
    values = long_short_item_embedding
    hidden = self.relu(self.long_short_w(values) + self.long_short_b)

    # batch_size * seq_len
    scores = torch.matmul(hidden, user_embedding.unsqueeze(2)).squeeze(-1)
    if mask is not None:
        scores = scores.masked_fill(mask, -1e9)
    attention = torch.softmax(scores, dim=-1)

    weighted = values * attention.unsqueeze(2)
    return weighted.sum(dim=1)
def long_term_attention_based_pooling_layer(self, seq_item_embedding, user_embedding, mask=None):
    """
    get the long term purpose of user

    Every item embedding is projected with ``long_w`` / ``long_b`` and a
    ReLU, scored against the user embedding, and the softmax of those scores
    weights a sum over the original (unprojected) item embeddings.

    Args:
        seq_item_embedding: batch_size * seq_len * embedding_size tensor.
        user_embedding: batch_size * embedding_size tensor.
        mask: optional boolean batch_size * seq_len tensor; ``True``
            positions are given a score of -1e9 before the softmax.

    Returns:
        batch_size * 1 * embedding_size tensor (the sequence axis is kept).
    """
    values = seq_item_embedding
    hidden = self.relu(self.long_w(values) + self.long_b)

    # batch_size * seq_len
    scores = torch.matmul(hidden, user_embedding.unsqueeze(2)).squeeze(-1)
    if mask is not None:
        scores = scores.masked_fill(mask, -1e9)
    attention = torch.softmax(scores, dim=1)

    # batch_size * 1 * embedding_size
    weighted = values * attention.unsqueeze(2)
    return weighted.sum(dim=1, keepdim=True)