# Source code for sierra.core.variables.batch_criteria

# Copyright 2019 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT
"""
Base classes used to define :term:`Batch Experiments <Batch Experiment>`.
"""
# Core packages
import typing as tp
import logging
import argparse
import copy
import pathlib

# 3rd party packages
import implements

from sierra.core.variables import base_variable
from sierra.core import utils
from sierra.core.experiment import definition, xml


import sierra.core.plugin_manager as pm
from sierra.core import types, config


class IQueryableBatchCriteria(implements.Interface):
    """Mixin interface for batch criteria that can be queried in stage {1,2}.

    Queries made through this interface supply the extra information some
    :term:`Platforms <Platform>` and execution environments need in order to
    be configured.

    """

    def n_robots(self, exp_num: int) -> int:
        """
        Get the # of robots used in a given :term:`Experiment`.

        Arguments:

            exp_num: The experiment to query.

        Raises:

            NotImplementedError: Always; this is only the interface
                                 declaration.
        """
        raise NotImplementedError


class IConcreteBatchCriteria(implements.Interface):
    """
    'Final' interface for user-visible batch criteria.

    Every method here only declares the interface and raises
    ``NotImplementedError``.
    """

    def graph_xticks(self,
                     cmdopts: types.Cmdopts,
                     exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[float]:
        """Calculate X axis ticks for graph generation.

        Arguments:

            cmdopts: Dictionary of parsed command line options.

            exp_names: If not None, then this list of directories will be used
                       to calculate the ticks, rather than the results of
                       gen_exp_names().

        """
        raise NotImplementedError

    def graph_xticklabels(self,
                          cmdopts: types.Cmdopts,
                          exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[str]:
        """Calculate X axis tick labels for graph generation.

        Arguments:

            cmdopts: Dictionary of parsed command line options.

            exp_names: If not None, then these directories will be used to
                       calculate the labels, rather than the results of
                       gen_exp_names().

        """
        raise NotImplementedError

    def graph_xlabel(self, cmdopts: types.Cmdopts) -> str:
        """Get the X-label for a graph.

        Arguments:

            cmdopts: Dictionary of parsed command line options.

        Returns:

            The X-label that should be used for the graphs of various
            performance measures across batch criteria.

        """
        raise NotImplementedError
class IBivarBatchCriteria(implements.Interface):
    """
    Interface for bivariate batch criteria (those with two univariate axes).
    """

    def graph_yticks(self,
                     cmdopts: types.Cmdopts,
                     exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[float]:
        """
        Calculate Y axis ticks for graph generation.

        Arguments:

            cmdopts: Dictionary of parsed command line options.

            exp_names: If not None, then these directories will be used to
                       calculate the ticks, rather than the results of
                       gen_exp_names().
        """
        raise NotImplementedError

    def graph_yticklabels(self,
                          cmdopts: types.Cmdopts,
                          exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[str]:
        """
        Calculate Y axis tick labels for graph generation.

        Arguments:

            cmdopts: Dictionary of parsed command line options.

            exp_names: If not None, then these directories will be used to
                       calculate the labels, rather than the results of
                       gen_exp_names().
        """
        raise NotImplementedError

    def graph_ylabel(self, cmdopts: types.Cmdopts) -> str:
        """
        Get the Y-label for a graph.

        Returns:

            The Y-label that should be used for the graphs of various
            performance measures across batch criteria. Only needed by bivar
            batch criteria.
        """
        raise NotImplementedError


class IBatchCriteriaType(implements.Interface):
    """Mixin interface for criteria for querying univariate/bivariate.
    """

    def is_bivar(self) -> bool:
        """
        Determine if the batch criteria is bivariate.

        Returns:

            `True` if this class is a bivariate batch criteria instance, and
            `False` otherwise.
        """
        raise NotImplementedError

    def is_univar(self) -> bool:
        """
        Determine if the batch criteria is univariate.

        Returns:

            `True` if this class is a univar batch criteria instance, and
            `False` otherwise.
        """
        raise NotImplementedError


@implements.implements(base_variable.IBaseVariable)
class BatchCriteria():
    """Defines experiments via lists of sets of changes to make to an XML file.

    Attributes:

        cli_arg: Unparsed batch criteria string from command line.

        main_config: Parsed dictionary of main YAML configuration.

        batch_input_root: Absolute path to the directory where batch experiment
                          directories should be created.

        cat_str: The part of ``cli_arg`` before the first ``.``.

        def_str: The part of ``cli_arg`` after the first ``.`` (empty if
                 there is no ``.``).

        logger: Logger named after this module.
    """

    def __init__(self,
                 cli_arg: str,
                 main_config: types.YAMLDict,
                 batch_input_root: pathlib.Path) -> None:
        self.cli_arg = cli_arg
        self.main_config = main_config
        self.batch_input_root = batch_input_root

        # "<category>.<definition>": split on the FIRST '.' only, so the
        # definition keeps any further dots it contains.
        self.cat_str, _, self.def_str = cli_arg.partition('.')
        self.logger = logging.getLogger(__name__)

    # Stub out IBaseVariable because all concrete batch criteria only
    # implement a subset of them.
    def gen_attr_changelist(self) -> tp.List[xml.AttrChangeSet]:
        return []

    def gen_tag_rmlist(self) -> tp.List[xml.TagRmList]:
        return []

    def gen_tag_addlist(self) -> tp.List[xml.TagAddList]:
        return []

    def gen_files(self) -> None:
        pass

    def gen_exp_names(self, cmdopts: types.Cmdopts) -> tp.List[str]:
        """
        Generate list of experiment names from the criteria.

        Used for creating unique directory names for each experiment in the
        batch. This base implementation returns an empty list.

        Arguments:

            cmdopts: Dictionary of parsed command line options.

        Returns:

            List of experiments names for current experiment.
        """
        return []

    def arena_dims(self, cmdopts: types.Cmdopts) -> tp.List[utils.ArenaExtent]:
        """Get the arena dimensions used for each experiment in the batch.

        Not applicable to all criteria.

        Must be implemented on a per-platform basis, as different platforms
        have different means of computing the size of the arena; the work is
        delegated to the ``arena_dims_from_criteria()`` hook of the plugin
        module selected by ``cmdopts['platform']``.

        Arguments:

            cmdopts: Dictionary of parsed command line options.
        """
        module = pm.pipeline.get_plugin_module(cmdopts['platform'])
        assert hasattr(module, 'arena_dims_from_criteria'), \
            f"Platform plugin {module.__name__} does not implement arena_dims_from_criteria()"

        return module.arena_dims_from_criteria(self)

    def n_exp(self) -> int:
        """Get the # of experiments in the batch, per the scaffold spec."""
        # Function-scope import kept from the original; presumably avoids a
        # circular import -- TODO confirm.
        from sierra.core.experiment import spec

        scaffold_spec = spec.scaffold_spec_factory(self)
        return scaffold_spec.n_exps

    def pickle_exp_defs(self, cmdopts: types.Cmdopts) -> None:
        """Pickle the batch criteria modifications for each experiment.

        Arguments:

            cmdopts: Dictionary of parsed command line options.
        """
        from sierra.core.experiment import spec

        scaffold_spec = spec.scaffold_spec_factory(self)

        # Computed once rather than once per experiment.
        exp_dirnames = self.gen_exp_names(cmdopts)

        for exp in range(0, scaffold_spec.n_exps):
            exp_dirname = exp_dirnames[exp]

            # Pickling of batch criteria experiment definitions is the FIRST
            # set of changes to be pickled--all other changes come after. We
            # append to the pickle file by default, which allows any number
            # of additional sets of changes to be written, BUT that can also
            # lead to errors if stage 1 is run multiple times before stage
            # 4. So, we DELETE the pickle file for each experiment here to
            # make stage 1 idempotent.
            pkl_path = self.batch_input_root / exp_dirname / config.kPickleLeaf
            exp_defi = scaffold_spec.mods[exp]

            if not scaffold_spec.is_compound:
                exp_defi.pickle(pkl_path, delete=True)
            else:
                # Compound specs carry two sets of mods; only the first
                # write resets the file.
                exp_defi[0].pickle(pkl_path, delete=True)
                exp_defi[1].pickle(pkl_path, delete=False)

    def scaffold_exps(self,
                      batch_def: definition.XMLExpDef,
                      cmdopts: types.Cmdopts) -> None:
        """Scaffold a batch experiment.

        Takes the raw template input file and apply XML modifications from the
        batch criteria for all experiments, and save the result in each
        experiment's input directory.

        Arguments:

            batch_def: Experiment definition to copy and modify for each
                       experiment; it is deep-copied, not modified.

            cmdopts: Dictionary of parsed command line options.

        Raises:

            RuntimeError: If the # of entries in ``batch_input_root``
                          afterwards differs from the # of experiments.
        """
        from sierra.core.experiment import spec

        scaffold_spec = spec.scaffold_spec_factory(self, log=True)

        for i in range(0, scaffold_spec.n_exps):
            modsi = scaffold_spec.mods[i]
            expi_def = copy.deepcopy(batch_def)
            self._scaffold_expi(expi_def,
                                modsi,
                                scaffold_spec.is_compound,
                                i,
                                cmdopts)

        n_exp_dirs = sum(1 for _ in self.batch_input_root.iterdir())
        if scaffold_spec.n_exps != n_exp_dirs:
            msg1 = (f"Size of batch experiment ({scaffold_spec.n_exps}) != "
                    f"# exp dirs ({n_exp_dirs}): possibly caused by:")
            msg2 = (f"(1) Changing bc w/o changing the generation root "
                    f"({self.batch_input_root})")
            msg3 = (f"(2) Sharing {self.batch_input_root} between different "
                    f"batch criteria")

            self.logger.fatal(msg1)
            self.logger.fatal(msg2)
            self.logger.fatal(msg3)
            raise RuntimeError("Batch experiment size/# exp dir mismatch")

    def _scaffold_expi(self,
                       expi_def: definition.XMLExpDef,
                       modsi,
                       is_compound: bool,
                       i: int,
                       cmdopts: types.Cmdopts) -> None:
        """Apply the modifications for experiment ``i`` and write its template.

        Arguments:

            expi_def: Experiment definition to modify (in place) and write.

            modsi: Modifications for experiment ``i``. An iterable of mods if
                   ``is_compound`` is `False`; an ``(adds, changes)`` tuple
                   otherwise.

            is_compound: Whether ``modsi`` is in compound form.

            i: Index of the experiment within the batch.

            cmdopts: Dictionary of parsed command line options.
        """
        exp_dirname = self.gen_exp_names(cmdopts)[i]
        exp_input_root = self.batch_input_root / exp_dirname

        utils.dir_create_checked(exp_input_root,
                                 exist_ok=cmdopts['exp_overwrite'])

        if not is_compound:
            self.logger.debug(("Applying %s XML modifications from '%s' for "
                               "exp%s in %s"),
                              len(modsi),
                              self.cli_arg,
                              i,
                              exp_dirname)

            for mod in modsi:
                if isinstance(mod, xml.AttrChange):
                    expi_def.attr_change(mod.path, mod.attr, mod.value)
                elif isinstance(mod, xml.TagAdd):
                    # Fixed: the message was a plain string referencing a
                    # nonexistent name, so the tag was never interpolated.
                    assert mod.path is not None, \
                        f"Cannot add root {mod.tag} during scaffolding"
                    expi_def.tag_add(mod.path,
                                     mod.tag,
                                     mod.attr,
                                     mod.allow_dup)
        else:
            self.logger.debug(("Applying %s XML modifications from '%s' for "
                               "exp%s in %s"),
                              len(modsi[0]) + len(modsi[1]),
                              self.cli_arg,
                              i,
                              exp_dirname)

            # Mods are a tuple for compound specs: adds, changes. We do adds
            # first, in case some insane person wants to use the second batch
            # criteria to modify something they just added.
            for add in modsi[0]:
                expi_def.tag_add(add.path, add.tag, add.attr, add.allow_dup)
            for chg in modsi[1]:
                expi_def.attr_change(chg.path, chg.attr, chg.value)

        # This will be the "template" input file used to generate the input
        # files for each experimental run in the experiment
        wr_config = xml.WriterConfig([{'src_parent': None,
                                       'src_tag': '.',
                                       'opath_leaf': None,
                                       'create_tags': None,
                                       'dest_parent': None
                                       }])
        expi_def.write_config_set(wr_config)
        opath = utils.exp_template_path(cmdopts,
                                        self.batch_input_root,
                                        exp_dirname)
        expi_def.write(opath)


@implements.implements(IBatchCriteriaType)
class UnivarBatchCriteria(BatchCriteria):
    """
    Base class for a univariate batch criteria.
    """

    #
    # IBatchCriteriaType overrides
    #

    def is_bivar(self) -> bool:
        return False

    def is_univar(self) -> bool:
        return True

    def populations(self,
                    cmdopts: types.Cmdopts,
                    exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[int]:
        """
        Calculate system sizes used the batch experiment.

        Sizes are returned in the same order as the experiment names used;
        no sorting is done here.

        Arguments:

            cmdopts: Dictionary of parsed command line options.

            exp_names: If is not `None`, then these directories will be used
                       to calculate the system sizes, rather than the results
                       of ``gen_exp_names()``.
        """
        if exp_names is not None:
            names = exp_names
        else:
            names = self.gen_exp_names(cmdopts)

        module = pm.pipeline.get_plugin_module(cmdopts['platform'])
        sizes = []

        for d in names:
            # Each experiment's size is read back from its pickled definition.
            path = self.batch_input_root / d / config.kPickleLeaf
            exp_def = definition.unpickle(path)

            sizes.append(module.population_size_from_pickle(exp_def,
                                                            self.main_config,
                                                            cmdopts))
        return sizes


@implements.implements(IBivarBatchCriteria)
@implements.implements(IBatchCriteriaType)
@implements.implements(IQueryableBatchCriteria)
class BivarBatchCriteria(BatchCriteria):
    """
    Combination of the definition of two separate batch criteria.

    .. versionchanged:: 1.2.20

        Bivariate batch criteria can be compound: one criteria can create and
        the other modify XML tags to create an experiment definition.

    Attributes:

        criteria1: The criteria used for the X axis.

        criteria2: The criteria used for the Y axis.
    """

    def __init__(self,
                 criteria1: IConcreteBatchCriteria,
                 criteria2: IConcreteBatchCriteria) -> None:
        # The main config and input root are taken from the first criteria.
        BatchCriteria.__init__(self,
                               '+'.join([criteria1.cli_arg, criteria2.cli_arg]),
                               criteria1.main_config,
                               criteria1.batch_input_root)
        self.criteria1 = criteria1
        self.criteria2 = criteria2

    #
    # IBatchCriteriaType overrides
    #

    def is_bivar(self) -> bool:
        return True

    def is_univar(self) -> bool:
        return False

    def gen_attr_changelist(self) -> tp.List[xml.AttrChangeSet]:
        """Combine the attribute change sets of both criteria.

        Returns:

            If both criteria produce change sets, the union of every
            (criteria1, criteria2) pair, criteria1-major. Otherwise, whichever
            list is non-empty (or an empty list).
        """
        list1 = self.criteria1.gen_attr_changelist()
        list2 = self.criteria2.gen_attr_changelist()

        if list1 and list2:
            return [l1 | l2 for l1 in list1 for l2 in list2]
        if list1:
            return list1
        if list2:
            return list2
        return []

    def gen_tag_addlist(self) -> tp.List[xml.TagAddList]:
        """Combine the tag addition lists of both criteria.

        Returns:

            If both criteria produce tag additions, one combined list for
            every (criteria1, criteria2) pair, criteria1-major. Otherwise,
            whichever list is non-empty (or an empty list).
        """
        list1 = self.criteria1.gen_tag_addlist()
        list2 = self.criteria2.gen_tag_addlist()

        if list1 and list2:
            ret = []
            for l1 in list1:
                for l2 in list2:
                    # Fixed: extending l1 in place made every entry for a
                    # given l1 the same object holding ALL of criteria2's
                    # additions, and modified criteria1's own list. Each
                    # pair now gets its own copy.
                    merged = copy.deepcopy(l1)
                    merged.extend(l2)
                    ret.append(merged)
            return ret
        if list1:
            return list1
        if list2:
            return list2
        return []

    def gen_tag_rmlist(self) -> tp.List[xml.TagRmList]:
        """Concatenate the tag removal lists of both criteria."""
        ret = self.criteria1.gen_tag_rmlist()
        ret.extend(self.criteria2.gen_tag_rmlist())
        return ret

    def gen_exp_names(self, cmdopts: types.Cmdopts) -> tp.List[str]:
        """
        Generate a list of strings for all experiment names.

        These will be used as directory LEAF names--and don't include the
        parents. Names have the form ``c1-<name1>+c2-<name2>`` and are
        ordered criteria1-major, following the order of each criteria's own
        ``gen_exp_names()``; no sorting is done here.

        Arguments:

            cmdopts: Dictionary of parsed command line options.
        """
        list1 = self.criteria1.gen_exp_names(cmdopts)
        list2 = self.criteria2.gen_exp_names(cmdopts)

        return ['+'.join(['c1-' + l1, 'c2-' + l2])
                for l1 in list1
                for l2 in list2]

    def populations(self, cmdopts: types.Cmdopts) -> tp.List[tp.List[int]]:
        """Generate a 2D array of system sizes used the batch experiment.

        Sizes are in the same order as the directories returned from
        `gen_exp_names()` for each criteria along each axis.

        Arguments:

            cmdopts: Dictionary of parsed command line options.
        """
        names = self.gen_exp_names(cmdopts)

        n_rows = len(self.criteria1.gen_exp_names(cmdopts))
        n_cols = len(self.criteria2.gen_exp_names(cmdopts))
        sizes = [[0] * n_cols for _ in range(n_rows)]

        # The # of experiments along the criteria2 axis is taken to be its
        # # of attr change sets plus its # of tag add lists.
        n_chgs2 = len(self.criteria2.gen_attr_changelist())
        n_adds2 = len(self.criteria2.gen_tag_addlist())
        n_exp2 = n_chgs2 + n_adds2

        module = pm.pipeline.get_plugin_module(cmdopts['platform'])

        # enumerate() replaces the per-iteration names.index() lookup.
        for index, d in enumerate(names):
            pkl_path = self.batch_input_root / d / config.kPickleLeaf
            exp_def = definition.unpickle(pkl_path)

            i, j = divmod(index, n_exp2)
            sizes[i][j] = module.population_size_from_pickle(exp_def,
                                                             self.main_config,
                                                             cmdopts)

        return sizes

    def exp_scenario_name(self, exp_num: int) -> str:
        """Given the experiment number, compute a parsable scenario name.

        It is necessary to query this function after generating the changelist
        in order to create generator classes for each experiment in the batch
        with the correct name and definition in some cases.

        Can only be called if constant density is one of the sub-criteria.

        Arguments:

            exp_num: Index of the experiment within the batch.

        Raises:

            RuntimeError: If neither sub-criteria has an
                          ``exp_scenario_name()`` method.
        """
        # NOTE(review): only attr changes are counted here, whereas
        # n_robots()/populations() count attr changes + tag adds; this
        # raises ZeroDivisionError if criteria2 has no attr changes --
        # confirm intended.
        if hasattr(self.criteria1, 'exp_scenario_name'):
            n_chgs2 = len(self.criteria2.gen_attr_changelist())
            return self.criteria1.exp_scenario_name(int(exp_num / n_chgs2))

        if hasattr(self.criteria2, 'exp_scenario_name'):
            n_chgs2 = len(self.criteria2.gen_attr_changelist())
            return self.criteria2.exp_scenario_name(int(exp_num % n_chgs2))

        raise RuntimeError(
            "Bivariate batch criteria does not contain constant density")

    def _axis_exp_names(self,
                        cmdopts: types.Cmdopts,
                        criteria: IConcreteBatchCriteria,
                        axis: int) -> tp.List[str]:
        """Pick one bivariate experiment dir name per experiment of an axis.

        For each experiment name generated by ``criteria``, selects the first
        directory from ``utils.exp_range_calc()`` whose leaf name contains
        that name in its ``+``-separated component ``axis`` (0 for
        criteria1, 1 for criteria2).
        """
        names = []
        all_dirs = utils.exp_range_calc(cmdopts,
                                        cmdopts['batch_output_root'],
                                        self)

        for exp_name in criteria.gen_exp_names(cmdopts):
            for d in all_dirs:
                leaf = d.name
                if exp_name in leaf.split('+')[axis]:
                    names.append(leaf)
                    break

        return names

    def graph_xticks(self,
                     cmdopts: types.Cmdopts,
                     exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[float]:
        """Calculate X axis ticks by delegating to criteria1.

        ``exp_names`` is not used; the names passed to criteria1 are computed
        from the batch output root.
        """
        names = self._axis_exp_names(cmdopts, self.criteria1, 0)
        return self.criteria1.graph_xticks(cmdopts, names)

    def graph_yticks(self,
                     cmdopts: types.Cmdopts,
                     exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[float]:
        """Calculate Y axis ticks from the X axis ticks of criteria2.

        ``exp_names`` is not used; the names passed to criteria2 are computed
        from the batch output root.
        """
        names = self._axis_exp_names(cmdopts, self.criteria2, 1)
        return self.criteria2.graph_xticks(cmdopts, names)

    def graph_xticklabels(self,
                          cmdopts: types.Cmdopts,
                          exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[str]:
        """Calculate X axis tick labels by delegating to criteria1.

        ``exp_names`` is not used; the names passed to criteria1 are computed
        from the batch output root.
        """
        names = self._axis_exp_names(cmdopts, self.criteria1, 0)
        return self.criteria1.graph_xticklabels(cmdopts, names)

    def graph_yticklabels(self,
                          cmdopts: types.Cmdopts,
                          exp_names: tp.Optional[tp.List[str]] = None) -> tp.List[str]:
        """Calculate Y axis tick labels from the X axis labels of criteria2.

        ``exp_names`` is not used; the names passed to criteria2 are computed
        from the batch output root.
        """
        names = self._axis_exp_names(cmdopts, self.criteria2, 1)
        return self.criteria2.graph_xticklabels(cmdopts, names)

    def graph_xlabel(self, cmdopts: types.Cmdopts) -> str:
        return self.criteria1.graph_xlabel(cmdopts)

    def graph_ylabel(self, cmdopts: types.Cmdopts) -> str:
        # The Y axis is criteria2's X axis.
        return self.criteria2.graph_xlabel(cmdopts)

    def set_batch_input_root(self, root: pathlib.Path) -> None:
        """Set the batch input root on this criteria and both sub-criteria."""
        self.batch_input_root = root
        self.criteria1.batch_input_root = root
        self.criteria2.batch_input_root = root

    def n_robots(self, exp_num: int) -> int:
        """Get the # of robots for an experiment from a sub-criteria.

        criteria1 is preferred if it has an ``n_robots()`` method; otherwise
        criteria2 is used.

        Arguments:

            exp_num: Index of the experiment within the batch.

        Raises:

            NotImplementedError: If neither sub-criteria has ``n_robots()``.
        """
        n_chgs2 = len(self.criteria2.gen_attr_changelist())
        n_adds2 = len(self.criteria2.gen_tag_addlist())

        # Split the flat experiment index into (criteria1, criteria2) indices.
        i, j = divmod(exp_num, n_chgs2 + n_adds2)

        if hasattr(self.criteria1, 'n_robots'):
            return self.criteria1.n_robots(i)
        elif hasattr(self.criteria2, 'n_robots'):
            return self.criteria2.n_robots(j)

        raise NotImplementedError


def factory(main_config: types.YAMLDict,
            cmdopts: types.Cmdopts,
            args: argparse.Namespace,
            scenario: tp.Optional[str] = None) -> IConcreteBatchCriteria:
    """Create a univariate or bivariate batch criteria from cmdline args.

    Arguments:

        main_config: Parsed dictionary of main YAML configuration.

        cmdopts: Dictionary of parsed command line options.

        args: Parsed cmdline arguments; ``args.batch_criteria`` selects the
              criteria to build.

        scenario: Scenario to use; if `None`, ``args.scenario`` is used.

    Raises:

        RuntimeError: If ``args.batch_criteria`` does not contain exactly 1
                      or 2 entries.
    """
    if scenario is None:
        scenario = args.scenario

    if len(args.batch_criteria) == 1:
        return __univar_factory(main_config,
                                cmdopts,
                                args.batch_criteria[0],
                                scenario)
    elif len(args.batch_criteria) == 2:
        assert args.batch_criteria[0] != args.batch_criteria[1],\
            "Duplicate batch criteria passed"
        return __bivar_factory(main_config,
                               cmdopts,
                               args.batch_criteria,
                               scenario)
    else:
        raise RuntimeError(
            "1 or 2 batch criterias must be specified on the cmdline")


def __univar_factory(main_config: types.YAMLDict,
                     cmdopts: types.Cmdopts,
                     cli_arg: str,
                     scenario) -> IConcreteBatchCriteria:
    """
    Construct a batch criteria object from a single cmdline argument.
    """
    category = cli_arg.split('.')[0]
    path = f'variables.{category}'

    module = pm.bc_load(cmdopts, category)
    bcfactory = getattr(module, "factory")

    # The scenario is only passed on when stage 5 is in the pipeline.
    if 5 in cmdopts['pipeline']:
        ret = bcfactory(cli_arg, main_config, cmdopts, scenario=scenario)()
    else:
        ret = bcfactory(cli_arg, main_config, cmdopts)()

    logging.info("Create univariate batch criteria '%s' from '%s'",
                 ret.__class__.__name__,
                 path)
    return ret  # type: ignore


def __bivar_factory(main_config: types.YAMLDict,
                    cmdopts: types.Cmdopts,
                    cli_arg: tp.List[str],
                    scenario: str) -> IConcreteBatchCriteria:
    """
    Construct a bivariate batch criteria object from two cmdline arguments.
    """
    criteria1 = __univar_factory(main_config, cmdopts, cli_arg[0], scenario)
    criteria2 = __univar_factory(main_config, cmdopts, cli_arg[1], scenario)

    # Project hook
    bc = pm.module_load_tiered(project=cmdopts['project'],
                               path='variables.batch_criteria')
    ret = bc.BivarBatchCriteria(criteria1, criteria2)

    logging.info("Created bivariate batch criteria from %s,%s",
                 ret.criteria1.__class__.__name__,
                 ret.criteria2.__class__.__name__)

    return ret  # type: ignore


__api__ = [
    'BatchCriteria',
    'IConcreteBatchCriteria',
    'UnivarBatchCriteria',
    'BivarBatchCriteria',
]