diff --git a/mesa/time.py b/mesa/time.py index a1a73491dd3..03479b9a44a 100644 --- a/mesa/time.py +++ b/mesa/time.py @@ -133,3 +133,58 @@ def step(self): agent.advance(self.model) self.steps += 1 self.time += 1 + + +class StagedActivation(BaseScheduler): + ''' + A scheduler which allows agent activation to be divided into several stages + instead of a single `step` method. All agents execute one stage before + moving on to the next. + + Agents must have all the stage methods implemented. Stage methods take a + model object as their only argument. + + This schedule tracks steps and time separately. Time advances in fractional + increments of 1 / (# of stages), meaning that 1 step = 1 unit of time. + ''' + + stage_list = [] + shuffle = False + shuffle_between_stages = False + stage_time = 1 + + def __init__(self, model, stage_list=["step"], shuffle=False, + shuffle_between_stages=False): + ''' + Create an empty Staged Activation schedule. + + Args: + model: Model object associated with the schedule. + stage_list: List of strings of names of stages to run, in the + order to run them in. + shuffle: If True, shuffle the order of agents each step. + shuffle_between_stages: If True, shuffle the agents after each + stage; otherwise, only shuffle at the start + of each step. + ''' + super().__init__(model) + self.stage_list = stage_list + self.shuffle = shuffle + self.shuffle_between_stages = shuffle_between_stages + self.stage_time = 1 / len(self.stage_list) + + def step(self): + ''' + Executes all the stages of all agents. + ''' + + if self.shuffle: + random.shuffle(self.agents) + for stage in self.stage_list: + for agent in self.agents: + getattr(agent, stage)(self.model) # Run stage + if self.shuffle_between_stages: + random.shuffle(self.agents) + self.time += self.stage_time + + self.steps += 1 diff --git a/tests/test_time.py b/tests/test_time.py new file mode 100644 index 00000000000..b0df62e169a --- /dev/null +++ b/tests/test_time.py @@ -0,0 +1,65 @@ +''' +Test the advanced schedulers. +''' + +import unittest +from mesa import Model, Agent +from mesa.time import StagedActivation + + +class MockStagedAgent(Agent): + ''' + Minimalistic agent for testing purposes. + ''' + + def __init__(self, name): + self.unique_id = name + + def stage_one(self, model): + model.log.append(self.unique_id + "_1") + + def stage_two(self, model): + model.log.append(self.unique_id + "_2") + + +class MockModel(Model): + + def __init__(self, shuffle): + self.log = [] + model_stages = ["stage_one", "stage_two"] + self.schedule = StagedActivation(self, model_stages, shuffle=shuffle) + + # Make agents + for name in ["A", "B"]: + agent = MockStagedAgent(name) + self.schedule.add(agent) + + def step(self): + self.schedule.step() + + +class TestStagedActivation(unittest.TestCase): + ''' + Test the staged activation. + ''' + + expected_output = ["A_1", "B_1", "A_2", "B_2"] + + def test_no_shuffle(self): + ''' + Testing staged activation without shuffling. + ''' + model = MockModel(False) + model.step() + assert model.log == self.expected_output + + def test_shuffle(self): + ''' + Test staged activation with shuffling + ''' + model = MockModel(True) + model.step() + for output in self.expected_output[:2]: + assert output in model.log[:2] + for output in self.expected_output[2:]: + assert output in model.log[2:]