import copy
from collections import Counter

import numpy as np
import torch


class AbstractSampler(object):
    """:class:`AbstractSampler` is an abstract class; all samplers should inherit from it.
    This sampler supports returning a certain number of random value_ids according to the input key_id,
    and it also supports prohibiting certain key-value pairs by setting used_ids.

    Args:
        distribution (str): The string of distribution, which is used by subclasses.
        alpha (float): Exponent applied to item popularity when building the alias table
            for popularity-biased sampling.

    Attributes:
        used_ids (numpy.ndarray): The result of :meth:`get_used_ids`.
    """

    def __init__(self, distribution, alpha):
        self.distribution = ""
        self.alpha = alpha
        self.set_distribution(distribution)
        self.used_ids = self.get_used_ids()

    def set_distribution(self, distribution):
        """Set the distribution of sampler.

        Args:
            distribution (str): Distribution of the negative items.
        """
        self.distribution = distribution
        if distribution == "popularity":
            self._build_alias_table()

    def _uni_sampling(self, sample_num):
        """Sample [sample_num] items in the uniform distribution.

        Args:
            sample_num (int): the number of samples.

        Returns:
            sample_list (np.array): a list of samples.
        """
        raise NotImplementedError("Method [_uni_sampling] should be implemented")

    def _get_candidates_list(self):
        """Get sample candidates list for _pop_sampling().

        Returns:
            candidates_list (list): a list of candidate ids.
        """
        raise NotImplementedError("Method [_get_candidates_list] should be implemented")

    def _build_alias_table(self):
        """Build alias table for popularity-biased sampling."""
        candidates_list = self._get_candidates_list()
        self.prob = dict(Counter(candidates_list))
        self.alias = self.prob.copy()
        large_q = []
        small_q = []
        # Raw frequencies -> probabilities, smoothed by the exponent alpha.
        for i in self.prob:
            self.alias[i] = -1
            self.prob[i] = self.prob[i] / len(candidates_list)
            self.prob[i] = pow(self.prob[i], self.alpha)
        normalize_count = sum(self.prob.values())
        # Scale so the average probability is 1, then split ids into the "large" (>1)
        # and "small" (<1) queues used by the alias method.
        for i in self.prob:
            self.prob[i] = self.prob[i] / normalize_count * len(self.prob)
            if self.prob[i] > 1:
                large_q.append(i)
            elif self.prob[i] < 1:
                small_q.append(i)
        # Pair each small id with a large id that donates its leftover probability mass.
        while len(large_q) != 0 and len(small_q) != 0:
            l = large_q.pop(0)
            s = small_q.pop(0)
            self.alias[s] = l
            self.prob[l] = self.prob[l] - (1 - self.prob[s])
            if self.prob[l] < 1:
                small_q.append(l)
            elif self.prob[l] > 1:
                large_q.append(l)

    def _pop_sampling(self, sample_num):
        """Sample [sample_num] items in the popularity-biased distribution.

        Args:
            sample_num (int): the number of samples.

        Returns:
            sample_list (np.array): a list of samples.
        """
        keys = list(self.prob.keys())
        random_index_list = np.random.randint(0, len(keys), sample_num)
        random_prob_list = np.random.random(sample_num)
        final_random_list = []
        # Alias method: pick a random bucket, then keep it or fall back to its alias.
        for idx, prob in zip(random_index_list, random_prob_list):
            if self.prob[keys[idx]] > prob:
                final_random_list.append(keys[idx])
            else:
                final_random_list.append(self.alias[keys[idx]])
        return np.array(final_random_list)

    def sampling(self, sample_num):
        """Sampling [sample_num] item_ids.

        Args:
            sample_num (int): the number of samples.

        Returns:
            sample_list (np.array): a list of samples of length [sample_num].
        """
        if self.distribution == "uniform":
            return self._uni_sampling(sample_num)
        elif self.distribution == "popularity":
            return self._pop_sampling(sample_num)
        else:
            raise NotImplementedError(
                f"The sampling distribution [{self.distribution}] is not implemented."
            )

    def get_used_ids(self):
        """
        Returns:
            numpy.ndarray: Used ids. Index is key_id, and element is a set of value_ids.
        """
        raise NotImplementedError("Method [get_used_ids] should be implemented")

    def sample_by_key_ids(self, key_ids, num):
        """Sampling by key_ids.

        Args:
            key_ids (numpy.ndarray or list): Input key_ids.
            num (int): Number of sampled value_ids for each key_id.

        Returns:
            torch.tensor: Sampled value_ids.
            value_ids[0], value_ids[len(key_ids)], value_ids[len(key_ids) * 2], ...,
            value_ids[len(key_ids) * (num - 1)] is sampled for key_ids[0];
            value_ids[1], value_ids[len(key_ids) + 1], value_ids[len(key_ids) * 2 + 1], ...,
            value_ids[len(key_ids) * (num - 1) + 1] is sampled for key_ids[1]; ...; and so on.
        """
        key_ids = np.array(key_ids)
        key_num = len(key_ids)
        total_num = key_num * num
        if (key_ids == key_ids[0]).all():
            # All keys are identical: reject against a single `used` set.
            key_id = key_ids[0]
            used = np.array(list(self.used_ids[key_id]))
            value_ids = self.sampling(total_num)
            check_list = np.arange(total_num)[np.isin(value_ids, used)]
            while len(check_list) > 0:
                value_ids[check_list] = value = self.sampling(len(check_list))
                mask = np.isin(value, used)
                check_list = check_list[mask]
        else:
            # Mixed keys: resample every position whose value collides with its key's used set.
            value_ids = np.zeros(total_num, dtype=np.int64)
            check_list = np.arange(total_num)
            key_ids = np.tile(key_ids, num)
            while len(check_list) > 0:
                value_ids[check_list] = self.sampling(len(check_list))
                check_list = np.array(
                    [
                        i
                        for i, used, v in zip(
                            check_list,
                            self.used_ids[key_ids[check_list]],
                            value_ids[check_list],
                        )
                        if v in used
                    ]
                )
        return torch.tensor(value_ids)
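

# The following is an illustrative sketch, not part of RecBole: a minimal concrete subclass
# showing how the AbstractSampler hooks (_uni_sampling, get_used_ids, _get_candidates_list)
# fit together. The class name `_ToySampler` and its hard-coded ids are made up for this example.
class _ToySampler(AbstractSampler):
    def __init__(self, value_num, used, distribution="uniform", alpha=1.0):
        self.value_num = value_num  # total number of value_ids; id 0 is treated as [pad]
        self._used = used  # dict mapping key_id -> set of forbidden value_ids
        super().__init__(distribution=distribution, alpha=alpha)

    def _uni_sampling(self, sample_num):
        return np.random.randint(1, self.value_num, sample_num)

    def _get_candidates_list(self):
        # For "popularity", every forbidden (i.e. observed) value counts once per key here.
        return [v for s in self._used.values() for v in s]

    def get_used_ids(self):
        return np.array([self._used.get(k, set()) for k in range(len(self._used))])


# Example: two negatives per key, never drawn from that key's used set.
# toy = _ToySampler(value_num=10, used={0: {1, 2}, 1: {3}, 2: set()})
# toy.sample_by_key_ids([0, 1, 2], num=2)  # -> torch.Tensor of shape (6,)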


class Sampler(AbstractSampler):
    """:class:`Sampler` is used to sample negative items for each input user. To keep positive items of
    the train phase from being sampled in the valid phase, and positive items of the train or valid phase
    from being sampled in the test phase, the datasets of all phases are required as input for
    pre-processing. Before using this sampler, :meth:`set_phase` must be called to get the sampler of the
    corresponding phase.

    Args:
        phases (str or list of str): All the phases of input.
        datasets (Dataset or list of Dataset): All the dataset for each phase.
        distribution (str, optional): Distribution of the negative items. Defaults to 'uniform'.

    Attributes:
        phase (str): the phase of sampler. It will not be set until :meth:`set_phase` is called.
    """

    def __init__(self, phases, datasets, distribution="uniform", alpha=1.0):
        if not isinstance(phases, list):
            phases = [phases]
        if not isinstance(datasets, list):
            datasets = [datasets]
        if len(phases) != len(datasets):
            raise ValueError(
                f"Phases {phases} and datasets {datasets} should have the same length."
            )
        self.phases = phases
        self.datasets = datasets
        self.uid_field = datasets[0].uid_field
        self.iid_field = datasets[0].iid_field
        self.user_num = datasets[0].user_num
        self.item_num = datasets[0].item_num
        super().__init__(distribution=distribution, alpha=alpha)

    def _get_candidates_list(self):
        candidates_list = []
        for dataset in self.datasets:
            candidates_list.extend(dataset.inter_feat[self.iid_field].numpy())
        return candidates_list

    def _uni_sampling(self, sample_num):
        return np.random.randint(1, self.item_num, sample_num)

    def get_used_ids(self):
        """
        Returns:
            dict: Used item_ids are the same as positive item_ids.
            Key is phase, and value is a numpy.ndarray whose index is user_id and whose element
            is a set of item_ids.
        """
        used_item_id = dict()
        last = [set() for _ in range(self.user_num)]
        # Each phase inherits the used items of the previous phases, so later phases
        # never sample positives already seen in earlier ones.
        for phase, dataset in zip(self.phases, self.datasets):
            cur = np.array([set(s) for s in last])
            for uid, iid in zip(
                dataset.inter_feat[self.uid_field].numpy(),
                dataset.inter_feat[self.iid_field].numpy(),
            ):
                cur[uid].add(iid)
            last = used_item_id[phase] = cur

        for used_item_set in used_item_id[self.phases[-1]]:
            if len(used_item_set) + 1 == self.item_num:  # [pad] is an item.
                raise ValueError(
                    "Some users have interacted with all items, "
                    "which we can not sample negative items for them. "
                    "Please set `user_inter_num_interval` to filter those users."
                )
        return used_item_id

    def set_phase(self, phase):
        """Get the sampler of corresponding phase.

        Args:
            phase (str): The phase of new sampler.

        Returns:
            Sampler: the copy of this sampler, :attr:`phase` is set the same as input phase,
            and :attr:`used_ids` is set to the value of corresponding phase.
        """
        if phase not in self.phases:
            raise ValueError(f"Phase [{phase}] not exist.")
        new_sampler = copy.copy(self)
        new_sampler.phase = phase
        new_sampler.used_ids = new_sampler.used_ids[phase]
        return new_sampler

    def sample_by_user_ids(self, user_ids, item_ids, num):
        """Sampling by user_ids.

        Args:
            user_ids (numpy.ndarray or list): Input user_ids.
            item_ids (numpy.ndarray or list): Input item_ids.
            num (int): Number of sampled item_ids for each user_id.

        Returns:
            torch.tensor: Sampled item_ids.
            item_ids[0], item_ids[len(user_ids)], item_ids[len(user_ids) * 2], ...,
            item_ids[len(user_ids) * (num - 1)] is sampled for user_ids[0];
            item_ids[1], item_ids[len(user_ids) + 1], item_ids[len(user_ids) * 2 + 1], ...,
            item_ids[len(user_ids) * (num - 1) + 1] is sampled for user_ids[1]; ...; and so on.
        """
        try:
            return self.sample_by_key_ids(user_ids, num)
        except IndexError:
            # Translate the low-level indexing error into a readable message about the bad user_id.
            for user_id in user_ids:
                if user_id < 0 or user_id >= self.user_num:
                    raise ValueError(f"user_id [{user_id}] not exist.")
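

# Illustrative sketch, not part of RecBole: a stand-in "dataset" factory that exposes only
# the attributes Sampler reads (uid_field, iid_field, user_num, item_num, inter_feat).
# Real usage passes one RecBole Dataset per phase instead; all names below are hypothetical.
def _toy_interactions(uids, iids, user_num, item_num):
    from types import SimpleNamespace

    return SimpleNamespace(
        uid_field="user_id",
        iid_field="item_id",
        user_num=user_num,
        item_num=item_num,
        inter_feat={"user_id": torch.tensor(uids), "item_id": torch.tensor(iids)},
    )


# Example: ids 0 are [pad]. The valid-phase sampler excludes both train and valid positives.
# train = _toy_interactions([1, 1, 2], [1, 2, 3], user_num=4, item_num=6)
# valid = _toy_interactions([1, 2], [4, 1], user_num=4, item_num=6)
# sampler = Sampler(["train", "valid"], [train, valid]).set_phase("valid")
# sampler.sample_by_user_ids([1, 2], item_ids=None, num=2)  # item_ids is unused by Sampler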


class KGSampler(AbstractSampler):
    """:class:`KGSampler` is used to sample negative entities in a knowledge graph.

    Args:
        dataset (Dataset): The knowledge graph dataset, which contains triplets in a knowledge graph.
        distribution (str, optional): Distribution of the negative entities. Defaults to 'uniform'.
    """

    def __init__(self, dataset, distribution="uniform", alpha=1.0):
        self.dataset = dataset
        self.hid_field = dataset.head_entity_field
        self.tid_field = dataset.tail_entity_field
        self.hid_list = dataset.head_entities
        self.tid_list = dataset.tail_entities
        self.head_entities = set(dataset.head_entities)
        self.entity_num = dataset.entity_num
        super().__init__(distribution=distribution, alpha=alpha)

    def _uni_sampling(self, sample_num):
        return np.random.randint(1, self.entity_num, sample_num)

    def _get_candidates_list(self):
        return list(self.hid_list) + list(self.tid_list)

    def get_used_ids(self):
        """
        Returns:
            numpy.ndarray: Used entity_ids are the same as tail_entity_ids in the knowledge graph.
            Index is head_entity_id, and element is a set of tail_entity_ids.
        """
        used_tail_entity_id = np.array([set() for _ in range(self.entity_num)])
        for hid, tid in zip(self.hid_list, self.tid_list):
            used_tail_entity_id[hid].add(tid)

        for used_tail_set in used_tail_entity_id:
            if len(used_tail_set) + 1 == self.entity_num:  # [pad] is an entity.
                raise ValueError(
                    "Some head entities have relation with all entities, "
                    "which we can not sample negative entities for them."
                )
        return used_tail_entity_id

    def sample_by_entity_ids(self, head_entity_ids, num=1):
        """Sampling by head_entity_ids.

        Args:
            head_entity_ids (numpy.ndarray or list): Input head_entity_ids.
            num (int, optional): Number of sampled entity_ids for each head_entity_id. Defaults to ``1``.

        Returns:
            torch.tensor: Sampled entity_ids.
            entity_ids[0], entity_ids[len(head_entity_ids)], entity_ids[len(head_entity_ids) * 2], ...,
            entity_ids[len(head_entity_ids) * (num - 1)] is sampled for head_entity_ids[0];
            entity_ids[1], entity_ids[len(head_entity_ids) + 1], entity_ids[len(head_entity_ids) * 2 + 1], ...,
            entity_ids[len(head_entity_ids) * (num - 1) + 1] is sampled for head_entity_ids[1]; ...; and so on.
        """
        try:
            return self.sample_by_key_ids(head_entity_ids, num)
        except IndexError:
            for head_entity_id in head_entity_ids:
                if head_entity_id not in self.head_entities:
                    raise ValueError(f"head_entity_id [{head_entity_id}] not exist.")
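

# Usage sketch (assumes `kg_dataset` is a RecBole Dataset carrying knowledge-graph triplets;
# the variable names are hypothetical):
#
#     kg_sampler = KGSampler(kg_dataset, distribution="uniform")
#     neg_tails = kg_sampler.sample_by_entity_ids(head_entity_ids=[1, 2, 3], num=1)
#
# For each head entity, the sampled tail entity is guaranteed not to be one of its true
# tails recorded in `used_ids`.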


class RepeatableSampler(AbstractSampler):
    """:class:`RepeatableSampler` is used to sample negative items for each input user.
    The difference from :class:`Sampler` is that it can only sample items that have not appeared
    in all the phases.

    Args:
        phases (str or list of str): All the phases of input.
        dataset (Dataset): The union of all datasets for each phase.
        distribution (str, optional): Distribution of the negative items. Defaults to 'uniform'.

    Attributes:
        phase (str): the phase of sampler. It will not be set until :meth:`set_phase` is called.
    """

    def __init__(self, phases, dataset, distribution="uniform", alpha=1.0):
        if not isinstance(phases, list):
            phases = [phases]
        self.phases = phases
        self.dataset = dataset
        self.iid_field = dataset.iid_field
        self.user_num = dataset.user_num
        self.item_num = dataset.item_num
        super().__init__(distribution=distribution, alpha=alpha)

    def _uni_sampling(self, sample_num):
        return np.random.randint(1, self.item_num, sample_num)

    def _get_candidates_list(self):
        return list(self.dataset.inter_feat[self.iid_field].numpy())

    def get_used_ids(self):
        """
        Returns:
            numpy.ndarray: Used item_ids are the same as positive item_ids.
            Index is user_id, and element is a set of item_ids.
        """
        return np.array([set() for _ in range(self.user_num)])

    def sample_by_user_ids(self, user_ids, item_ids, num):
        """Sampling by user_ids.

        Args:
            user_ids (numpy.ndarray or list): Input user_ids.
            item_ids (numpy.ndarray or list): Input item_ids.
            num (int): Number of sampled item_ids for each user_id.

        Returns:
            torch.tensor: Sampled item_ids.
            item_ids[0], item_ids[len(user_ids)], item_ids[len(user_ids) * 2], ...,
            item_ids[len(user_ids) * (num - 1)] is sampled for user_ids[0];
            item_ids[1], item_ids[len(user_ids) + 1], item_ids[len(user_ids) * 2 + 1], ...,
            item_ids[len(user_ids) * (num - 1) + 1] is sampled for user_ids[1]; ...; and so on.
        """
        try:
            # Each row is its own key, and the only forbidden value is that row's positive item.
            self.used_ids = np.array([{i} for i in item_ids])
            return self.sample_by_key_ids(np.arange(len(user_ids)), num)
        except IndexError:
            for user_id in user_ids:
                if user_id < 0 or user_id >= self.user_num:
                    raise ValueError(f"user_id [{user_id}] not exist.")

    def set_phase(self, phase):
        """Get the sampler of corresponding phase.

        Args:
            phase (str): The phase of new sampler.

        Returns:
            Sampler: the copy of this sampler, and :attr:`phase` is set the same as input phase.
        """
        if phase not in self.phases:
            raise ValueError(f"Phase [{phase}] not exist.")
        new_sampler = copy.copy(self)
        new_sampler.phase = phase
        return new_sampler
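

# Usage sketch (assumes `dataset` is the union Dataset of all phases; names are hypothetical):
#
#     sampler = RepeatableSampler(["train", "valid", "test"], dataset).set_phase("train")
#     neg_items = sampler.sample_by_user_ids(user_ids=[1, 2], item_ids=[5, 7], num=1)
#
# Because get_used_ids() returns empty sets, only the positive item paired with each row
# (passed via `item_ids`) is excluded, so the same item may be sampled again for other rows.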


class SeqSampler(AbstractSampler):
    """:class:`SeqSampler` is used to sample negative item sequences.

    Args:
        dataset (Dataset): The dataset to sample negative item sequences for.
        distribution (str, optional): Distribution of the negative items. Defaults to 'uniform'.
    """

    def __init__(self, dataset, distribution="uniform", alpha=1.0):
        self.dataset = dataset
        self.iid_field = dataset.iid_field
        self.user_num = dataset.user_num
        self.item_num = dataset.item_num
        super().__init__(distribution=distribution, alpha=alpha)

    def _uni_sampling(self, sample_num):
        return np.random.randint(1, self.item_num, sample_num)

    def sample_neg_sequence(self, pos_sequence):
        """For each position, sample one item from all the items except the one the user clicked on
        at that position.

        Args:
            pos_sequence (torch.Tensor): all users' item history sequence, with the shape of `(N, )`.

        Returns:
            torch.tensor: all users' negative item history sequence.
        """
        total_num = len(pos_sequence)
        value_ids = np.zeros(total_num, dtype=np.int64)
        check_list = np.arange(total_num)
        while len(check_list) > 0:
            value_ids[check_list] = self.sampling(len(check_list))
            # Positions where the sampled item equals the positive item get resampled.
            check_index = np.where(value_ids[check_list] == pos_sequence[check_list])
            check_list = check_list[check_index]
        return torch.tensor(value_ids)
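

# Usage sketch (assumes `dataset` is a RecBole Dataset; names are hypothetical):
#
#     seq_sampler = SeqSampler(dataset)
#     pos_sequence = dataset.inter_feat[dataset.iid_field]           # shape (N,)
#     neg_sequence = seq_sampler.sample_neg_sequence(pos_sequence)   # shape (N,)
#
# Every position in `neg_sequence` differs from the item at the same position in
# `pos_sequence`, which is exactly what the rejection loop above enforces.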