Source code for recbole.model.context_aware_recommender.eulernet

# -*- coding: utf-8 -*-
# @Time   : 2023/4/21 12:00
# @Author : Zhen Tian
# @Email  : chenyuwuxinn@gmail.com
# @File   : eulernet.py

r"""
EulerNet
################################################
Reference:
    Zhen Tian et al. "EulerNet: Adaptive Feature Interaction Learning via Euler's Formula for CTR Prediction." in SIGIR 2023.

Reference code:
    https://github.com/chenyuwuxin/EulerNet

"""

import torch
import torch.nn as nn
from torch.nn.init import xavier_normal_, constant_
from recbole.model.abstract_recommender import ContextRecommender
from recbole.model.loss import RegLoss


class EulerNet(ContextRecommender):
    r"""EulerNet is a context-based recommendation model.
    It can adaptively learn the arbitrary-order feature interactions in a complex vector space
    by conducting space mapping according to Euler's formula. Meanwhile, it can jointly capture
    the explicit and implicit feature interactions in a unified model architecture.
    """

    def __init__(self, config, dataset):
        super(EulerNet, self).__init__(config, dataset)
        field_num = self.field_num = self.num_feature_field
        # The input width is embedding_size * field_num; each entry of
        # config.order_list gives the number of order vectors (virtual fields)
        # in the corresponding Euler interaction layer.
        shape_list = [config.embedding_size * field_num] + [
            num_neurons * config.embedding_size for num_neurons in config.order_list
        ]

        interaction_shapes = []
        for inshape, outshape in zip(shape_list[:-1], shape_list[1:]):
            interaction_shapes.append(EulerInteractionLayer(config, inshape, outshape))

        self.Euler_interaction_layers = nn.Sequential(*interaction_shapes)
        self.mu = nn.Parameter(torch.ones(1, field_num, 1))
        self.reg = nn.Linear(shape_list[-1], 1)
        self.reg_weight = config.reg_weight
        nn.init.normal_(self.reg.weight, mean=0, std=0.01)
        self.sigmoid = nn.Sigmoid()
        self.reg_loss = RegLoss()
        self.loss = nn.BCEWithLogitsLoss()
        self.apply(self._init_other_weights)

    def _init_other_weights(self, module):
        if isinstance(module, nn.Embedding):
            xavier_normal_(module.weight.data)
        elif isinstance(module, nn.Linear):
            if module.bias is not None:
                constant_(module.bias.data, 0)
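
    # A worked example of the layer stacking above (hypothetical values, not a
    # shipped config): with embedding_size=16, num_feature_field=3, and
    # order_list=[7, 7], shape_list == [48, 112, 112], producing two stacked
    # layers EulerInteractionLayer(48 -> 112) and EulerInteractionLayer(112 -> 112).
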
    def forward(self, interaction):
        fm_all_embeddings = self.concat_embed_input_fields(
            interaction
        )  # [batch_size, num_field, embed_dim]
        # Map each embedding e to the complex plane via Euler's formula:
        # mu * e^{ie} = mu * cos(e) + i * mu * sin(e), giving the real part r
        # and the imaginary part p.
        r, p = self.mu * torch.cos(fm_all_embeddings), self.mu * torch.sin(
            fm_all_embeddings
        )
        o_r, o_p = self.Euler_interaction_layers((r, p))
        o_r, o_p = o_r.reshape(o_r.shape[0], -1), o_p.reshape(o_p.shape[0], -1)
        # A shared linear head regresses both parts; the logit is their sum.
        re, im = self.reg(o_r), self.reg(o_p)
        logits = re + im
        return logits.squeeze(-1)
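
    # Shape trace through forward (hypothetical sizes, assuming batch_size=4,
    # num_field=3, embedding_size=16, order_list=[7]): embeddings (4, 3, 16)
    # -> r, p (4, 3, 16) -> o_r, o_p (4, 7, 16) -> flattened (4, 112)
    # -> logits (4,).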

    def calculate_loss(self, interaction):
        label = interaction[self.LABEL]
        output = self.forward(interaction)
        # BCEWithLogitsLoss applies the sigmoid internally, so forward
        # returns raw logits here.
        return self.loss(output, label) + self.RegularLoss(self.reg_weight)

    def predict(self, interaction):
        return self.sigmoid(self.forward(interaction))

    def RegularLoss(self, weight):
        r"""L2 regularization over the interaction layers, the modulus
        parameter mu, and the output head, scaled by ``weight``."""
        if weight == 0:
            return 0
        loss = 0
        for name in ["Euler_interaction_layers", "mu", "reg"]:
            comp = getattr(self, name)
            if isinstance(comp, nn.Parameter):
                loss += torch.norm(comp, p=2)
                continue
            for params in comp.parameters():
                loss += torch.norm(params, p=2)
        return loss * weight


class EulerInteractionLayer(nn.Module):
    r"""Euler interaction layer is the core component of EulerNet,
    which enables the adaptive learning of explicit feature interactions.
    An Euler interaction layer performs the feature interaction under the complex space one time,
    taking as input a complex representation and outputting a transformed complex representation.
    """

    def __init__(self, config, inshape, outshape):
        super().__init__()
        self.feature_dim = config.embedding_size
        self.apply_norm = config.apply_norm

        # Learnable interaction orders; the low-temperature softmax (dividing
        # by 0.01) makes the initialization near one-hot, so each output order
        # starts close to a single input field.
        init_orders = torch.softmax(
            torch.randn(inshape // self.feature_dim, outshape // self.feature_dim)
            / 0.01,
            dim=0,
        )
        self.inter_orders = nn.Parameter(init_orders)
        # Linear transform for the implicit interaction branch.
        self.im = nn.Linear(inshape, outshape)

        self.bias_lam = nn.Parameter(
            torch.randn(1, self.feature_dim, outshape // self.feature_dim) * 0.01
        )
        self.bias_theta = nn.Parameter(
            torch.randn(1, self.feature_dim, outshape // self.feature_dim) * 0.01
        )
        nn.init.normal_(self.im.weight, mean=0, std=0.1)

        self.drop_ex = nn.Dropout(p=config.drop_ex)
        self.drop_im = nn.Dropout(p=config.drop_im)
        self.norm_r = nn.LayerNorm([self.feature_dim])
        self.norm_p = nn.LayerNorm([self.feature_dim])
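
    # The explicit branch computes, for output order vector j with coefficients
    # alpha_{ij} (the j-th column of inter_orders):
    #     lam_j   = exp(sum_i alpha_{ij} * log(lam_i) + b_lam_j)
    #             = e^{b_lam_j} * prod_i lam_i^{alpha_{ij}}
    #     theta_j = sum_i alpha_{ij} * theta_i + b_theta_j
    # i.e. a learnable arbitrary-order product of moduli with a matching
    # combination of phases, which is the adaptive interaction of the paper.
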
    def forward(self, complex_features):
        r, p = complex_features

        # Convert the rectangular form (r, p) to polar form: lam holds the
        # squared modulus (plus epsilon for numerical stability), theta the phase.
        lam = r**2 + p**2 + 1e-8
        theta = torch.atan2(p, r)
        lam, theta = lam.reshape(lam.shape[0], -1, self.feature_dim), theta.reshape(
            theta.shape[0], -1, self.feature_dim
        )
        r, p = self.drop_im(r), self.drop_im(p)

        # Explicit interaction in the log-polar domain: 0.5 * log(lam) is the
        # log-modulus, so the linear map by inter_orders realizes weighted
        # multiplicative interactions after the exponentiation below.
        lam = 0.5 * torch.log(lam)
        lam, theta = torch.transpose(lam, -2, -1), torch.transpose(theta, -2, -1)
        lam, theta = self.drop_ex(lam), self.drop_ex(theta)
        lam, theta = (
            lam @ self.inter_orders + self.bias_lam,
            theta @ self.inter_orders + self.bias_theta,
        )
        lam = torch.exp(lam)
        lam, theta = torch.transpose(lam, -2, -1), torch.transpose(theta, -2, -1)

        # Implicit interaction: a linear layer with ReLU on the rectangular form.
        r, p = r.reshape(r.shape[0], -1), p.reshape(p.shape[0], -1)
        r, p = self.im(r), self.im(p)
        r, p = torch.relu(r), torch.relu(p)
        r, p = r.reshape(r.shape[0], -1, self.feature_dim), p.reshape(
            p.shape[0], -1, self.feature_dim
        )

        # Fuse both branches back in rectangular form via Euler's formula.
        o_r, o_p = r + lam * torch.cos(theta), p + lam * torch.sin(theta)
        o_r, o_p = o_r.reshape(o_r.shape[0], -1, self.feature_dim), o_p.reshape(
            o_p.shape[0], -1, self.feature_dim
        )
        if self.apply_norm:
            o_r, o_p = self.norm_r(o_r), self.norm_p(o_p)
        return o_r, o_p
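

if __name__ == "__main__":
    # Minimal shape check (illustrative only; real runs go through RecBole's
    # config/dataset pipeline). `_config` is a hypothetical stand-in exposing
    # just the fields EulerInteractionLayer reads.
    from types import SimpleNamespace

    _config = SimpleNamespace(
        embedding_size=16, apply_norm=True, drop_ex=0.3, drop_im=0.3
    )
    layer = EulerInteractionLayer(_config, inshape=16 * 3, outshape=16 * 7)
    r = torch.randn(4, 3, 16)  # [batch_size, num_field, embed_dim]
    p = torch.randn(4, 3, 16)
    o_r, o_p = layer((r, p))
    print(o_r.shape, o_p.shape)  # torch.Size([4, 7, 16]) for both parts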