Source code for titerra.projects.common.generators.argos

# Copyright 2021 John Harwell, All rights reserved.
#
#  SPDX-License-Identifier: MIT

"""Extensions to
:class:`~sierra.plugins.platform.argos.generators.platform_generators.PlatformExpDefGenerator`
common to all TITAN scenarios which use ARGoS.

"""
# Core packages
import re

# 3rd party packages
from sierra.core.utils import ArenaExtent
from sierra.core.experiment import definition
from sierra.plugins.platform.argos.generators.platform_generators import PlatformExpDefGenerator
from sierra.plugins.platform.argos.generators.platform_generators import PlatformExpRunDefUniqueGenerator
import sierra.core.utils as scutils

# Project packages
from titerra.platform.argos.variables import block_distribution, arena, block_quantity
from titerra.platform.argos.variables.nest import Nest
from titerra.projects.common.generators import utils as tiutils
from titerra.projects.common.variables import exp_setup


class BaseScenarioGenerator(PlatformExpDefGenerator):
    """
    Base class for TITAN scenario generators targeting ARGoS.

    Provides helpers which generate XML changes for swarm convergence, the
    arena map, the block distribution and the block counts. Whether a helper
    also writes its changes to the simulation definition pickle file is stated
    in its own docstring.
    """

    def __init__(self, *args, **kwargs) -> None:
        PlatformExpDefGenerator.__init__(self, *args, **kwargs)

    def generate_convergence(self, exp_def: definition.XMLExpDef) -> None:
        """
        Generate XML changes for calculating swarm convergence.

        Sets the ``n_threads`` attribute of ``.//loop_functions/convergence``
        to the ``physics_n_engines`` cmdline option, if that tag exists.

        Does not write generated changes to the simulation definition pickle
        file.
        """
        # This whole tree can be missing and that's fine
        if exp_def.has_tag(".//loop_functions/convergence"):
            exp_def.attr_change(".//loop_functions/convergence",
                                "n_threads",
                                str(self.cmdopts["physics_n_engines"]))

    def generate_arena_map(self,
                           exp_def: definition.XMLExpDef,
                           the_arena: arena.RectangularArena) -> None:
        """
        Generate XML changes for the specified arena map configuration.

        Writes generated changes to the simulation definition pickle file.
        """
        _, adds, chgs = scutils.apply_to_expdef(the_arena, exp_def)
        scutils.pickle_modifications(adds, chgs, self.spec.exp_def_fpath)

    @staticmethod
    def generate_block_dist(exp_def: definition.XMLExpDef,
                            block_dist: block_distribution.BaseDistribution) -> None:
        """
        Generate XML changes for the specified block distribution.

        Does not write generated changes to the simulation definition pickle
        file.
        """
        scutils.apply_to_expdef(block_dist, exp_def)

    def generate_block_count(self, exp_def: definition.XMLExpDef) -> None:
        """
        Generates XML changes for # blocks in the simulation.

        If specified on the cmdline, that quantity is used (split evenly
        between ramp and cube blocks). Block counts are always integers: for
        an odd total the cube blocks get the smaller half and the ramp blocks
        get the remainder, so the two counts always sum to the requested
        total. Otherwise, the ``n_cube``/``n_ramp`` values already present in
        the ``.//manifest`` tag of the experiment definition are re-applied.

        Writes generated changes to the simulation definition pickle file.
        """
        gen_chgs = block_quantity.BlockQuantity.gen_attr_changelist_from_list
        n_blocks = self.cmdopts['n_blocks']

        if n_blocks is not None:
            # Floor division rather than '/': true division would hand float
            # quantities (e.g. 2.5) to the block quantity change generator.
            n_cube = int(n_blocks) // 2
            n_ramp = int(n_blocks) - n_cube
        else:
            # This may have already been set by the batch criteria, but we
            # can't know for sure, and we need block quantity definitions to
            # always be written to the pickle file for later retrieval.
            n_cube = int(exp_def.attr_get('.//manifest', 'n_cube'))
            n_ramp = int(exp_def.attr_get('.//manifest', 'n_ramp'))

        chgs1 = gen_chgs([n_cube], 'cube')[0]
        chgs2 = gen_chgs([n_ramp], 'ramp')[0]
        chgs = chgs1 | chgs2

        for chg in chgs:
            exp_def.attr_change(chg.path, chg.attr, chg.value)

        chgs.pickle(self.spec.exp_def_fpath)
class ForagingScenarioGenerator(BaseScenarioGenerator):
    """
    Generates the XML changes shared by all foraging scenarios: time setup,
    swarm convergence and block counts, applied on top of the definition
    produced by the parent generator.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self) -> definition.XMLExpDef:
        """
        Build the experiment definition common to foraging scenarios.

        Returns:
            The parent-generated definition after the time, convergence and
            block count changes have been applied to it.
        """
        foraging_def = super().generate()

        # Time definitions for TITAN
        tiutils.generate_time(foraging_def, self.cmdopts, self.spec)

        # Convergence definitions
        self.generate_convergence(foraging_def)

        # Definitions for the # of blocks
        self.generate_block_count(foraging_def)

        return foraging_def
class ForagingSSGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for single source foraging.

    This includes:

    - Rectangular 2x1 arena
    - Single source block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self):
        """
        Apply the 2x1 arena map and the single source block distribution to
        the definition produced by the parent generator, and return it.
        """
        ss_def = super().generate()

        # Arena definitions; the arena must be twice as long in X as in Y.
        dims = self.spec.arena_dim
        xdim = dims.xsize()
        ydim = dims.ysize()
        assert xdim == 2 * ydim,\
            "SS distribution requires a 2x1 arena: xdim={0},ydim={1}".format(xdim,
                                                                             ydim)

        the_arena = arena.RectangularArenaTwoByOne(
            x_range=[xdim],
            y_range=[ydim],
            z=dims.zsize(),
            nests_config=dict(pos_src='dist', dist='SS'))
        self.generate_arena_map(ss_def, the_arena)

        # Block distribution type definitions
        distribution = block_distribution.SingleSource()
        self.generate_block_dist(ss_def, distribution)

        return ss_def
class ForagingDSGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for dual source foraging.

    This includes:

    - Rectangular 2x1 arena
    - Dual source block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self):
        """
        Apply the 2x1 arena map and the dual source block distribution to the
        definition produced by the parent generator, and return it.
        """
        ds_def = super().generate()

        # Arena definitions; the arena must be twice as long in X as in Y.
        dims = self.spec.arena_dim
        xdim = dims.xsize()
        ydim = dims.ysize()
        assert xdim == 2 * ydim,\
            "DS distribution requires a 2x1 arena: xdim={0},ydim={1}".format(xdim,
                                                                             ydim)

        the_arena = arena.RectangularArenaTwoByOne(
            x_range=[xdim],
            y_range=[ydim],
            z=dims.zsize(),
            nests_config=dict(pos_src='dist', dist='DS'))
        self.generate_arena_map(ds_def, the_arena)

        # Block distribution type definitions
        distribution = block_distribution.DualSource()
        self.generate_block_dist(ds_def, distribution)

        return ds_def
class ForagingQSGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for quad source foraging.

    This includes:

    - Square arena
    - Quad source block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self):
        """
        Apply the square arena map and the quad source block distribution to
        the definition produced by the parent generator, and return it.
        """
        qs_def = super().generate()

        # Arena definitions; X and Y extents must match.
        dims = self.spec.arena_dim
        xdim = dims.xsize()
        ydim = dims.ysize()
        assert xdim == ydim,\
            "QS distribution requires a square arena: xdim={0},ydim={1}".format(xdim,
                                                                                ydim)

        the_arena = arena.SquareArena(sqrange=[xdim],
                                      z=dims.zsize(),
                                      nests_config=dict(pos_src='dist',
                                                        dist='QS'))
        self.generate_arena_map(qs_def, the_arena)

        # Block distribution type definitions
        self.generate_block_dist(qs_def, block_distribution.QuadSource())

        return qs_def
class ForagingRNGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for random foraging.

    This includes:

    - Square arena
    - Random block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self):
        """
        Apply the square arena map and the random block distribution to the
        definition produced by the parent generator, and return it.
        """
        rn_def = super().generate()

        # Arena definitions; X and Y extents must match.
        dims = self.spec.arena_dim
        xdim = dims.xsize()
        ydim = dims.ysize()
        assert xdim == ydim,\
            "RN distribution requires a square arena: xdim={0},ydim={1}".format(xdim,
                                                                                ydim)

        the_arena = arena.SquareArena(sqrange=[xdim],
                                      z=dims.zsize(),
                                      nests_config=dict(pos_src='dist',
                                                        dist='RN'))
        self.generate_arena_map(rn_def, the_arena)

        # Block distribution type definitions
        distribution = block_distribution.Random()
        self.generate_block_dist(rn_def, distribution)

        return rn_def
class ForagingLermanRNGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for conditions of Lerman et al's ODE model for
    foraging.

    This includes:

    - Square arena
    - Random block distribution
    - One nest in the corner
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self):
        """
        Apply the square arena map (nest position taken from the 'UL' corner
        configuration) and the random block distribution to the definition
        produced by the parent generator, and return it.
        """
        lerman_def = super().generate()

        # Arena definitions; X and Y extents must match.
        dims = self.spec.arena_dim
        xdim = dims.xsize()
        ydim = dims.ysize()
        assert xdim == ydim,\
            "RN distribution requires a square arena: xdim={0},ydim={1}".format(xdim,
                                                                                ydim)

        the_arena = arena.SquareArena(sqrange=[xdim],
                                      z=dims.zsize(),
                                      nests_config=dict(pos_src='corner',
                                                        corner='UL'))
        self.generate_arena_map(lerman_def, the_arena)

        # Block distribution type definitions
        distribution = block_distribution.Random()
        self.generate_block_dist(lerman_def, distribution)

        return lerman_def
class ForagingPLGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for powerlaw source foraging.

    This includes:

    - Square arena
    - Powerlaw block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self):
        """
        Apply the square arena map and the powerlaw block distribution (built
        from the arena dimensions) to the definition produced by the parent
        generator, and return it.
        """
        pl_def = super().generate()

        # Arena definitions; X and Y extents must match.
        dims = self.spec.arena_dim
        xdim = dims.xsize()
        ydim = dims.ysize()
        assert xdim == ydim,\
            "PL distribution requires a square arena: xdim={0},ydim={1}".format(xdim,
                                                                                ydim)

        the_arena = arena.SquareArena(sqrange=[xdim],
                                      z=dims.zsize(),
                                      nests_config=dict(pos_src='dist',
                                                        dist='PL'))
        self.generate_arena_map(pl_def, the_arena)

        # Block distribution type definitions
        distribution = block_distribution.PowerLaw(self.spec.arena_dim)
        self.generate_block_dist(pl_def, distribution)

        return pl_def
class ExpRunDefUniqueGenerator(PlatformExpRunDefUniqueGenerator):
    """
    Extends the platform per-run generator with the TITAN random seed and
    output path definitions for the controller parameters.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self, exp_def: definition.XMLExpDef):
        """
        Run the parent generator on ``exp_def``, then apply the random seed
        and run output path definitions under ``.//controllers/*/params``.
        """
        super().generate(exp_def)

        tiutils.generate_random(exp_def,
                                ".//controllers/*/params",
                                self.random_seed)
        tiutils.generate_output(exp_def,
                                ".//controllers/*/params",
                                self.run_output_path)


def gen_generator_name(scenario_name: str) -> str:
    """
    Map a scenario name to a generator name of the form ``<abbrev>Generator``.

    ``<abbrev>`` is ``LermanRN`` if that string occurs in ``scenario_name``.
    Otherwise it is the first occurrence of one of the block distribution
    abbreviations ``SS``, ``DS``, ``QS``, ``PL`` or ``RN``.

    Args:
        scenario_name: Name of the scenario, containing a block distribution
                       abbreviation (e.g. ``"SS.12x6x2"`` -> ``"SSGenerator"``).

    Raises:
        AssertionError: If no known block distribution abbreviation is found.
    """
    if 'LermanRN' in scenario_name:
        abbrev = 'LermanRN'
    else:
        # Match only the real abbreviations. A pair of character classes
        # ('[SDQPR][SSSLN]') also accepts cross combinations such as 'SL' or
        # 'DN', for which no generator exists.
        res = re.search(r'SS|DS|QS|PL|RN', scenario_name)
        assert res is not None, "Bad block distribution in {0}".format(
            scenario_name)
        abbrev = res.group(0)

    return abbrev + 'Generator'


__api__ = [
    'BaseScenarioGenerator',
    'ForagingScenarioGenerator',
    'ForagingSSGenerator',
    'ForagingDSGenerator',
    'ForagingQSGenerator',
    'ForagingPLGenerator',
    'ForagingRNGenerator',
]