Source code for recbole.data.dataset.sequential_dataset

# @Time   : 2020/9/16
# @Author : Yushuo Chen
# @Email  : chenyushuo@ruc.edu.cn

# UPDATE:
# @Time   : 2020/9/16
# @Author : Yushuo Chen
# @Email  : chenyushuo@ruc.edu.cn

"""
recbole.data.sequential_dataset
###############################
"""

import copy

import numpy as np

from recbole.data.dataset import Dataset


[docs]class SequentialDataset(Dataset): """:class:`SequentialDataset` is based on :class:`~recbole.data.dataset.dataset.Dataset`, and provides augmentation interface to adapt to Sequential Recommendation, which can accelerate the data loader. Attributes: uid_list (numpy.ndarray): List of user id after augmentation. item_list_index (numpy.ndarray): List of indexes of item sequence after augmentation. target_index (numpy.ndarray): List of indexes of target item id after augmentation. item_list_length (numpy.ndarray): List of item sequences' length after augmentation. """ def __init__(self, config): super().__init__(config)
[docs] def prepare_data_augmentation(self): """Augmentation processing for sequential dataset. E.g., ``u1`` has purchase sequence ``<i1, i2, i3, i4>``, then after augmentation, we will generate three cases. ``u1, <i1> | i2`` (Which means given user_id ``u1`` and item_seq ``<i1>``, we need to predict the next item ``i2``.) The other cases are below: ``u1, <i1, i2> | i3`` ``u1, <i1, i2, i3> | i4`` Note: Actually, we do not really generate these new item sequences. One user's item sequence is stored only once in memory. We store the index (slice) of each item sequence after augmentation, which saves memory and accelerates a lot. """ self.logger.debug('prepare_data_augmentation') self._check_field('uid_field', 'time_field') max_item_list_len = self.config['MAX_ITEM_LIST_LENGTH'] self.sort(by=[self.uid_field, self.time_field], ascending=True) last_uid = None uid_list, item_list_index, target_index, item_list_length = [], [], [], [] seq_start = 0 for i, uid in enumerate(self.inter_feat[self.uid_field].numpy()): if last_uid != uid: last_uid = uid seq_start = i else: if i - seq_start > max_item_list_len: seq_start += 1 uid_list.append(uid) item_list_index.append(slice(seq_start, i)) target_index.append(i) item_list_length.append(i - seq_start) self.uid_list = np.array(uid_list) self.item_list_index = np.array(item_list_index) self.target_index = np.array(target_index) self.item_list_length = np.array(item_list_length, dtype=np.int64) self.mask = np.ones(len(self.inter_feat), dtype=np.bool)
[docs] def leave_one_out(self, group_by, leave_one_num=1): self.logger.debug(f'Leave one out, group_by=[{group_by}], leave_one_num=[{leave_one_num}].') if group_by is None: raise ValueError('Leave one out strategy require a group field.') if group_by != self.uid_field: raise ValueError('Sequential models require group by user.') self.prepare_data_augmentation() grouped_index = self._grouped_index(self.uid_list) next_index = self._split_index_by_leave_one_out(grouped_index, leave_one_num) self._drop_unused_col() next_ds = [] for index in next_index: ds = copy.copy(self) for field in ['uid_list', 'item_list_index', 'target_index', 'item_list_length']: setattr(ds, field, np.array(getattr(ds, field)[index])) setattr(ds, 'mask', np.ones(len(self.inter_feat), dtype=np.bool)) next_ds.append(ds) next_ds[0].mask[self.target_index[next_index[1] + next_index[2]]] = False next_ds[1].mask[self.target_index[next_index[2]]] = False return next_ds
[docs] def inter_matrix(self, form='coo', value_field=None): """Get sparse matrix that describe interactions between user_id and item_id. Sparse matrix has shape (user_num, item_num). For a row of <src, tgt>, ``matrix[src, tgt] = 1`` if ``value_field`` is ``None``, else ``matrix[src, tgt] = self.inter_feat[src, tgt]``. Args: form (str, optional): Sparse matrix format. Defaults to ``coo``. value_field (str, optional): Data of sparse matrix, which should exist in ``df_feat``. Defaults to ``None``. Returns: scipy.sparse: Sparse matrix in form ``coo`` or ``csr``. """ if not self.uid_field or not self.iid_field: raise ValueError('dataset does not exist uid/iid, thus can not converted to sparse matrix.') self.logger.warning( 'Load interaction matrix may lead to label leakage from testing phase, this implementation ' 'only provides the interactions corresponding to specific phase' ) local_inter_feat = self.inter_feat[self.mask] # TODO: self.mask will applied to _history_matrix() in future return self._create_sparse_matrix(local_inter_feat, self.uid_field, self.iid_field, form, value_field)
[docs] def build(self, eval_setting): self._change_feat_format() ordering_args = eval_setting.ordering_args if ordering_args['strategy'] == 'shuffle': raise ValueError('Ordering strategy `shuffle` is not supported in sequential models.') elif ordering_args['strategy'] == 'by': if ordering_args['field'] != self.time_field: raise ValueError('Sequential models require `TO` (time ordering) strategy.') if ordering_args['ascending'] is not True: raise ValueError('Sequential models require `time_field` to sort in ascending order.') group_field = eval_setting.group_field split_args = eval_setting.split_args if split_args['strategy'] == 'loo': return self.leave_one_out(group_by=group_field, leave_one_num=split_args['leave_one_num']) else: ValueError('Sequential models require `loo` (leave one out) split strategy.')