Customize DataLoaders

Here, we describe how to develop a new DataLoader and use it in our tool. If a new model has special requirements for loading data, we need to design a new DataLoader for it.

Abstract DataLoader

In this project, there are three abstract classes: AbstractDataLoader, NegSampleMixin and NegSampleByMixin.

In general, a new dataloader should inherit from one of the above three abstract classes. If you only need to modify an existing DataLoader, you can also inherit from it. The dataloader documentation is at: recbole.data.dataloader

AbstractDataLoader

AbstractDataLoader is the most basic abstract class. It declares three members that subclasses implement: pr_end, _shuffle() and _next_batch_data(). pr_end is a property that returns the maximum value of the pointer pr plus 1. _shuffle() permutes the dataset and is invoked by __iter__() if the parameter shuffle is True. _next_batch_data() loads the next batch of data and returns it in Interaction format; it is invoked by __next__().
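
The interplay of these three members can be sketched with a small stand-in class. This is a simplified illustration in plain Python, not the RecBole source: the class name ToyLoader, its list-backed data, and the plain lists it returns in place of Interaction objects are all assumptions made for the example.

```python
import random


class ToyLoader:
    """Hypothetical stand-in mimicking the iteration protocol described above."""

    def __init__(self, data, batch_size=2, shuffle=False):
        self.data = list(data)
        self.step = batch_size
        self.shuffle = shuffle
        self.pr = 0  # pointer to the start of the next batch

    @property
    def pr_end(self):
        # One past the largest valid pointer, i.e. the number of records.
        return len(self.data)

    def _shuffle(self):
        random.shuffle(self.data)

    def _next_batch_data(self):
        batch = self.data[self.pr:self.pr + self.step]
        self.pr += self.step
        return batch

    def __iter__(self):
        if self.shuffle:
            self._shuffle()
        return self

    def __next__(self):
        if self.pr >= self.pr_end:
            self.pr = 0  # reset so the loader can be iterated again
            raise StopIteration
        return self._next_batch_data()


batches = list(ToyLoader(range(5), batch_size=2))
print(batches)  # [[0, 1], [2, 3], [4]]
```

The last batch is shorter than the others because the pointer simply advances by the step until it reaches pr_end.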

AbstractDataLoader provides two helper functions for the conversion in _next_batch_data(): _dataframe_to_interaction() and _dict_to_interaction(). Both call the function of the same name in dataset and convert a pandas.DataFrame or a dict, respectively, into an Interaction.

In addition to the above three functions, two other functions can also be overridden: setup() and data_preprocess().

setup() handles work other than initializing the parameters, for example resetting batch_size or checking the shuffle setting. All of this can be overridden in the subclass. data_preprocess() is used to process the data, e.g., negative sampling.

At the end of __init__(), setup() is invoked, and then, if real_time is True, data_preprocess() is called.
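
The hook order can be illustrated with a toy base class. This is a minimal sketch under stated assumptions: ToyBaseLoader and AlwaysShuffleLoader are hypothetical names, and the real_time condition simply follows the sentence above, so it should be checked against the source of the version you use.

```python
class ToyBaseLoader:
    """Hypothetical base class; only the hook order is of interest."""

    def __init__(self, batch_size=1, shuffle=False, real_time=False):
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.real_time = real_time
        self.calls = []
        # Hooks run last, after every attribute has been set.
        self.setup()
        if self.real_time:  # condition taken from the description above
            self.data_preprocess()

    def setup(self):
        self.calls.append('setup')

    def data_preprocess(self):
        self.calls.append('data_preprocess')


class AlwaysShuffleLoader(ToyBaseLoader):
    def setup(self):
        super().setup()
        self.shuffle = True  # override the caller's setting


loader = AlwaysShuffleLoader(shuffle=False, real_time=True)
print(loader.shuffle, loader.calls)  # True ['setup', 'data_preprocess']
```

Because the hooks run inside the parent's __init__(), any attribute that a subclass's setup() or data_preprocess() reads has to be assigned before super().__init__() is called.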

NegSampleMixin

NegSampleMixin inherits from AbstractDataLoader and is used for negative sampling. It adds three functions to its parent class: _batch_size_adaptation(), _neg_sampling() and get_pos_len_list().

Since positive samples and their negative samples should be placed in the same batch, the original batch size may not be appropriate. _batch_size_adaptation() resets the batch size so that the positive and negative samples fit in the same batch. _neg_sampling() performs the negative sampling and should be implemented by the subclass. get_pos_len_list() returns the number of positive samples for each user.
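
As a worked example of the arithmetic, suppose each positive sample is accompanied by neg_sample_by negatives, so that one positive occupies 1 + neg_sample_by rows of a batch. The sketch below rounds the requested batch size down to a multiple of that row count. The helper name adapt_batch_size and the exact rounding rule are assumptions for illustration, not the library code.

```python
def adapt_batch_size(batch_size, neg_sample_by):
    """Return (positives_per_batch, adapted_batch_size).

    Hypothetical helper: rounds the requested batch size down to a
    multiple of the rows each positive sample occupies, keeping at
    least one positive per batch.
    """
    rows_per_positive = 1 + neg_sample_by
    positives_per_batch = max(batch_size // rows_per_positive, 1)
    return positives_per_batch, positives_per_batch * rows_per_positive


print(adapt_batch_size(2048, 4))  # (409, 2045)
print(adapt_batch_size(3, 4))     # (1, 5)
```

In the second call the requested batch size is smaller than one positive with its negatives, so the adapted batch size grows to 5 rather than splitting the group.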

In addition, setup() and data_preprocess() are also changed. setup() calls _batch_size_adaptation(), and data_preprocess() performs negative sampling, which should be implemented in the subclass.

NegSampleByMixin

NegSampleByMixin inherits from NegSampleMixin and is used for negative sampling by ratio. It supports two strategies: pair-wise sampling and point-wise sampling. On top of the parent class, two functions are added: _neg_sample_by_pair_wise_sampling() and _neg_sample_by_point_wise_sampling().
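
The difference between the two strategies lies in the shape of the rows they produce. The following sketch uses plain tuples and negatives that have already been sampled. The function names pair_wise and point_wise, the row layout, and the ordering are assumptions for illustration and do not reproduce the library's Interaction layout.

```python
def pair_wise(positives, negatives):
    """One row per (user, positive item, negative item)."""
    return [(u, pos, neg)
            for (u, pos), negs in zip(positives, negatives)
            for neg in negs]


def point_wise(positives, negatives):
    """One labelled row per item: 1.0 for positives, 0.0 for negatives."""
    rows = []
    for (u, pos), negs in zip(positives, negatives):
        rows.append((u, pos, 1.0))
        rows.extend((u, neg, 0.0) for neg in negs)
    return rows


positives = [(1, 10), (2, 20)]    # (user, item) interactions
negatives = [[11, 12], [21, 22]]  # already-sampled negatives per interaction

print(pair_wise(positives, negatives))
# [(1, 10, 11), (1, 10, 12), (2, 20, 21), (2, 20, 22)]
print(point_wise(positives, negatives))
# [(1, 10, 1.0), (1, 11, 0.0), (1, 12, 0.0), (2, 20, 1.0), (2, 21, 0.0), (2, 22, 0.0)]
```

With a ratio of 2, pair-wise sampling yields 2 rows per interaction, while point-wise sampling yields 3, which is why the batch size has to be adapted differently for each strategy.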

Example

Here, we take UserDataLoader as an example. This dataloader returns user ids, which are used to train user representations.

Implement __init__()

__init__() can be used to initialize some of the necessary parameters. Here, we just need to record uid_field.

def __init__(self, config, dataset,
             batch_size=1, dl_format=InputType.POINTWISE, shuffle=False):
    self.uid_field = dataset.uid_field

    super().__init__(config=config, dataset=dataset,
                     batch_size=batch_size, dl_format=dl_format, shuffle=shuffle)

Implement setup()

Because of a training requirement, self.shuffle must be True, so we check self.shuffle in setup() and correct it if necessary.

def setup(self):
    if self.shuffle is False:
        self.shuffle = True
        self.logger.warning('UserDataLoader must shuffle the data')

Implement pr_end() and _shuffle()

Since this dataloader only returns user ids, these functions are straightforward to implement.

@property
def pr_end(self):
    return len(self.dataset.user_feat)

def _shuffle(self):
    self.dataset.user_feat = self.dataset.user_feat.sample(frac=1).reset_index(drop=True)

Implement _next_batch_data()

This function only needs to return user ids from user_feat, so we select that one column and use _dataframe_to_interaction() to convert the pandas.DataFrame into an Interaction.

def _next_batch_data(self):
    cur_data = self.dataset.user_feat[[self.uid_field]][self.pr: self.pr + self.step]
    self.pr += self.step
    return self._dataframe_to_interaction(cur_data)

Complete Code

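# Import paths follow the modules named in this guide (recbole.data.dataloader,
# recbole.utils.enum_type); adjust them if your RecBole version differs.
from recbole.data.dataloader import AbstractDataLoader
from recbole.utils.enum_type import DataLoaderType, InputType


class UserDataLoader(AbstractDataLoader):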
    """:class:`UserDataLoader` will return a batch of data which only contains user-id when it is iterated.

    Args:
        config (Config): The config of dataloader.
        dataset (Dataset): The dataset of dataloader.
        batch_size (int, optional): The batch_size of dataloader. Defaults to ``1``.
        dl_format (InputType, optional): The input type of dataloader. Defaults to
            :obj:`~recbole.utils.enum_type.InputType.POINTWISE`.
        shuffle (bool, optional): Whether the dataloader will be shuffled after a round. Defaults to ``False``.

    Attributes:
        shuffle (bool): Whether the dataloader will be shuffled after a round.
            However, in :class:`UserDataLoader`, it's guaranteed to be ``True``.
    """
    dl_type = DataLoaderType.ORIGIN

    def __init__(self, config, dataset,
                 batch_size=1, dl_format=InputType.POINTWISE, shuffle=False):
        self.uid_field = dataset.uid_field

        super().__init__(config=config, dataset=dataset,
                         batch_size=batch_size, dl_format=dl_format, shuffle=shuffle)

    def setup(self):
        """Make sure that :attr:`shuffle` is True. If :attr:`shuffle` is False, it is changed to True
        and a warning is logged.
        """
        if self.shuffle is False:
            self.shuffle = True
            self.logger.warning('UserDataLoader must shuffle the data')

    @property
    def pr_end(self):
        return len(self.dataset.user_feat)

    def _shuffle(self):
        self.dataset.user_feat = self.dataset.user_feat.sample(frac=1).reset_index(drop=True)

    def _next_batch_data(self):
        cur_data = self.dataset.user_feat[[self.uid_field]][self.pr: self.pr + self.step]
        self.pr += self.step
        return self._dataframe_to_interaction(cur_data)

For the development of more complex dataloaders, refer to the source code.