Source code for recbole.data.dataloader.general_dataloader

# @Time   : 2020/7/7
# @Author : Yupeng Hou
# @Email  : houyupeng@ruc.edu.cn

# UPDATE
# @Time   : 2020/9/9, 2020/9/29
# @Author : Yupeng Hou, Yushuo Chen
# @email  : houyupeng@ruc.edu.cn, chenyushuo@ruc.edu.cn

"""
recbole.data.dataloader.general_dataloader
################################################
"""

import numpy as np
import torch

from recbole.data.dataloader.abstract_dataloader import AbstractDataLoader
from recbole.data.dataloader.neg_sample_mixin import NegSampleMixin, NegSampleByMixin
from recbole.data.interaction import Interaction, cat_interactions
from recbole.utils import DataLoaderType, InputType


[docs]class GeneralDataLoader(AbstractDataLoader): """:class:`GeneralDataLoader` is used for general model and it just return the origin data. Args: config (Config): The config of dataloader. dataset (Dataset): The dataset of dataloader. batch_size (int, optional): The batch_size of dataloader. Defaults to ``1``. dl_format (InputType, optional): The input type of dataloader. Defaults to :obj:`~recbole.utils.enum_type.InputType.POINTWISE`. shuffle (bool, optional): Whether the dataloader will be shuffle after a round. Defaults to ``False``. """ dl_type = DataLoaderType.ORIGIN def __init__(self, config, dataset, batch_size=1, dl_format=InputType.POINTWISE, shuffle=False): super().__init__(config, dataset, batch_size=batch_size, dl_format=dl_format, shuffle=shuffle) @property def pr_end(self): return len(self.dataset) def _shuffle(self): self.dataset.shuffle() def _next_batch_data(self): cur_data = self.dataset[self.pr:self.pr + self.step] self.pr += self.step return cur_data
[docs]class GeneralNegSampleDataLoader(NegSampleByMixin, AbstractDataLoader): """:class:`GeneralNegSampleDataLoader` is a general-dataloader with negative sampling. For the result of every batch, we permit that every positive interaction and its negative interaction must be in the same batch. Beside this, when it is in the evaluation stage, and evaluator is topk-like function, we also permit that all the interactions corresponding to each user are in the same batch and positive interactions are before negative interactions. Args: config (Config): The config of dataloader. dataset (Dataset): The dataset of dataloader. sampler (Sampler): The sampler of dataloader. neg_sample_args (dict): The neg_sample_args of dataloader. batch_size (int, optional): The batch_size of dataloader. Defaults to ``1``. dl_format (InputType, optional): The input type of dataloader. Defaults to :obj:`~recbole.utils.enum_type.InputType.POINTWISE`. shuffle (bool, optional): Whether the dataloader will be shuffle after a round. Defaults to ``False``. """ def __init__( self, config, dataset, sampler, neg_sample_args, batch_size=1, dl_format=InputType.POINTWISE, shuffle=False ): self.uid_field = dataset.uid_field self.iid_field = dataset.iid_field self.uid_list, self.uid2index, self.uid2items_num = None, None, None super().__init__( config, dataset, sampler, neg_sample_args, batch_size=batch_size, dl_format=dl_format, shuffle=shuffle )
[docs] def setup(self): if self.user_inter_in_one_batch: uid_field = self.dataset.uid_field user_num = self.dataset.user_num self.dataset.sort(by=uid_field, ascending=True) self.uid_list = [] start, end = dict(), dict() for i, uid in enumerate(self.dataset.inter_feat[uid_field].numpy()): if uid not in start: self.uid_list.append(uid) start[uid] = i end[uid] = i self.uid2index = np.array([None] * user_num) self.uid2items_num = np.zeros(user_num, dtype=np.int64) for uid in self.uid_list: self.uid2index[uid] = slice(start[uid], end[uid] + 1) self.uid2items_num[uid] = end[uid] - start[uid] + 1 self.uid_list = np.array(self.uid_list) self._batch_size_adaptation()
def _batch_size_adaptation(self): if self.user_inter_in_one_batch: inters_num = sorted(self.uid2items_num * self.times, reverse=True) batch_num = 1 new_batch_size = inters_num[0] for i in range(1, len(inters_num)): if new_batch_size + inters_num[i] > self.batch_size: break batch_num = i + 1 new_batch_size += inters_num[i] self.step = batch_num self.upgrade_batch_size(new_batch_size) else: batch_num = max(self.batch_size // self.times, 1) new_batch_size = batch_num * self.times self.step = batch_num self.upgrade_batch_size(new_batch_size) @property def pr_end(self): if self.user_inter_in_one_batch: return len(self.uid_list) else: return len(self.dataset) def _shuffle(self): if self.user_inter_in_one_batch: np.random.shuffle(self.uid_list) else: self.dataset.shuffle() def _next_batch_data(self): if self.user_inter_in_one_batch: uid_list = self.uid_list[self.pr:self.pr + self.step] data_list = [] for uid in uid_list: index = self.uid2index[uid] data_list.append(self._neg_sampling(self.dataset[index])) cur_data = cat_interactions(data_list) pos_len_list = self.uid2items_num[uid_list] user_len_list = pos_len_list * self.times cur_data.set_additional_info(list(pos_len_list), list(user_len_list)) self.pr += self.step return cur_data else: cur_data = self._neg_sampling(self.dataset[self.pr:self.pr + self.step]) self.pr += self.step return cur_data def _neg_sampling(self, inter_feat): uids = inter_feat[self.uid_field] neg_iids = self.sampler.sample_by_user_ids(uids, self.neg_sample_by) return self.sampling_func(inter_feat, neg_iids) def _neg_sample_by_pair_wise_sampling(self, inter_feat, neg_iids): inter_feat = inter_feat.repeat(self.times) neg_item_feat = Interaction({self.iid_field: neg_iids}) neg_item_feat = self.dataset.join(neg_item_feat) neg_item_feat.add_prefix(self.neg_prefix) inter_feat.update(neg_item_feat) return inter_feat def _neg_sample_by_point_wise_sampling(self, inter_feat, neg_iids): pos_inter_num = len(inter_feat) new_data = inter_feat.repeat(self.times) new_data[self.iid_field][pos_inter_num:] = neg_iids new_data = self.dataset.join(new_data) labels = torch.zeros(pos_inter_num * self.times) labels[:pos_inter_num] = 1.0 new_data.update(Interaction({self.label_field: labels})) return new_data
[docs] def get_pos_len_list(self): """ Returns: numpy.ndarray: Number of positive item for each user in a training/evaluating epoch. """ return self.uid2items_num[self.uid_list]
[docs] def get_user_len_list(self): """ Returns: numpy.ndarray: Number of all item for each user in a training/evaluating epoch. """ return self.uid2items_num[self.uid_list] * self.times
[docs]class GeneralFullDataLoader(NegSampleMixin, AbstractDataLoader): """:class:`GeneralFullDataLoader` is a general-dataloader with full sort. In order to speed up calculation, this dataloader would only return then user part of interactions, positive items and used items. It would not return negative items. Args: config (Config): The config of dataloader. dataset (Dataset): The dataset of dataloader. sampler (Sampler): The sampler of dataloader. neg_sample_args (dict): The neg_sample_args of dataloader. batch_size (int, optional): The batch_size of dataloader. Defaults to ``1``. dl_format (InputType, optional): The input type of dataloader. Defaults to :obj:`~recbole.utils.enum_type.InputType.POINTWISE`. shuffle (bool, optional): Whether the dataloader will be shuffle after a round. Defaults to ``False``. """ dl_type = DataLoaderType.FULL def __init__( self, config, dataset, sampler, neg_sample_args, batch_size=1, dl_format=InputType.POINTWISE, shuffle=False ): if neg_sample_args['strategy'] != 'full': raise ValueError('neg_sample strategy in GeneralFullDataLoader() should be `full`') uid_field = dataset.uid_field iid_field = dataset.iid_field user_num = dataset.user_num self.uid_list = [] self.uid2items_num = np.zeros(user_num, dtype=np.int64) self.uid2swap_idx = np.array([None] * user_num) self.uid2rev_swap_idx = np.array([None] * user_num) self.uid2history_item = np.array([None] * user_num) dataset.sort(by=uid_field, ascending=True) last_uid = None positive_item = set() uid2used_item = sampler.used_ids for uid, iid in zip(dataset.inter_feat[uid_field].numpy(), dataset.inter_feat[iid_field].numpy()): if uid != last_uid: self._set_user_property(last_uid, uid2used_item[last_uid], positive_item) last_uid = uid self.uid_list.append(uid) positive_item = set() positive_item.add(iid) self._set_user_property(last_uid, uid2used_item[last_uid], positive_item) self.uid_list = torch.tensor(self.uid_list) self.user_df = dataset.join(Interaction({uid_field: self.uid_list})) super().__init__( config, dataset, sampler, neg_sample_args, batch_size=batch_size, dl_format=dl_format, shuffle=shuffle ) def _set_user_property(self, uid, used_item, positive_item): if uid is None: return history_item = used_item - positive_item positive_item_num = len(positive_item) self.uid2items_num[uid] = positive_item_num swap_idx = torch.tensor(sorted(set(range(positive_item_num)) ^ positive_item)) self.uid2swap_idx[uid] = swap_idx self.uid2rev_swap_idx[uid] = swap_idx.flip(0) self.uid2history_item[uid] = torch.tensor(list(history_item), dtype=torch.int64) def _batch_size_adaptation(self): batch_num = max(self.batch_size // self.dataset.item_num, 1) new_batch_size = batch_num * self.dataset.item_num self.step = batch_num self.upgrade_batch_size(new_batch_size) @property def pr_end(self): return len(self.uid_list) def _shuffle(self): self.logger.warnning('GeneralFullDataLoader can\'t shuffle') def _next_batch_data(self): user_df = self.user_df[self.pr:self.pr + self.step] cur_data = self._neg_sampling(user_df) self.pr += self.step return cur_data def _neg_sampling(self, user_df): uid_list = list(user_df[self.dataset.uid_field]) pos_len_list = self.uid2items_num[uid_list] user_len_list = np.full(len(uid_list), self.item_num) user_df.set_additional_info(pos_len_list, user_len_list) history_item = self.uid2history_item[uid_list] history_row = torch.cat([torch.full_like(hist_iid, i) for i, hist_iid in enumerate(history_item)]) history_col = torch.cat(list(history_item)) swap_idx = self.uid2swap_idx[uid_list] rev_swap_idx = self.uid2rev_swap_idx[uid_list] swap_row = torch.cat([torch.full_like(swap, i) for i, swap in enumerate(swap_idx)]) swap_col_after = torch.cat(list(swap_idx)) swap_col_before = torch.cat(list(rev_swap_idx)) return user_df, (history_row, history_col), swap_row, swap_col_after, swap_col_before
[docs] def get_pos_len_list(self): """ Returns: numpy.ndarray: Number of positive item for each user in a training/evaluating epoch. """ return self.uid2items_num[self.uid_list]
[docs] def get_user_len_list(self): """ Returns: numpy.ndarray: Number of all item for each user in a training/evaluating epoch. """ return np.full(self.pr_end, self.item_num)