# -*- coding: utf-8 -*-
# @Time   : 2020/7/19 19:06
# @Author : Shanlei Mu
# @Email  : slmu@ruc.edu.cn
# @File   : hyper_tuning.py

# UPDATE:
# @Time   : 2022/7/7, 2023/2/11
# @Author : Gaowei Zhang
# @Email  : zgw15630559577@163.com

"""
recbole.trainer.hyper_tuning
############################
"""

from functools import partial

import numpy as np

from recbole.utils.utils import dict2str


def _recursiveFindNodes(root, node_type="switch"):
    from hyperopt.pyll.base import Apply

    nodes = []
    if isinstance(root, (list, tuple)):
        for node in root:
            nodes.extend(_recursiveFindNodes(node, node_type))
    elif isinstance(root, dict):
        for node in root.values():
            nodes.extend(_recursiveFindNodes(node, node_type))
    elif isinstance(root, Apply):
        if root.name == node_type:
            nodes.append(root)

        for node in root.pos_args:
            if node.name == node_type:
                nodes.append(node)
        for _, node in root.named_args:
            if node.name == node_type:
                nodes.append(node)
    return nodes
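
# Hypothetical illustration (not part of the original module): for
# `hp.choice("model", ["BPR", "LightGCN"])` the root Apply node is named
# "switch", so `_recursiveFindNodes(space, "switch")` returns that node.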


def _parameters(space):
    # Analyze the domain instance to find parameters
    parameters = {}
    if isinstance(space, dict):
        space = list(space.values())
    for node in _recursiveFindNodes(space, "switch"):
        # Find the name of this parameter
        paramNode = node.pos_args[0]
        assert paramNode.name == "hyperopt_param"
        paramName = paramNode.pos_args[0].obj

        # Find all possible choices for this parameter
        values = [literal.obj for literal in node.pos_args[1:]]
        parameters[paramName] = np.array(range(len(values)))
    return parameters


def _spacesize(space):
    # Compute the number of possible combinations
    params = _parameters(space)
    return np.prod([len(values) for values in params.values()])
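
# A minimal usage sketch (hypothetical, not part of the original module):
# each `hp.choice` appears in the graph as one "switch" node, so the grid
# size is the product of the choice lengths.
#
#     from hyperopt import hp
#     space = {
#         "learning_rate": hp.choice("learning_rate", [0.01, 0.001, 0.0001]),
#         "train_batch_size": hp.choice("train_batch_size", [512, 1024]),
#     }
#     assert _spacesize(space) == 6  # 3 * 2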


class ExhaustiveSearchError(Exception):
    r"""ExhaustiveSearchError"""

    pass


def _validate_space_exhaustive_search(space):
    from hyperopt.pyll.base import dfs, as_apply
    from hyperopt.pyll.stochastic import implicit_stochastic_symbols

    supported_stochastic_symbols = [
        "randint",
        "quniform",
        "qloguniform",
        "qnormal",
        "qlognormal",
        "categorical",
    ]
    for node in dfs(as_apply(space)):
        if node.name in implicit_stochastic_symbols:
            if node.name not in supported_stochastic_symbols:
                raise ExhaustiveSearchError(
                    "Exhaustive search is only possible with the following stochastic symbols: "
                    + ", ".join(supported_stochastic_symbols)
                )
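

# NOTE: `HyperTuning.__init__` below references an `exhaustive_search`
# suggest-function whose definition was lost from this listing. The sketch
# below is a reconstruction following sbrodeur's exhaustive-search recipe
# (https://github.com/hyperopt/hyperopt/issues/200), which the class
# docstring credits. It uses hyperopt's standard suggest interface
# (new_ids, domain, trials, seed) and re-samples until it finds a
# combination not seen in previous trials, giving up after
# `nbMaxSucessiveFailures` successive duplicates.
def exhaustive_search(new_ids, domain, trials, seed, nbMaxSucessiveFailures=1000):
    r"""This is for exhaustive search in HyperTuning."""
    from hyperopt import pyll
    from hyperopt.base import miscs_update_idxs_vals

    # Hash every previously evaluated parameter combination
    hashset = set(
        [
            hash(
                frozenset(
                    [
                        (key, value[0]) if len(value) > 0 else ((key, None))
                        for key, value in trial["misc"]["vals"].items()
                    ]
                )
            )
            for trial in trials.trials
        ]
    )

    rng = np.random.RandomState(seed)
    rval = []
    for _, new_id in enumerate(new_ids):
        newSample = False
        nbSucessiveFailures = 0
        while not newSample:
            # Sample new idxs/vals from the search space
            idxs, vals = pyll.rec_eval(
                domain.s_idxs_vals,
                memo={
                    domain.s_new_ids: [new_id],
                    domain.s_rng: rng,
                },
            )
            new_result = domain.new_result()
            new_misc = dict(tid=new_id, cmd=domain.cmd, workdir=domain.workdir)
            miscs_update_idxs_vals([new_misc], idxs, vals)

            # Compare against previously seen combinations
            h = hash(
                frozenset(
                    [
                        (key, value[0]) if len(value) > 0 else ((key, None))
                        for key, value in vals.items()
                    ]
                )
            )
            if h not in hashset:
                newSample = True
            else:
                # Duplicated sample: retry
                nbSucessiveFailures += 1

            if nbSucessiveFailures > nbMaxSucessiveFailures:
                # No more unseen combinations can be produced
                return []

        rval.extend(trials.new_trial_docs([new_id], [None], [new_result], [new_misc]))
    return rval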


class HyperTuning(object):
    r"""HyperTuning Class is used to manage the parameter tuning process of recommender system models.
    Given an objective function, a parameter search space and an optimization algorithm,
    HyperTuning can find the best result among these parameters.

    Note:
        HyperTuning is based on hyperopt (https://github.com/hyperopt/hyperopt).

        Thanks to sbrodeur for the exhaustive search code.
        https://github.com/hyperopt/hyperopt/issues/200
    """

    def __init__(
        self,
        objective_function,
        space=None,
        params_file=None,
        params_dict=None,
        fixed_config_file_list=None,
        display_file=None,
        algo="exhaustive",
        max_evals=100,
        early_stop=10,
    ):
        self.best_score = None
        self.best_params = None
        self.best_test_result = None
        self.params2result = {}
        self.params_list = []
        self.score_list = []

        self.objective_function = objective_function
        self.max_evals = max_evals
        self.fixed_config_file_list = fixed_config_file_list
        self.display_file = display_file
        if space:
            self.space = space
        elif params_file:
            self.space = self._build_space_from_file(params_file)
        elif params_dict:
            self.space = self._build_space_from_dict(params_dict)
        else:
            raise ValueError(
                "at least one of `space`, `params_file` and `params_dict` should be provided"
            )
        if isinstance(algo, str):
            if algo == "exhaustive":
                self.algo = partial(exhaustive_search, nbMaxSucessiveFailures=1000)
                self.max_evals = _spacesize(self.space)
            elif algo == "random":
                from hyperopt import rand

                self.algo = rand.suggest
            elif algo == "bayes":
                from hyperopt import tpe

                self.algo = tpe.suggest
            else:
                raise ValueError("Illegal algo [{}]".format(algo))
        else:
            self.algo = algo

        from hyperopt.early_stop import no_progress_loss

        self.early_stop_fn = no_progress_loss(early_stop)

    @staticmethod
    def _build_space_from_file(file):
        from hyperopt import hp

        space = {}
        with open(file, "r") as fp:
            for line in fp:
                para_list = line.strip().split(" ")
                if len(para_list) < 3:
                    continue
                para_name, para_type, para_value = (
                    para_list[0],
                    para_list[1],
                    "".join(para_list[2:]),
                )
                if para_type == "choice":
                    para_value = eval(para_value)
                    space[para_name] = hp.choice(para_name, para_value)
                elif para_type == "uniform":
                    low, high = para_value.strip().split(",")
                    space[para_name] = hp.uniform(para_name, float(low), float(high))
                elif para_type == "quniform":
                    low, high, q = para_value.strip().split(",")
                    space[para_name] = hp.quniform(
                        para_name, float(low), float(high), float(q)
                    )
                elif para_type == "loguniform":
                    low, high = para_value.strip().split(",")
                    space[para_name] = hp.loguniform(
                        para_name, float(low), float(high)
                    )
                else:
                    raise ValueError("Illegal param type [{}]".format(para_type))
        return space

    @staticmethod
    def _build_space_from_dict(config_dict):
        from hyperopt import hp

        space = {}
        for para_type in config_dict:
            if para_type == "choice":
                for para_name in config_dict["choice"]:
                    para_value = config_dict["choice"][para_name]
                    space[para_name] = hp.choice(para_name, para_value)
            elif para_type == "uniform":
                for para_name in config_dict["uniform"]:
                    para_value = config_dict["uniform"][para_name]
                    low = para_value[0]
                    high = para_value[1]
                    space[para_name] = hp.uniform(para_name, float(low), float(high))
            elif para_type == "quniform":
                for para_name in config_dict["quniform"]:
                    para_value = config_dict["quniform"][para_name]
                    low = para_value[0]
                    high = para_value[1]
                    q = para_value[2]
                    space[para_name] = hp.quniform(
                        para_name, float(low), float(high), float(q)
                    )
            elif para_type == "loguniform":
                for para_name in config_dict["loguniform"]:
                    para_value = config_dict["loguniform"][para_name]
                    low = para_value[0]
                    high = para_value[1]
                    space[para_name] = hp.loguniform(
                        para_name, float(low), float(high)
                    )
            else:
                raise ValueError("Illegal param type [{}]".format(para_type))
        return space
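
    # Hypothetical examples of the two builder inputs above (names and
    # values are illustrative, not from the original file). A params file
    # holds one space-separated "name type value" triple per line:
    #
    #     learning_rate loguniform -8,0
    #     embedding_size choice [64,96,128]
    #     train_batch_size choice [512,1024,2048]
    #
    # The equivalent params dict groups parameter names under their type:
    #
    #     params_dict = {
    #         "loguniform": {"learning_rate": [-8, 0]},
    #         "choice": {
    #             "embedding_size": [64, 96, 128],
    #             "train_batch_size": [512, 1024, 2048],
    #         },
    #     }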

    @staticmethod
    def params2str(params):
        r"""Convert dict to str.

        Args:
            params (dict): parameters dict

        Returns:
            str: parameters string
        """
        params_str = ""
        for param_name in params:
            params_str += param_name + ":" + str(params[param_name]) + ", "
        return params_str[:-2]

    @staticmethod
    def _print_result(result_dict: dict):
        print("current best valid score: %.4f" % result_dict["best_valid_score"])
        print("current best valid result:")
        print(result_dict["best_valid_result"])
        print("current test result:")
        print(result_dict["test_result"])
        print()

    def export_result(self, output_file=None):
        r"""Write the searched parameters and corresponding results to the file.

        Args:
            output_file (str): the output file
        """
        with open(output_file, "w") as fp:
            for params in self.params2result:
                fp.write(params + "\n")
                fp.write(
                    "Valid result:\n"
                    + dict2str(self.params2result[params]["best_valid_result"])
                    + "\n"
                )
                fp.write(
                    "Test result:\n"
                    + dict2str(self.params2result[params]["test_result"])
                    + "\n\n"
                )

    def trial(self, params):
        r"""Given a set of parameters, return results and optimization status.

        Args:
            params (dict): the parameter dictionary
        """
        import hyperopt

        config_dict = params.copy()
        params_str = self.params2str(params)
        self.params_list.append(params_str)
        print("running parameters:", config_dict)
        result_dict = self.objective_function(config_dict, self.fixed_config_file_list)
        self.params2result[params_str] = result_dict
        model, score, bigger = (
            result_dict["model"],
            result_dict["best_valid_score"],
            result_dict["valid_score_bigger"],
        )
        self.model = model
        self.score_list.append(score)

        # Compare with `is None` so a legitimate valid score of 0 is kept
        if self.best_score is None:
            self.best_score = score
            self.best_params = params
            self._print_result(result_dict)
        else:
            if bigger:
                if score > self.best_score:
                    self.best_score = score
                    self.best_params = params
                    self._print_result(result_dict)
            else:
                if score < self.best_score:
                    self.best_score = score
                    self.best_params = params
                    self._print_result(result_dict)

        # hyperopt's fmin minimizes the returned loss, so negate the score
        # when a bigger valid score means a better model
        if bigger:
            score = -score
        return {"loss": score, "status": hyperopt.STATUS_OK}

    def plot_hyper(self):
        import plotly.graph_objs as go
        from plotly.offline import plot
        import pandas as pd

        data_dict = {"valid_score": self.score_list, "params": self.params_list}
        trial_df = pd.DataFrame(data_dict)
        trial_df["trial_number"] = trial_df.index + 1
        # `np.str` was removed in NumPy 1.24; the built-in `str` is equivalent
        trial_df["trial_number"] = trial_df["trial_number"].astype(str)

        trace = go.Scatter(
            x=trial_df["trial_number"],
            y=trial_df["valid_score"],
            text=trial_df["params"],
            mode="lines+markers",
            marker=dict(color="green"),
            showlegend=True,
            textposition="top center",
            name=self.model + " tuning process",
        )
        data = [trace]
        layout = go.Layout(
            title="hyperparams_tuning",
            xaxis=dict(title="trials"),
            yaxis=dict(title="valid_score"),
        )
        fig = go.Figure(data=data, layout=layout)
        plot(fig, filename=self.display_file)

    def run(self):
        r"""Begin to search the best parameters."""
        from hyperopt import fmin

        fmin(
            self.trial,
            self.space,
            algo=self.algo,
            max_evals=self.max_evals,
            early_stop_fn=self.early_stop_fn,
        )
        if self.display_file is not None:
            self.plot_hyper()
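

# Hypothetical end-to-end usage sketch (the objective function is supplied
# by the caller; in RecBole this is typically
# ``recbole.quick_start.objective_function``, and the file names below are
# illustrative):
#
#     from recbole.quick_start import objective_function
#
#     tuner = HyperTuning(
#         objective_function,
#         algo="exhaustive",
#         params_file="model.hyper",
#         fixed_config_file_list=["example.yaml"],
#     )
#     tuner.run()
#     tuner.export_result(output_file="hyper_example.result")
#     print("best params:", tuner.best_params)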