# @Time : 2020/6/28
# @Author : Zihan Lin
# @Email : linzihan.super@foxmail.com
# UPDATE
# @Time : 2020/10/04, 2021/3/2, 2021/2/17, 2021/6/30, 2022/7/6
# @Author : Shanlei Mu, Yupeng Hou, Jiawei Guan, Xingyu Pan, Gaowei Zhang
# @Email : slmu@ruc.edu.cn, houyupeng@ruc.edu.cn, Guanjw@ruc.edu.cn, xy_pan@foxmail.com, zgw15630559577@163.com
"""
recbole.config.configurator
################################
"""
import re
import os
import sys
import yaml
from logging import getLogger
from typing import Literal
from recbole.evaluator import metric_types, smaller_metrics
from recbole.utils import (
get_model,
Enum,
EvaluatorType,
ModelType,
InputType,
general_arguments,
training_arguments,
evaluation_arguments,
dataset_arguments,
set_color,
)
class Config(object):
"""Configurator module that load the defined parameters.
Configurator module will first load the default parameters from the fixed properties in RecBole and then
load parameters from the external input.
External input supports three kind of forms: config file, command line and parameter dictionaries.
- config file: It's a file that record the parameters to be modified or added. It should be in ``yaml`` format,
e.g. a config file is 'example.yaml', the content is:
learning_rate: 0.001
train_batch_size: 2048
- command line: It should be in the format as '---learning_rate=0.001'
- parameter dictionaries: It should be a dict, where the key is parameter name and the value is parameter value,
e.g. config_dict = {'learning_rate': 0.001}
Configuration module allows the above three kind of external input format to be used together,
the priority order is as following:
command line > parameter dictionaries > config file
e.g. If we set learning_rate=0.01 in config file, learning_rate=0.02 in command line,
learning_rate=0.03 in parameter dictionaries.
Finally the learning_rate is equal to 0.02.
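    Example:
        A minimal usage sketch of the priority rule (the 'BPR' model and 'ml-100k' dataset
        are illustrative names, assumed to be available):
            config = Config(model='BPR', dataset='ml-100k', config_dict={'learning_rate': 0.02})
            config['learning_rate']  # 0.02, unless overridden on the command line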
"""
def __init__(
self, model=None, dataset=None, config_file_list=None, config_dict=None
):
"""
Args:
model (str/AbstractRecommender): the model name or the model class, default is None, if it is None, config
will search the parameter 'model' from the external input as the model name or model class.
dataset (str): the dataset name, default is None, if it is None, config will search the parameter 'dataset'
from the external input as the dataset name.
config_file_list (list of str): the external config file, it allows multiple config files, default is None.
config_dict (dict): the external parameter dictionaries, default is None.
"""
self.compatibility_settings()
self._init_parameters_category()
self.yaml_loader = self._build_yaml_loader()
self.file_config_dict = self._load_config_files(config_file_list)
self.variable_config_dict = self._load_variable_config_dict(config_dict)
self.cmd_config_dict = self._load_cmd_line()
self._merge_external_config_dict()
self.model, self.model_class, self.dataset = self._get_model_and_dataset(
model, dataset
)
self._load_internal_config_dict(self.model, self.model_class, self.dataset)
self.final_config_dict = self._get_final_config_dict()
self._set_default_parameters()
self._init_device()
self._set_train_neg_sample_args()
self._set_eval_neg_sample_args("valid")
self._set_eval_neg_sample_args("test")
def _init_parameters_category(self):
self.parameters = dict()
self.parameters["General"] = general_arguments
self.parameters["Training"] = training_arguments
self.parameters["Evaluation"] = evaluation_arguments
self.parameters["Dataset"] = dataset_arguments
def _build_yaml_loader(self):
loader = yaml.FullLoader
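        # Patch the implicit float resolver so that scientific notation without a
        # decimal point (e.g. '1e-4' in a yaml file) is parsed as a float rather than a str.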
loader.add_implicit_resolver(
"tag:yaml.org,2002:float",
re.compile(
"""^(?:
[-+]?(?:[0-9][0-9_]*)\\.[0-9_]*(?:[eE][-+]?[0-9]+)?
|[-+]?(?:[0-9][0-9_]*)(?:[eE][-+]?[0-9]+)
|\\.[0-9_]+(?:[eE][-+][0-9]+)?
|[-+]?[0-9][0-9_]*(?::[0-5]?[0-9])+\\.[0-9_]*
|[-+]?\\.(?:inf|Inf|INF)
|\\.(?:nan|NaN|NAN))$""",
re.X,
),
list("-+0123456789."),
)
return loader
def _convert_config_dict(self, config_dict):
r"""This function convert the str parameters to their original type."""
for key in config_dict:
param = config_dict[key]
if not isinstance(param, str):
continue
try:
value = eval(param)
if value is not None and not isinstance(
value, (str, int, float, list, tuple, dict, bool, Enum)
):
value = param
except (NameError, SyntaxError, TypeError):
if isinstance(param, str):
if param.lower() == "true":
value = True
elif param.lower() == "false":
value = False
else:
value = param
else:
value = param
config_dict[key] = value
return config_dict
def _load_config_files(self, file_list):
file_config_dict = dict()
if file_list:
for file in file_list:
with open(file, "r", encoding="utf-8") as f:
file_config_dict.update(
yaml.load(f.read(), Loader=self.yaml_loader)
)
return file_config_dict
def _load_variable_config_dict(self, config_dict):
        # HyperTuning may set parameters such as mlp_hidden_size in NeuMF in the format of ['[]', '[]'];
        # config_dict then receives the str '[]', although it actually represents the list [].
        # We temporarily use _convert_config_dict to solve this problem.
return self._convert_config_dict(config_dict) if config_dict else dict()
def _load_cmd_line(self):
r"""Read parameters from command line and convert it to str."""
cmd_config_dict = dict()
unrecognized_args = []
if "ipykernel_launcher" not in sys.argv[0]:
for arg in sys.argv[1:]:
if not arg.startswith("--") or len(arg[2:].split("=")) != 2:
unrecognized_args.append(arg)
continue
cmd_arg_name, cmd_arg_value = arg[2:].split("=")
if (
cmd_arg_name in cmd_config_dict
and cmd_arg_value != cmd_config_dict[cmd_arg_name]
):
raise SyntaxError(
"There are duplicate commend arg '%s' with different value."
% arg
)
else:
cmd_config_dict[cmd_arg_name] = cmd_arg_value
if len(unrecognized_args) > 0:
logger = getLogger()
logger.warning(
"command line args [{}] will not be used in RecBole".format(
" ".join(unrecognized_args)
)
)
cmd_config_dict = self._convert_config_dict(cmd_config_dict)
return cmd_config_dict
def _merge_external_config_dict(self):
external_config_dict = dict()
external_config_dict.update(self.file_config_dict)
external_config_dict.update(self.variable_config_dict)
external_config_dict.update(self.cmd_config_dict)
self.external_config_dict = external_config_dict
def _get_model_and_dataset(self, model, dataset):
if model is None:
try:
model = self.external_config_dict["model"]
except KeyError:
raise KeyError(
"model need to be specified in at least one of the these ways: "
"[model variable, config file, config dict, command line] "
)
if not isinstance(model, str):
final_model_class = model
final_model = model.__name__
else:
final_model = model
final_model_class = get_model(final_model)
if dataset is None:
try:
final_dataset = self.external_config_dict["dataset"]
except KeyError:
raise KeyError(
"dataset need to be specified in at least one of the these ways: "
"[dataset variable, config file, config dict, command line] "
)
else:
final_dataset = dataset
return final_model, final_model_class, final_dataset
def _update_internal_config_dict(self, file):
with open(file, "r", encoding="utf-8") as f:
config_dict = yaml.load(f.read(), Loader=self.yaml_loader)
if config_dict is not None:
self.internal_config_dict.update(config_dict)
return config_dict
def _load_internal_config_dict(self, model, model_class, dataset):
current_path = os.path.dirname(os.path.realpath(__file__))
overall_init_file = os.path.join(current_path, "../properties/overall.yaml")
model_init_file = os.path.join(
current_path, "../properties/model/" + model + ".yaml"
)
sample_init_file = os.path.join(
current_path, "../properties/dataset/sample.yaml"
)
dataset_init_file = os.path.join(
current_path, "../properties/dataset/" + dataset + ".yaml"
)
quick_start_config_path = os.path.join(
current_path, "../properties/quick_start_config/"
)
context_aware_init = os.path.join(quick_start_config_path, "context-aware.yaml")
context_aware_on_ml_100k_init = os.path.join(
quick_start_config_path, "context-aware_ml-100k.yaml"
)
DIN_init = os.path.join(quick_start_config_path, "sequential_DIN.yaml")
DIN_on_ml_100k_init = os.path.join(
quick_start_config_path, "sequential_DIN_on_ml-100k.yaml"
)
sequential_init = os.path.join(quick_start_config_path, "sequential.yaml")
special_sequential_on_ml_100k_init = os.path.join(
quick_start_config_path, "special_sequential_on_ml-100k.yaml"
)
sequential_embedding_model_init = os.path.join(
quick_start_config_path, "sequential_embedding_model.yaml"
)
knowledge_base_init = os.path.join(
quick_start_config_path, "knowledge_base.yaml"
)
self.internal_config_dict = dict()
for file in [
overall_init_file,
model_init_file,
sample_init_file,
dataset_init_file,
]:
if os.path.isfile(file):
config_dict = self._update_internal_config_dict(file)
if file == dataset_init_file:
self.parameters["Dataset"] += [
key
for key in config_dict.keys()
if key not in self.parameters["Dataset"]
]
self.internal_config_dict["MODEL_TYPE"] = model_class.type
if self.internal_config_dict["MODEL_TYPE"] == ModelType.GENERAL:
pass
elif self.internal_config_dict["MODEL_TYPE"] in {
ModelType.CONTEXT,
ModelType.DECISIONTREE,
}:
self._update_internal_config_dict(context_aware_init)
if dataset == "ml-100k":
self._update_internal_config_dict(context_aware_on_ml_100k_init)
elif self.internal_config_dict["MODEL_TYPE"] == ModelType.SEQUENTIAL:
if model in ["DIN", "DIEN"]:
self._update_internal_config_dict(DIN_init)
if dataset == "ml-100k":
self._update_internal_config_dict(DIN_on_ml_100k_init)
elif model in ["GRU4RecKG", "KSR"]:
self._update_internal_config_dict(sequential_embedding_model_init)
else:
self._update_internal_config_dict(sequential_init)
if dataset == "ml-100k" and model in [
"GRU4RecF",
"SASRecF",
"FDSA",
"S3Rec",
]:
self._update_internal_config_dict(
special_sequential_on_ml_100k_init
)
elif self.internal_config_dict["MODEL_TYPE"] == ModelType.KNOWLEDGE:
self._update_internal_config_dict(knowledge_base_init)
def _get_final_config_dict(self):
final_config_dict = dict()
final_config_dict.update(self.internal_config_dict)
final_config_dict.update(self.external_config_dict)
return final_config_dict
def _set_default_parameters(self):
self.final_config_dict["dataset"] = self.dataset
self.final_config_dict["model"] = self.model
if self.dataset == "ml-100k":
current_path = os.path.dirname(os.path.realpath(__file__))
self.final_config_dict["data_path"] = os.path.join(
current_path, "../dataset_example/" + self.dataset
)
else:
self.final_config_dict["data_path"] = os.path.join(
self.final_config_dict["data_path"], self.dataset
)
if hasattr(self.model_class, "input_type"):
self.final_config_dict["MODEL_INPUT_TYPE"] = self.model_class.input_type
elif "loss_type" in self.final_config_dict:
if self.final_config_dict["loss_type"] in ["CE"]:
if (
self.final_config_dict["MODEL_TYPE"] == ModelType.SEQUENTIAL
and self.final_config_dict.get("train_neg_sample_args", None)
is not None
):
raise ValueError(
f"train_neg_sample_args [{self.final_config_dict['train_neg_sample_args']}] should be None "
f"when the loss_type is CE."
)
self.final_config_dict["MODEL_INPUT_TYPE"] = InputType.POINTWISE
elif self.final_config_dict["loss_type"] in ["BPR"]:
self.final_config_dict["MODEL_INPUT_TYPE"] = InputType.PAIRWISE
else:
raise ValueError(
"Either Model has attr 'input_type',"
"or arg 'loss_type' should exist in config."
)
metrics = self.final_config_dict["metrics"]
if isinstance(metrics, str):
self.final_config_dict["metrics"] = [metrics]
eval_type = set()
for metric in self.final_config_dict["metrics"]:
if metric.lower() in metric_types:
eval_type.add(metric_types[metric.lower()])
else:
raise NotImplementedError(f"There is no metric named '{metric}'")
if len(eval_type) > 1:
raise RuntimeError(
"Ranking metrics and value metrics can not be used at the same time."
)
self.final_config_dict["eval_type"] = eval_type.pop()
if (
self.final_config_dict["MODEL_TYPE"] == ModelType.SEQUENTIAL
and not self.final_config_dict["repeatable"]
):
raise ValueError(
"Sequential models currently only support repeatable recommendation, "
"please set `repeatable` as `True`."
)
valid_metric = self.final_config_dict["valid_metric"].split("@")[0]
self.final_config_dict["valid_metric_bigger"] = (
False if valid_metric.lower() in smaller_metrics else True
)
topk = self.final_config_dict["topk"]
if isinstance(topk, (int, list)):
if isinstance(topk, int):
topk = [topk]
for k in topk:
if k <= 0:
raise ValueError(
f"topk must be a positive integer or a list of positive integers, but get `{k}`"
)
self.final_config_dict["topk"] = topk
else:
            raise TypeError(f"The topk [{topk}] must be an integer or a list of integers.")
if "additional_feat_suffix" in self.final_config_dict:
ad_suf = self.final_config_dict["additional_feat_suffix"]
if isinstance(ad_suf, str):
self.final_config_dict["additional_feat_suffix"] = [ad_suf]
# train_neg_sample_args checking
default_train_neg_sample_args = {
"distribution": "uniform",
"sample_num": 1,
"alpha": 1.0,
"dynamic": False,
"candidate_num": 0,
}
if (
self.final_config_dict.get("neg_sampling") is not None
or self.final_config_dict.get("training_neg_sample_num") is not None
):
logger = getLogger()
logger.warning(
"Warning: Parameter 'neg_sampling' or 'training_neg_sample_num' has been deprecated in the new version, "
"please use 'train_neg_sample_args' instead and check the API documentation for proper usage."
)
if self.final_config_dict.get("train_neg_sample_args") is not None:
if not isinstance(self.final_config_dict["train_neg_sample_args"], dict):
raise ValueError(
f"train_neg_sample_args:[{self.final_config_dict['train_neg_sample_args']}] should be a dict."
)
for op_args in default_train_neg_sample_args:
if op_args not in self.final_config_dict["train_neg_sample_args"]:
self.final_config_dict["train_neg_sample_args"][
op_args
] = default_train_neg_sample_args[op_args]
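        # e.g., train_neg_sample_args: {'distribution': 'popularity'} is completed with the
        # remaining defaults: {'distribution': 'popularity', 'sample_num': 1, 'alpha': 1.0,
        # 'dynamic': False, 'candidate_num': 0}.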
# eval_args checking
default_eval_args = {
"split": {"RS": [0.8, 0.1, 0.1]},
"order": "RO",
"group_by": "user",
"mode": {"valid": "full", "test": "full"},
}
if not isinstance(self.final_config_dict["eval_args"], dict):
raise ValueError(
f"eval_args:[{self.final_config_dict['eval_args']}] should be a dict."
)
default_eval_args.update(self.final_config_dict["eval_args"])
mode = default_eval_args["mode"]
# backward compatible
if isinstance(mode, str):
default_eval_args["mode"] = {"valid": mode, "test": mode}
# in case there is only one key in `mode`, e.g., mode: {'valid': 'uni100'} or mode: {'test': 'full'}
if isinstance(mode, dict):
default_mode = mode.get("valid", mode.get("test", "full"))
default_eval_args["mode"] = {
"valid": mode.get("valid", default_mode),
"test": mode.get("test", default_mode),
}
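        # e.g., mode: 'uni100' and mode: {'valid': 'uni100'} are both normalized to
        # {'valid': 'uni100', 'test': 'uni100'}.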
self.final_config_dict["eval_args"] = default_eval_args
if (
self.final_config_dict["eval_type"] == EvaluatorType.VALUE
and "full" in self.final_config_dict["eval_args"]["mode"].values()
):
raise NotImplementedError(
"Full sort evaluation do not match value-based metrics!"
)
def _init_device(self):
if isinstance(self.final_config_dict["gpu_id"], tuple):
self.final_config_dict["gpu_id"] = ",".join(
map(str, list(self.final_config_dict["gpu_id"]))
)
else:
self.final_config_dict["gpu_id"] = str(self.final_config_dict["gpu_id"])
gpu_id = self.final_config_dict["gpu_id"]
os.environ["CUDA_VISIBLE_DEVICES"] = gpu_id
import torch
if "local_rank" not in self.final_config_dict:
self.final_config_dict["single_spec"] = True
self.final_config_dict["local_rank"] = 0
self.final_config_dict["device"] = (
torch.device("cpu")
if len(gpu_id) == 0 or not torch.cuda.is_available()
else torch.device("cuda")
)
else:
assert len(gpu_id.split(",")) >= self.final_config_dict["nproc"]
torch.distributed.init_process_group(
backend="nccl",
rank=self.final_config_dict["local_rank"]
+ self.final_config_dict["offset"],
world_size=self.final_config_dict["world_size"],
init_method="tcp://"
+ self.final_config_dict["ip"]
+ ":"
+ str(self.final_config_dict["port"]),
)
self.final_config_dict["device"] = torch.device(
"cuda", self.final_config_dict["local_rank"]
)
self.final_config_dict["single_spec"] = False
torch.cuda.set_device(self.final_config_dict["local_rank"])
if self.final_config_dict["local_rank"] != 0:
self.final_config_dict["state"] = "error"
self.final_config_dict["show_progress"] = False
self.final_config_dict["verbose"] = False
def _set_train_neg_sample_args(self):
train_neg_sample_args = self.final_config_dict.get("train_neg_sample_args")
if train_neg_sample_args is None or train_neg_sample_args == "None":
self.final_config_dict["train_neg_sample_args"] = {
"distribution": "none",
"sample_num": "none",
"alpha": "none",
"dynamic": False,
"candidate_num": 0,
}
else:
if not isinstance(train_neg_sample_args, dict):
raise ValueError(
f"train_neg_sample_args:[{train_neg_sample_args}] should be a dict."
)
distribution = train_neg_sample_args["distribution"]
if distribution is None or distribution == "None":
self.final_config_dict["train_neg_sample_args"] = {
"distribution": "none",
"sample_num": "none",
"alpha": "none",
"dynamic": False,
"candidate_num": 0,
}
elif distribution not in ["uniform", "popularity"]:
raise ValueError(
f"The distribution [{distribution}] of train_neg_sample_args "
f"should in ['uniform', 'popularity']"
)
def _set_eval_neg_sample_args(self, phase: Literal["valid", "test"]):
eval_mode = self.final_config_dict["eval_args"]["mode"][phase]
if not isinstance(eval_mode, str):
raise ValueError(f"mode [{eval_mode}] in eval_args should be a str.")
if eval_mode == "labeled":
eval_neg_sample_args = {"distribution": "none", "sample_num": "none"}
elif eval_mode == "full":
eval_neg_sample_args = {"distribution": "uniform", "sample_num": "none"}
elif eval_mode[0:3] == "uni":
sample_num = int(eval_mode[3:])
eval_neg_sample_args = {"distribution": "uniform", "sample_num": sample_num}
elif eval_mode[0:3] == "pop":
sample_num = int(eval_mode[3:])
eval_neg_sample_args = {
"distribution": "popularity",
"sample_num": sample_num,
}
else:
raise ValueError(f"the mode [{eval_mode}] in eval_args is not supported.")
self.final_config_dict[f"{phase}_neg_sample_args"] = eval_neg_sample_args
def __setitem__(self, key, value):
if not isinstance(key, str):
raise TypeError("index must be a str.")
self.final_config_dict[key] = value
def __getattr__(self, item):
if "final_config_dict" not in self.__dict__:
raise AttributeError(
f"'Config' object has no attribute 'final_config_dict'"
)
if item in self.final_config_dict:
return self.final_config_dict[item]
raise AttributeError(f"'Config' object has no attribute '{item}'")
def __getitem__(self, item):
return self.final_config_dict.get(item)
def __contains__(self, key):
if not isinstance(key, str):
raise TypeError("index must be a str.")
return key in self.final_config_dict
def __str__(self):
args_info = "\n"
for category in self.parameters:
args_info += set_color(category + " Hyper Parameters:\n", "pink")
args_info += "\n".join(
[
(
set_color("{}", "cyan") + " =" + set_color(" {}", "yellow")
).format(arg, value)
for arg, value in self.final_config_dict.items()
if arg in self.parameters[category]
]
)
args_info += "\n\n"
args_info += set_color("Other Hyper Parameters: \n", "pink")
args_info += "\n".join(
[
(set_color("{}", "cyan") + " = " + set_color("{}", "yellow")).format(
arg, value
)
for arg, value in self.final_config_dict.items()
if arg
not in {_ for args in self.parameters.values() for _ in args}.union(
{"model", "dataset", "config_files"}
)
]
)
args_info += "\n\n"
return args_info
def __repr__(self):
return self.__str__()
    def compatibility_settings(self):
import numpy as np
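        # NumPy deprecated these aliases in 1.20 and removed them in 1.24; restore
        # them so downstream code that still uses np.bool, np.float, etc. keeps working.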
np.bool = np.bool_
np.int = np.int_
np.float = np.float_
np.complex = np.complex_
np.object = np.object_
np.str = np.str_
np.long = np.int_
np.unicode = np.unicode_