# Copyright 2021 John Harwell, All rights reserved.
#
# SPDX-License-Identifier: MIT
"""Extensions to
:class:`~sierra.plugins.platform.argos.generators.platform_generators.PlatformExpDefGenerator`
common to all TITAN scenarios which use ARGoS.
"""
# Core packages
import re
# 3rd party packages
from sierra.core.utils import ArenaExtent
from sierra.core.experiment import definition
from sierra.plugins.platform.argos.generators.platform_generators import PlatformExpDefGenerator
from sierra.plugins.platform.argos.generators.platform_generators import PlatformExpRunDefUniqueGenerator
import sierra.core.utils as scutils
# Project packages
from titerra.platform.argos.variables import block_distribution, arena, block_quantity
from titerra.platform.argos.variables.nest import Nest
from titerra.projects.common.generators import utils as tiutils
from titerra.projects.common.variables import exp_setup
class BaseScenarioGenerator(PlatformExpDefGenerator):
    """
    Base class for the TITAN scenario generators which use ARGoS.

    Supplies the helpers the concrete scenario generators share: convergence
    configuration, arena map generation, block distribution generation, and
    block count generation.
    """

    def __init__(self, *args, **kwargs) -> None:
        PlatformExpDefGenerator.__init__(self, *args, **kwargs)

    def generate_convergence(self, exp_def: definition.XMLExpDef) -> None:
        """
        Generate XML changes for calculating swarm convergence.

        Sets the ``n_threads`` attribute of ``.//loop_functions/convergence``
        to the ``physics_n_engines`` cmdline option, if that tag exists.

        Does not write generated changes to the simulation definition pickle
        file.
        """
        # This whole tree can be missing and that's fine
        if exp_def.has_tag(".//loop_functions/convergence"):
            exp_def.attr_change(".//loop_functions/convergence",
                                "n_threads",
                                str(self.cmdopts["physics_n_engines"]))

    def generate_arena_map(self,
                           exp_def: definition.XMLExpDef,
                           the_arena: arena.RectangularArena) -> None:
        """
        Generate XML changes for the specified arena map configuration.

        Applies ``the_arena`` to ``exp_def`` and then writes the resulting
        additions and changes to the simulation definition pickle file
        (``self.spec.exp_def_fpath``).
        """
        _, adds, chgs = scutils.apply_to_expdef(the_arena, exp_def)
        scutils.pickle_modifications(adds, chgs, self.spec.exp_def_fpath)

    @staticmethod
    def generate_block_dist(exp_def: definition.XMLExpDef,
                            block_dist: block_distribution.BaseDistribution) -> None:
        """
        Generate XML changes for the specified block distribution.

        Does not write generated changes to the simulation definition pickle
        file.
        """
        scutils.apply_to_expdef(block_dist, exp_def)

    def generate_block_count(self, exp_def: definition.XMLExpDef) -> None:
        """
        Generate XML changes for # blocks in the simulation.

        If a block count was specified on the cmdline, it is split between
        cube and ramp blocks. Block counts are whole numbers, so the split uses
        integer arithmetic: cube blocks get ``n_blocks // 2`` and ramp blocks
        get the rest, so an odd total is preserved rather than turned into a
        fractional quantity. Otherwise, the quantities already present in the
        ``.//manifest`` tag of ``exp_def`` are used.

        Writes generated changes to the simulation definition pickle file.
        """
        n_blocks = self.cmdopts['n_blocks']

        if n_blocks is not None:
            total = int(n_blocks)
            n_cube = total // 2
            # Any odd block goes to the ramp blocks so that the requested
            # total is honored.
            n_ramp = total - n_cube
        else:
            # This may have already been set by the batch criteria, but we
            # can't know for sure, and we need block quantity definitions to
            # always be written to the pickle file for later retrieval.
            n_cube = int(exp_def.attr_get('.//manifest', 'n_cube'))
            n_ramp = int(exp_def.attr_get('.//manifest', 'n_ramp'))

        chgs1 = block_quantity.BlockQuantity.gen_attr_changelist_from_list([n_cube],
                                                                           'cube')[0]
        chgs2 = block_quantity.BlockQuantity.gen_attr_changelist_from_list([n_ramp],
                                                                           'ramp')[0]
        chgs = chgs1 | chgs2

        for chg in chgs:
            exp_def.attr_change(chg.path, chg.attr, chg.value)

        chgs.pickle(self.spec.exp_def_fpath)
class ForagingScenarioGenerator(BaseScenarioGenerator):
    """
    Generates the XML changes shared by the foraging scenarios: time
    definitions, convergence definitions, and block counts.
    """

    def __init__(self, *args, **kwargs) -> None:
        BaseScenarioGenerator.__init__(self, *args, **kwargs)

    def generate(self) -> definition.XMLExpDef:
        """
        Build the experiment definition from the parent generator and apply
        the changes common to all foraging scenarios to it.
        """
        scenario_def = super().generate()

        # Time definitions for TITAN come first, followed by convergence and
        # then the # blocks definitions.
        tiutils.generate_time(scenario_def, self.cmdopts, self.spec)
        self.generate_convergence(scenario_def)
        self.generate_block_count(scenario_def)

        return scenario_def
class ForagingSSGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for single source foraging.

    This includes:

    - Rectangular 2x1 arena
    - Single source block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        ForagingScenarioGenerator.__init__(self, *args, **kwargs)

    def generate(self):
        """
        Apply the arena map and single source block distribution on top of
        the common foraging definitions.
        """
        exp_def = super().generate()
        dims = self.spec.arena_dim

        # The arena must be exactly twice as long in X as it is in Y.
        assert dims.xsize() == 2 * dims.ysize(), (
            "SS distribution requires a 2x1 arena: xdim={0},ydim={1}".format(
                dims.xsize(), dims.ysize()))

        # Arena definitions
        the_arena = arena.RectangularArenaTwoByOne(
            x_range=[dims.xsize()],
            y_range=[dims.ysize()],
            z=dims.zsize(),
            nests_config={'pos_src': 'dist', 'dist': 'SS'})
        self.generate_arena_map(exp_def, the_arena)

        # Block distribution type definitions
        self.generate_block_dist(exp_def, block_distribution.SingleSource())

        return exp_def
class ForagingDSGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for dual source foraging.

    This includes:

    - Rectangular 2x1 arena
    - Dual source block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        ForagingScenarioGenerator.__init__(self, *args, **kwargs)

    def generate(self):
        """
        Apply the arena map and dual source block distribution on top of the
        common foraging definitions.
        """
        exp_def = super().generate()
        dims = self.spec.arena_dim

        # The arena must be exactly twice as long in X as it is in Y.
        assert dims.xsize() == 2 * dims.ysize(), (
            "DS distribution requires a 2x1 arena: xdim={0},ydim={1}".format(
                dims.xsize(), dims.ysize()))

        # Arena definitions
        the_arena = arena.RectangularArenaTwoByOne(
            x_range=[dims.xsize()],
            y_range=[dims.ysize()],
            z=dims.zsize(),
            nests_config={'pos_src': 'dist', 'dist': 'DS'})
        self.generate_arena_map(exp_def, the_arena)

        # Block distribution type definitions
        self.generate_block_dist(exp_def, block_distribution.DualSource())

        return exp_def
class ForagingQSGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for quad source foraging.

    This includes:

    - Square arena
    - Quad source block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        ForagingScenarioGenerator.__init__(self, *args, **kwargs)

    def generate(self):
        """
        Apply the arena map and quad source block distribution on top of the
        common foraging definitions.
        """
        exp_def = super().generate()
        dims = self.spec.arena_dim

        # The arena must have equal X and Y sizes.
        assert dims.xsize() == dims.ysize(), (
            "QS distribution requires a square arena: xdim={0},ydim={1}".format(
                dims.xsize(), dims.ysize()))

        # Arena definitions
        the_arena = arena.SquareArena(
            sqrange=[dims.xsize()],
            z=dims.zsize(),
            nests_config={'pos_src': 'dist', 'dist': 'QS'})
        self.generate_arena_map(exp_def, the_arena)

        # Block distribution type definitions
        self.generate_block_dist(exp_def, block_distribution.QuadSource())

        return exp_def
class ForagingRNGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for random foraging.

    This includes:

    - Square arena
    - Random block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        ForagingScenarioGenerator.__init__(self, *args, **kwargs)

    def generate(self):
        """
        Apply the arena map and random block distribution on top of the
        common foraging definitions.
        """
        exp_def = super().generate()
        dims = self.spec.arena_dim

        # The arena must have equal X and Y sizes.
        assert dims.xsize() == dims.ysize(), (
            "RN distribution requires a square arena: xdim={0},ydim={1}".format(
                dims.xsize(), dims.ysize()))

        # Arena definitions
        the_arena = arena.SquareArena(
            sqrange=[dims.xsize()],
            z=dims.zsize(),
            nests_config={'pos_src': 'dist', 'dist': 'RN'})
        self.generate_arena_map(exp_def, the_arena)

        # Block distribution type definitions
        self.generate_block_dist(exp_def, block_distribution.Random())

        return exp_def
class ForagingLermanRNGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for conditions of Lerman et al's ODE model for
    foraging.

    This includes:

    - Square arena
    - Random block distribution
    - One nest in the corner
    """

    def __init__(self, *args, **kwargs) -> None:
        ForagingScenarioGenerator.__init__(self, *args, **kwargs)

    def generate(self):
        """
        Apply the arena map (nest placed via the ``'corner': 'UL'``
        configuration) and random block distribution on top of the common
        foraging definitions.
        """
        exp_def = super().generate()
        dims = self.spec.arena_dim

        # The arena must have equal X and Y sizes.
        assert dims.xsize() == dims.ysize(), (
            "RN distribution requires a square arena: xdim={0},ydim={1}".format(
                dims.xsize(), dims.ysize()))

        # Arena definitions
        the_arena = arena.SquareArena(
            sqrange=[dims.xsize()],
            z=dims.zsize(),
            nests_config={'pos_src': 'corner', 'corner': 'UL'})
        self.generate_arena_map(exp_def, the_arena)

        # Block distribution type definitions
        self.generate_block_dist(exp_def, block_distribution.Random())

        return exp_def
class ForagingPLGenerator(ForagingScenarioGenerator):
    """
    Generates XML changes for powerlaw source foraging.

    This includes:

    - Square arena
    - Powerlaw block distribution
    - One nest
    """

    def __init__(self, *args, **kwargs) -> None:
        ForagingScenarioGenerator.__init__(self, *args, **kwargs)

    def generate(self):
        """
        Apply the arena map and powerlaw block distribution on top of the
        common foraging definitions.
        """
        exp_def = super().generate()
        dims = self.spec.arena_dim

        # The arena must have equal X and Y sizes.
        assert dims.xsize() == dims.ysize(), (
            "PL distribution requires a square arena: xdim={0},ydim={1}".format(
                dims.xsize(), dims.ysize()))

        # Arena definitions
        the_arena = arena.SquareArena(
            sqrange=[dims.xsize()],
            z=dims.zsize(),
            nests_config={'pos_src': 'dist', 'dist': 'PL'})
        self.generate_arena_map(exp_def, the_arena)

        # Block distribution type definitions; the powerlaw distribution is
        # constructed from the arena dimensions.
        self.generate_block_dist(exp_def, block_distribution.PowerLaw(dims))

        return exp_def
class ExpRunDefUniqueGenerator(PlatformExpRunDefUniqueGenerator):
    """
    Generates the per-run XML changes: after the parent generator has run,
    the random seed and run output path are applied to the controller
    parameters.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    def generate(self, exp_def: definition.XMLExpDef):
        """
        Apply the parent's per-run changes to ``exp_def``, then the random
        seed and output path for this run.
        """
        super().generate(exp_def)

        # Both helpers are pointed at the same controller parameters path.
        params_xpath = ".//controllers/*/params"

        tiutils.generate_random(exp_def, params_xpath, self.random_seed)
        tiutils.generate_output(exp_def, params_xpath, self.run_output_path)
def gen_generator_name(scenario_name: str) -> str:
    """
    Build a generator name suffix from the block distribution abbreviation
    found in a scenario name.

    Args:
        scenario_name: Scenario name containing one of the block distribution
                       abbreviations ``SS``, ``DS``, ``QS``, ``PL``, ``RN``,
                       or ``LermanRN``.

    Returns:
        The matched abbreviation with ``Generator`` appended (e.g.,
        ``SSGenerator``).

    Raises:
        AssertionError: If no known abbreviation is present in
                        ``scenario_name``.
    """
    # LermanRN has to be checked first, because it contains the plain RN
    # abbreviation as a substring.
    if res := re.search('LermanRN', scenario_name):
        abbrev = res.group(0)
    else:
        # Explicit alternation rather than a pair of character classes, so
        # that only real abbreviations match (not e.g. "PS" or "DN").
        res = re.search(r'SS|DS|QS|PL|RN', scenario_name)
        assert res is not None, "Bad block distribution in {0}".format(
            scenario_name)
        abbrev = res.group(0)

    return abbrev + 'Generator'
# Names this module advertises through ``__api__``.
# NOTE(review): how ``__api__`` is consumed is not visible in this file
# (presumably by documentation tooling) -- confirm before relying on it.
# ForagingLermanRNGenerator, ExpRunDefUniqueGenerator, and gen_generator_name
# are defined in this module but are not listed here.
__api__ = [
    'BaseScenarioGenerator',
    'ForagingScenarioGenerator',
    'ForagingSSGenerator',
    'ForagingDSGenerator',
    'ForagingQSGenerator',
    'ForagingPLGenerator',
    'ForagingRNGenerator',
]