Source code for recbole.data.dataloader.general_dataloader

# @Time   : 2020/7/7
# @Author : Yupeng Hou
# @Email  : houyupeng@ruc.edu.cn

# UPDATE
# @Time   : 2020/9/9, 2020/9/29
# @Author : Yupeng Hou, Yushuo Chen
# @email  : houyupeng@ruc.edu.cn, chenyushuo@ruc.edu.cn

"""
recbole.data.dataloader.general_dataloader
################################################
"""

import numpy as np
import pandas as pd
import torch
from tqdm import tqdm

from recbole.data.dataloader.abstract_dataloader import AbstractDataLoader
from recbole.data.dataloader.neg_sample_mixin import NegSampleMixin, NegSampleByMixin
from recbole.utils import DataLoaderType, InputType


[docs]class GeneralDataLoader(AbstractDataLoader): """:class:`GeneralDataLoader` is used for general model and it just return the origin data. Args: config (Config): The config of dataloader. dataset (Dataset): The dataset of dataloader. batch_size (int, optional): The batch_size of dataloader. Defaults to ``1``. dl_format (InputType, optional): The input type of dataloader. Defaults to :obj:`~recbole.utils.enum_type.InputType.POINTWISE`. shuffle (bool, optional): Whether the dataloader will be shuffle after a round. Defaults to ``False``. """ dl_type = DataLoaderType.ORIGIN def __init__(self, config, dataset, batch_size=1, dl_format=InputType.POINTWISE, shuffle=False): super().__init__(config, dataset, batch_size=batch_size, dl_format=dl_format, shuffle=shuffle) @property def pr_end(self): return len(self.dataset) def _shuffle(self): self.dataset.shuffle() def _next_batch_data(self): cur_data = self.dataset[self.pr: self.pr + self.step] self.pr += self.step return self._dataframe_to_interaction(cur_data)
[docs]class GeneralNegSampleDataLoader(NegSampleByMixin, AbstractDataLoader): """:class:`GeneralNegSampleDataLoader` is a general-dataloader with negative sampling. For the result of every batch, we permit that every positive interaction and its negative interaction must be in the same batch. Beside this, when it is in the evaluation stage, and evaluator is topk-like function, we also permit that all the interactions corresponding to each user are in the same batch and positive interactions are before negative interactions. Args: config (Config): The config of dataloader. dataset (Dataset): The dataset of dataloader. sampler (Sampler): The sampler of dataloader. neg_sample_args (dict): The neg_sample_args of dataloader. batch_size (int, optional): The batch_size of dataloader. Defaults to ``1``. dl_format (InputType, optional): The input type of dataloader. Defaults to :obj:`~recbole.utils.enum_type.InputType.POINTWISE`. shuffle (bool, optional): Whether the dataloader will be shuffle after a round. Defaults to ``False``. """ def __init__(self, config, dataset, sampler, neg_sample_args, batch_size=1, dl_format=InputType.POINTWISE, shuffle=False): self.uid2index, self.uid2items_num = None, None super().__init__(config, dataset, sampler, neg_sample_args, batch_size=batch_size, dl_format=dl_format, shuffle=shuffle)
[docs] def setup(self): if self.user_inter_in_one_batch: self.uid2index, self.uid2items_num = self.dataset.uid2index self._batch_size_adaptation()
[docs] def data_preprocess(self): if self.user_inter_in_one_batch: new_inter_num = 0 new_inter_feat = [] new_uid2index = [] for uid, index in self.uid2index: new_inter_feat.append(self._neg_sampling(self.dataset.inter_feat[index])) new_num = len(new_inter_feat[-1]) new_uid2index.append((uid, slice(new_inter_num, new_inter_num + new_num))) new_inter_num += new_num self.dataset.inter_feat = pd.concat(new_inter_feat, ignore_index=True) self.uid2index = np.array(new_uid2index) else: self.dataset.inter_feat = self._neg_sampling(self.dataset.inter_feat)
def _batch_size_adaptation(self): if self.user_inter_in_one_batch: inters_num = sorted(self.uid2items_num * self.times, reverse=True) batch_num = 1 new_batch_size = inters_num[0] for i in range(1, len(inters_num)): if new_batch_size + inters_num[i] > self.batch_size: break batch_num = i new_batch_size += inters_num[i] self.step = batch_num self.set_batch_size(new_batch_size) else: batch_num = max(self.batch_size // self.times, 1) new_batch_size = batch_num * self.times self.step = batch_num if self.real_time else new_batch_size self.set_batch_size(new_batch_size) @property def pr_end(self): if self.user_inter_in_one_batch: return len(self.uid2index) else: return len(self.dataset) def _shuffle(self): if self.user_inter_in_one_batch: new_index = np.random.permutation(len(self.uid2index)) self.uid2index = self.uid2index[new_index] self.uid2items_num = self.uid2items_num[new_index] else: self.dataset.shuffle() def _next_batch_data(self): if self.user_inter_in_one_batch: sampling_func = self._neg_sampling if self.real_time else (lambda x: x) cur_data = [] for uid, index in self.uid2index[self.pr: self.pr + self.step]: cur_data.append(sampling_func(self.dataset[index])) cur_data = pd.concat(cur_data, ignore_index=True) pos_len_list = self.uid2items_num[self.pr: self.pr + self.step] user_len_list = pos_len_list * self.times self.pr += self.step return self._dataframe_to_interaction(cur_data, list(pos_len_list), list(user_len_list)) else: cur_data = self.dataset[self.pr: self.pr + self.step] self.pr += self.step if self.real_time: cur_data = self._neg_sampling(cur_data) return self._dataframe_to_interaction(cur_data) def _neg_sampling(self, inter_feat): uid_field = self.config['USER_ID_FIELD'] iid_field = self.config['ITEM_ID_FIELD'] uids = inter_feat[uid_field].to_list() neg_iids = self.sampler.sample_by_user_ids(uids, self.neg_sample_by) return self.sampling_func(uid_field, iid_field, neg_iids, inter_feat) def _neg_sample_by_pair_wise_sampling(self, uid_field, iid_field, neg_iids, inter_feat): inter_feat.insert(len(inter_feat.columns), self.neg_item_id, neg_iids) if self.dataset.item_feat is not None: neg_prefix = self.config['NEG_PREFIX'] neg_item_feat = self.dataset.item_feat.add_prefix(neg_prefix) inter_feat = pd.merge(inter_feat, neg_item_feat, on=self.neg_item_id, how='left', suffixes=('_inter', '_item')) return inter_feat def _neg_sample_by_point_wise_sampling(self, uid_field, iid_field, neg_iids, inter_feat): pos_inter_num = len(inter_feat) new_df = pd.concat([inter_feat] * self.times, ignore_index=True) new_df[iid_field].values[pos_inter_num:] = neg_iids labels = np.zeros(pos_inter_num * self.times, dtype=np.int64) labels[: pos_inter_num] = 1 new_df[self.label_field] = labels return new_df
[docs] def get_pos_len_list(self): """ Returns: np.ndarray or list: Number of positive item for each user in a training/evaluating epoch. """ return self.uid2items_num
[docs]class GeneralFullDataLoader(NegSampleMixin, AbstractDataLoader): """:class:`GeneralFullDataLoader` is a general-dataloader with full sort. In order to speed up calculation, this dataloader would only return then user part of interactions, positive items and used items. It would not return negative items. Args: config (Config): The config of dataloader. dataset (Dataset): The dataset of dataloader. sampler (Sampler): The sampler of dataloader. neg_sample_args (dict): The neg_sample_args of dataloader. batch_size (int, optional): The batch_size of dataloader. Defaults to ``1``. dl_format (InputType, optional): The input type of dataloader. Defaults to :obj:`~recbole.utils.enum_type.InputType.POINTWISE`. shuffle (bool, optional): Whether the dataloader will be shuffle after a round. Defaults to ``False``. """ dl_type = DataLoaderType.FULL def __init__(self, config, dataset, sampler, neg_sample_args, batch_size=1, dl_format=InputType.POINTWISE, shuffle=False): if neg_sample_args['strategy'] != 'full': raise ValueError('neg_sample strategy in GeneralFullDataLoader() should be `full`') self.uid2index, self.uid2items_num = dataset.uid2index super().__init__(config, dataset, sampler, neg_sample_args, batch_size=batch_size, dl_format=dl_format, shuffle=shuffle)
[docs] def data_preprocess(self): self.user_tensor, tmp_pos_idx, tmp_used_idx, self.pos_len_list, self.neg_len_list = \ self._neg_sampling(self.uid2index, show_progress=True) tmp_pos_len_list = [sum(self.pos_len_list[_: _ + self.step]) for _ in range(0, self.pr_end, self.step)] tot_item_num = self.dataset.item_num tmp_used_len_list = [sum( [tot_item_num - x for x in self.neg_len_list[_: _ + self.step]] ) for _ in range(0, self.pr_end, self.step)] self.pos_idx = list(torch.split(tmp_pos_idx, tmp_pos_len_list)) self.used_idx = list(torch.split(tmp_used_idx, tmp_used_len_list)) for i in range(len(self.pos_idx)): self.pos_idx[i] -= i * tot_item_num * self.step for i in range(len(self.used_idx)): self.used_idx[i] -= i * tot_item_num * self.step
def _batch_size_adaptation(self): batch_num = max(self.batch_size // self.dataset.item_num, 1) new_batch_size = batch_num * self.dataset.item_num self.step = batch_num self.set_batch_size(new_batch_size) @property def pr_end(self): return len(self.uid2index) def _shuffle(self): self.logger.warnning('GeneralFullDataLoader can\'t shuffle') def _next_batch_data(self): if not self.real_time: slc = slice(self.pr, self.pr + self.step) idx = self.pr // self.step cur_data = self.user_tensor[slc], self.pos_idx[idx], self.used_idx[idx], \ self.pos_len_list[slc], self.neg_len_list[slc] else: cur_data = self._neg_sampling(self.uid2index[self.pr: self.pr + self.step]) self.pr += self.step return cur_data def _neg_sampling(self, uid2index, show_progress=False): uid_field = self.dataset.uid_field iid_field = self.dataset.iid_field tot_item_num = self.dataset.item_num start_idx = 0 pos_len_list = [] neg_len_list = [] pos_idx = [] used_idx = [] iter_data = tqdm(uid2index) if show_progress else uid2index for uid, index in iter_data: pos_item_id = self.dataset.inter_feat[iid_field][index].values pos_idx.extend([_ + start_idx for _ in pos_item_id]) pos_num = len(pos_item_id) pos_len_list.append(pos_num) used_item_id = self.sampler.used_ids[uid] used_idx.extend([_ + start_idx for _ in used_item_id]) used_num = len(used_item_id) neg_num = tot_item_num - used_num neg_len_list.append(neg_num) start_idx += tot_item_num user_df = pd.DataFrame({uid_field: np.array(uid2index[:, 0], dtype=np.int)}) user_interaction = self._dataframe_to_interaction(self.join(user_df)) return user_interaction, \ torch.LongTensor(pos_idx), torch.LongTensor(used_idx), \ pos_len_list, neg_len_list
[docs] def get_pos_len_list(self): """ Returns: np.ndarray or list: Number of positive item for each user in a training/evaluating epoch. """ return self.uid2items_num