Source code for recbole.utils.utils

# -*- coding: utf-8 -*-
# @Time   : 2020/7/17
# @Author : Shanlei Mu
# @Email  : slmu@ruc.edu.cn

# UPDATE
# @Time   : 2021/3/8, 2022/7/12, 2023/2/11
# @Author : Jiawei Guan, Lei Wang, Gaowei Zhang
# @Email  : guanjw@ruc.edu.cn, zxcptss@gmail.com, zgw2022101006@ruc.edu.cn

"""
recbole.utils.utils
################################
"""

import datetime
import importlib
import os
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from texttable import Texttable
from torch.utils.tensorboard import SummaryWriter

from recbole.utils.enum_type import ModelType


def get_local_time():
    r"""Get current time

    Returns:
        str: current time
    """
    cur = datetime.datetime.now()
    cur = cur.strftime("%b-%d-%Y_%H-%M-%S")

    return cur
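
# Illustrative usage of get_local_time (the exact timestamp below is hypothetical;
# only the "%b-%d-%Y_%H-%M-%S" format is fixed):
#   >>> get_local_time()
#   'Jul-17-2020_13-05-42'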
def ensure_dir(dir_path):
    r"""Make sure the directory exists; if it does not exist, create it.

    Args:
        dir_path (str): directory path
    """
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
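
# Illustrative usage of ensure_dir (the path is hypothetical):
#   >>> ensure_dir("./saved/checkpoints")   # creates the tree if missing; no-op otherwise
#   >>> os.path.isdir("./saved/checkpoints")
#   True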
def get_model(model_name):
    r"""Automatically select model class based on model name

    Args:
        model_name (str): model name

    Returns:
        Recommender: model class
    """
    model_submodule = [
        "general_recommender",
        "context_aware_recommender",
        "sequential_recommender",
        "knowledge_aware_recommender",
        "exlib_recommender",
    ]

    model_file_name = model_name.lower()
    model_module = None
    for submodule in model_submodule:
        module_path = ".".join(["recbole.model", submodule, model_file_name])
        if importlib.util.find_spec(module_path, __name__):
            model_module = importlib.import_module(module_path, __name__)
            break

    if model_module is None:
        raise ValueError(
            "`model_name` [{}] is not the name of an existing model.".format(model_name)
        )
    model_class = getattr(model_module, model_name)
    return model_class
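
# Illustrative usage of get_model, assuming the built-in BPR model is available:
#   >>> model_class = get_model("BPR")   # found under recbole.model.general_recommender
#   >>> model_class.__name__
#   'BPR'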
def get_trainer(model_type, model_name):
    r"""Automatically select trainer class based on model type and model name

    Args:
        model_type (ModelType): model type
        model_name (str): model name

    Returns:
        Trainer: trainer class
    """
    try:
        return getattr(
            importlib.import_module("recbole.trainer"), model_name + "Trainer"
        )
    except AttributeError:
        if model_type == ModelType.KNOWLEDGE:
            return getattr(importlib.import_module("recbole.trainer"), "KGTrainer")
        elif model_type == ModelType.TRADITIONAL:
            return getattr(
                importlib.import_module("recbole.trainer"), "TraditionalTrainer"
            )
        else:
            return getattr(importlib.import_module("recbole.trainer"), "Trainer")
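
# Illustrative usage of get_trainer; it falls back to the generic Trainer when no
# model-specific "<ModelName>Trainer" exists in recbole.trainer:
#   >>> trainer_class = get_trainer(ModelType.GENERAL, "BPR")
#   >>> trainer_class.__name__
#   'Trainer'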
def early_stopping(value, best, cur_step, max_step, bigger=True):
    r"""Validation-based early stopping.

    Args:
        value (float): current result
        best (float): best result so far
        cur_step (int): the number of consecutive steps that did not exceed the best result
        max_step (int): threshold steps for stopping
        bigger (bool, optional): whether a bigger value is better

    Returns:
        tuple:
        - float, best result after this step
        - int, the number of consecutive steps that did not exceed the best result after this step
        - bool, whether to stop
        - bool, whether to update
    """
    stop_flag = False
    update_flag = False
    if bigger:
        if value >= best:
            cur_step = 0
            best = value
            update_flag = True
        else:
            cur_step += 1
            if cur_step > max_step:
                stop_flag = True
    else:
        if value <= best:
            cur_step = 0
            best = value
            update_flag = True
        else:
            cur_step += 1
            if cur_step > max_step:
                stop_flag = True
    return best, cur_step, stop_flag, update_flag
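
# Illustrative usage of early_stopping with max_step=2 (the metric values are made up):
#   >>> early_stopping(0.85, best=0.80, cur_step=0, max_step=2)   # improvement: counter resets
#   (0.85, 0, False, True)
#   >>> early_stopping(0.70, best=0.85, cur_step=2, max_step=2)   # third consecutive miss: stop
#   (0.85, 3, True, False)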
def calculate_valid_score(valid_result, valid_metric=None):
    r"""Return the valid score from a valid result.

    Args:
        valid_result (dict): valid result
        valid_metric (str, optional): the selected metric in the valid result used as the valid score

    Returns:
        float: valid score
    """
    if valid_metric:
        return valid_result[valid_metric]
    else:
        return valid_result["Recall@10"]
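
# Illustrative usage of calculate_valid_score (the metric values are made up):
#   >>> calculate_valid_score({"Recall@10": 0.21, "NDCG@10": 0.17}, valid_metric="NDCG@10")
#   0.17
#   >>> calculate_valid_score({"Recall@10": 0.21, "NDCG@10": 0.17})   # defaults to Recall@10
#   0.21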
def dict2str(result_dict):
    r"""Convert a result dict to str.

    Args:
        result_dict (dict): result dict

    Returns:
        str: result str
    """
    return " ".join(
        [str(metric) + " : " + str(value) for metric, value in result_dict.items()]
    )
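
# Illustrative usage of dict2str (the metric values are made up):
#   >>> dict2str({"Recall@10": 0.21, "NDCG@10": 0.17})
#   'Recall@10 : 0.21 NDCG@10 : 0.17'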
def init_seed(seed, reproducibility):
    r"""Init random seed for the random functions in numpy, torch, cuda and cudnn.

    Args:
        seed (int): random seed
        reproducibility (bool): whether to require reproducibility
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    if reproducibility:
        torch.backends.cudnn.benchmark = False
        torch.backends.cudnn.deterministic = True
    else:
        torch.backends.cudnn.benchmark = True
        torch.backends.cudnn.deterministic = False
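
# Illustrative usage of init_seed; with reproducibility=True, cudnn benchmarking is
# disabled so repeated runs with the same seed give the same results (the tensor
# value below is hypothetical):
#   >>> init_seed(2020, reproducibility=True)
#   >>> torch.rand(1)   # identical across runs seeded with 2020
#   tensor([0.4869])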
def get_tensorboard(logger):
    r"""Creates a SummaryWriter of Tensorboard that can log PyTorch models and metrics
    into a directory for visualization within the TensorBoard UI.
    For the convenience of the user, the naming rule of the SummaryWriter's log_dir is
    the same as that of the logger.

    Args:
        logger: its output filename is used to name the SummaryWriter's log_dir.
                If the filename is not available, we will name the log_dir according
                to the current time.

    Returns:
        SummaryWriter: it will write out events and summaries to the event file.
    """
    base_path = "log_tensorboard"

    dir_name = None
    for handler in logger.handlers:
        if hasattr(handler, "baseFilename"):
            dir_name = os.path.basename(getattr(handler, "baseFilename")).split(".")[0]
            break
    if dir_name is None:
        dir_name = "{}-{}".format("model", get_local_time())

    dir_path = os.path.join(base_path, dir_name)
    writer = SummaryWriter(dir_path)
    return writer
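
# Illustrative usage of get_tensorboard, assuming `logger` was set up with a
# FileHandler (e.g. by recbole.utils.logger.init_logger), whose filename stem
# then names the log_dir:
#   >>> writer = get_tensorboard(logger)   # writes under log_tensorboard/<log filename stem>/
#   >>> writer.add_scalar("train_loss", 0.35, global_step=1)   # the value is made up
#   >>> writer.close()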
def get_gpu_usage(device=None):
    r"""Return the reserved memory and total memory of the given device as a string.

    Args:
        device: cuda.device. It is the device that the model runs on.

    Returns:
        str: info about the reserved memory and total memory of the given device.
    """
    reserved = torch.cuda.max_memory_reserved(device) / 1024**3
    total = torch.cuda.get_device_properties(device).total_memory / 1024**3

    return "{:.2f} G/{:.2f} G".format(reserved, total)
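
# Illustrative usage of get_gpu_usage on a CUDA machine (the numbers are hypothetical):
#   >>> get_gpu_usage(torch.device("cuda:0"))
#   '1.25 G/11.00 G'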
def get_flops(model, dataset, device, logger, transform, verbose=False):
    r"""Given a model and a dataset, compute the per-operator flops of the given model.

    Args:
        model: the model to compute flop counts.
        dataset: dataset that is passed to `model` to count flops.
        device: cuda.device. It is the device that the model runs on.
        logger: logger used to print information of modules.
        transform: transform applied to the sampled interaction before the forward pass.
        verbose: whether to print information of modules.

    Returns:
        total_ops: the total number of flops of the model.
    """
    if model.type == ModelType.DECISIONTREE:
        return 1
    if model.__class__.__name__ == "Pop":
        return 1

    import copy

    model = copy.deepcopy(model)

    def count_normalization(m, x, y):
        x = x[0]
        flops = torch.DoubleTensor([2 * x.numel()])
        m.total_ops += flops

    def count_embedding(m, x, y):
        x = x[0]
        nelements = x.numel()
        hiddensize = y.shape[-1]
        m.total_ops += nelements * hiddensize

    class TracingAdapter(torch.nn.Module):
        def __init__(self, rec_model):
            super().__init__()
            self.model = rec_model

        def forward(self, interaction):
            return self.model.predict(interaction)

    custom_ops = {
        torch.nn.Embedding: count_embedding,
        torch.nn.LayerNorm: count_normalization,
    }
    wrapper = TracingAdapter(model)
    inter = dataset[torch.tensor([1])].to(device)
    inter = transform(dataset, inter)
    inputs = (inter,)

    from thop.profile import register_hooks
    from thop.vision.basic_hooks import count_parameters

    handler_collection = {}
    fn_handles = []
    params_handles = []
    types_collection = set()
    if custom_ops is None:
        custom_ops = {}

    def add_hooks(m: nn.Module):
        m.register_buffer("total_ops", torch.zeros(1, dtype=torch.float64))
        m.register_buffer("total_params", torch.zeros(1, dtype=torch.float64))

        m_type = type(m)

        fn = None
        if m_type in custom_ops:
            fn = custom_ops[m_type]
            if m_type not in types_collection and verbose:
                logger.info("Customize rule %s() %s." % (fn.__qualname__, m_type))
        elif m_type in register_hooks:
            fn = register_hooks[m_type]
            if m_type not in types_collection and verbose:
                logger.info("Register %s() for %s." % (fn.__qualname__, m_type))
        else:
            if m_type not in types_collection and verbose:
                logger.warning(
                    "[WARN] Cannot find rule for %s. Treat it as zero Macs and zero Params."
                    % m_type
                )

        if fn is not None:
            handle_fn = m.register_forward_hook(fn)
            handle_paras = m.register_forward_hook(count_parameters)
            handler_collection[m] = (
                handle_fn,
                handle_paras,
            )
            fn_handles.append(handle_fn)
            params_handles.append(handle_paras)
        types_collection.add(m_type)

    prev_training_status = wrapper.training

    wrapper.eval()
    wrapper.apply(add_hooks)

    with torch.no_grad():
        wrapper(*inputs)

    def dfs_count(module: nn.Module, prefix="\t"):
        total_ops, total_params = module.total_ops.item(), 0
        ret_dict = {}
        for n, m in module.named_children():
            next_dict = {}
            if m in handler_collection and not isinstance(
                m, (nn.Sequential, nn.ModuleList)
            ):
                m_ops, m_params = m.total_ops.item(), m.total_params.item()
            else:
                m_ops, m_params, next_dict = dfs_count(m, prefix=prefix + "\t")
            ret_dict[n] = (m_ops, m_params, next_dict)
            total_ops += m_ops
            total_params += m_params
        return total_ops, total_params, ret_dict

    total_ops, total_params, ret_dict = dfs_count(wrapper)

    # reset the wrapper to its original status
    wrapper.train(prev_training_status)
    for m, (op_handler, params_handler) in handler_collection.items():
        m._buffers.pop("total_ops")
        m._buffers.pop("total_params")
    for i in range(len(fn_handles)):
        fn_handles[i].remove()
        params_handles[i].remove()

    return total_ops
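
# Illustrative usage of get_flops inside a standard RecBole run, assuming `config`,
# `dataset`, `logger` and `transform` were built the usual way (see recbole.quick_start):
#   >>> model = get_model(config["model"])(config, train_data.dataset).to(config["device"])
#   >>> flops = get_flops(model, dataset, config["device"], logger, transform)
#   >>> logger.info(f"FLOPs: {flops}")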
def list_to_latex(convert_list, bigger_flag=True, subset_columns=[]):
    r"""Convert a list of result dicts to a pandas DataFrame and a LaTeX table string.

    Args:
        convert_list (list): a list of dicts mapping metric names to values.
        bigger_flag (bool, optional): whether a bigger value is better; the best value
            in each subset column is bolded accordingly.
        subset_columns (list, optional): columns in which the best value is bolded;
            if empty, a plain LaTeX table is returned.

    Returns:
        tuple:
        - pandas.DataFrame, the result table
        - str, the LaTeX string of the result table
    """
    result = {}
    for d in convert_list:
        for key, value in d.items():
            if key in result:
                result[key].append(value)
            else:
                result[key] = [value]

    df = pd.DataFrame.from_dict(result, orient="index").T

    if len(subset_columns) == 0:
        tex = df.to_latex(index=False)
        return df, tex

    def bold_func(x, bigger_flag):
        if bigger_flag:
            return np.where(x == np.max(x.to_numpy()), "font-weight:bold", None)
        else:
            return np.where(x == np.min(x.to_numpy()), "font-weight:bold", None)

    style = df.style
    style.apply(bold_func, bigger_flag=bigger_flag, subset=subset_columns)
    style.format(precision=4)

    num_column = len(df.columns)
    column_format = "c" * num_column
    tex = style.hide(axis="index").to_latex(
        caption="Result Table",
        label="Result Table",
        convert_css=True,
        hrules=True,
        column_format=column_format,
    )

    return df, tex
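
# Illustrative usage of list_to_latex, bolding the best value per subset column
# (the numbers are made up; each dict is one model's results):
#   >>> results = [
#   ...     {"Model": "BPR", "Recall@10": 0.21, "NDCG@10": 0.17},
#   ...     {"Model": "LightGCN", "Recall@10": 0.25, "NDCG@10": 0.20},
#   ... ]
#   >>> df, tex = list_to_latex(results, bigger_flag=True, subset_columns=["Recall@10", "NDCG@10"])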
def get_environment(config):
    r"""Collect current CPU, GPU and memory usage and return them as a Texttable.

    Args:
        config (Config): the running configuration; `device` and `use_gpu` decide
            whether GPU usage is reported.

    Returns:
        Texttable: a table with the environment usage information.
    """
    gpu_usage = (
        get_gpu_usage(config["device"])
        if torch.cuda.is_available() and config["use_gpu"]
        else "0.0 / 0.0"
    )

    import psutil

    memory_used = psutil.Process(os.getpid()).memory_info().rss / 1024**3
    memory_total = psutil.virtual_memory()[0] / 1024**3
    memory_usage = "{:.2f} G/{:.2f} G".format(memory_used, memory_total)
    cpu_usage = "{:.2f} %".format(psutil.cpu_percent(interval=1))

    table = Texttable()
    table.set_cols_align(["l", "c"])
    table.set_cols_valign(["m", "m"])
    table.add_rows(
        [
            ["Environment", "Usage"],
            ["CPU", cpu_usage],
            ["GPU", gpu_usage],
            ["Memory", memory_usage],
        ]
    )

    return table
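
# Illustrative usage of get_environment, assuming `config` is a RecBole Config object:
#   >>> table = get_environment(config)
#   >>> print(table.draw())   # renders the CPU/GPU/memory usage table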