Skip to content

Commit 349b5b5

Browse files
authored
Merge pull request #61 from cseptesting/54-prepare-models-io-interface-in-parallel-execution2
Modified registries to handle input data persistence in a run or tmp directory, so they can be plugged in a volume docker without intefering with other time-windows when running in parallel
2 parents 14ce485 + 9a3c68c commit 349b5b5

File tree

11 files changed

+440
-179
lines changed

11 files changed

+440
-179
lines changed

floatcsep/experiment.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class Experiment:
7777
7878
model_config (str): Path to the models' configuration file
7979
test_config (str): Path to the evaluations' configuration file
80-
run_mode (str): 'sequential' or 'parallel'
80+
run_mode (str): 'serial' or 'parallel'
8181
default_test_kwargs (dict): Default values for the testing
8282
(seed, number of simulations, etc.)
8383
postprocess (dict): Contains the instruction for postprocessing
@@ -102,7 +102,7 @@ def __init__(
102102
postprocess: str = None,
103103
default_test_kwargs: dict = None,
104104
rundir: str = "results",
105-
run_mode: str = "sequential",
105+
run_mode: str = "serial",
106106
report_hook: dict = None,
107107
**kwargs,
108108
) -> None:
@@ -123,6 +123,7 @@ def __init__(
123123
self.registry = ExperimentRegistry.factory(workdir=workdir, run_dir=rundir)
124124
self.results_repo = ResultsRepository(self.registry)
125125
self.catalog_repo = CatalogRepository(self.registry)
126+
self.run_id = "run"
126127

127128
self.config_file = kwargs.get("config_file", None)
128129
self.original_config = kwargs.get("original_config", None)
@@ -285,20 +286,27 @@ def get_model(self, name: str) -> Model:
285286
for model in self.models:
286287
if model.name == name:
287288
return model
289+
raise Exception(f"No existing model with name {name}")
288290

289-
def get_test(self, name: str) -> Model:
291+
def get_test(self, name: str) -> Evaluation:
290292
"""Returns an Evaluation by its name string."""
291293
for test in self.tests:
292294
if test.name == name:
293295
return test
296+
raise Exception(f"No existing evaluation with name {name}")
294297

295298
def stage_models(self) -> None:
296299
"""
297300
Stages all the experiment's models. See :meth:`floatcsep.model.Model.stage`
298301
"""
299302
log.info("Staging models")
300303
for i in self.models:
301-
i.stage(self.time_windows, run_mode=self.run_mode, run_dir=self.run_dir)
304+
i.stage(
305+
self.time_windows,
306+
run_mode=self.run_mode,
307+
stage_dir=self.registry.run_dir,
308+
run_id=self.run_id,
309+
)
302310
self.registry.add_model_registry(i)
303311

304312
def set_tests(self, test_config: Union[str, Dict, List]) -> list:
@@ -365,7 +373,7 @@ def set_tasks(self) -> None:
365373
Lazy definition of the experiment core tasks by wrapping instances,
366374
methods and arguments. Creates a graph with task nodes, while assigning
367375
task-parents to each node, depending on each Evaluation signature.
368-
The tasks can then be run sequentially as a list or asynchronous
376+
The tasks can then be run in serial as a list or asynchronous
369377
using the graph's node dependencies.
370378
For instance:
371379

floatcsep/infrastructure/environments.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ def run_command(self, command) -> None:
277277
cmd = [
278278
"bash",
279279
"-c",
280-
f"{self.package_manager} run --live-stream -n {self.env_name} {command}",
280+
f"conda run --live-stream -n {self.env_name} {command}",
281281
]
282282

283283
process = subprocess.Popen(

floatcsep/infrastructure/registries.py

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import logging
22
import os
3+
import tempfile
34
from abc import ABC, abstractmethod
45
from datetime import datetime
56
from os.path import join, abspath, relpath, normpath, dirname, exists
67
from pathlib import Path
7-
from typing import Sequence, Union, TYPE_CHECKING, Any, Optional
8+
from typing import Sequence, Union, TYPE_CHECKING, Any, Optional, Literal
89

910
from floatcsep.utils.helpers import timewindow2str
1011

@@ -269,8 +270,9 @@ def build_tree(
269270
time_windows: Sequence[Sequence[datetime]] = None,
270271
model_class: str = "TimeIndependentModel",
271272
prefix: str = None,
272-
run_mode: str = "sequential",
273-
run_dir: Optional[str] = None,
273+
run_mode: str = Literal["serial", "parallel"],
274+
stage_dir: str = "results",
275+
run_id: Optional[str] = "run",
274276
) -> None:
275277
"""
276278
Creates the run directory, and reads the file structure inside.
@@ -279,11 +281,13 @@ def build_tree(
279281
time_windows (list(str)): List of time windows or strings.
280282
model_class (str): Model's class name
281283
prefix (str): prefix of the model forecast filenames if TD
282-
run_mode (str): if run mode is sequential, input data (args and cat) will be
284+
run_mode (str): if run mode is serial, input data (args and cat) will be
283285
dynamically overwritten in 'model/input/` through time_windows. If 'parallel',
284286
input data is dynamically writing anew in
285287
'results/{time_window}/input/{model_name}/'.
286-
run_dir (str): Where experiment's results are stored.
288+
stage_dir (str): Whether input data is stored persistently in the run_dir or
289+
just in tmp cache (Only for parallel execution).
290+
run_id (str): Job ID of the run for parallel execution and tmp storing of input data
287291
"""
288292

289293
windows = timewindow2str(time_windows)
@@ -293,33 +297,61 @@ def build_tree(
293297

294298
elif model_class == "TimeDependentModel":
295299

296-
# grab names for creating model directories
297300
subfolders = ["input", "forecasts"]
298-
dirtree = {folder: self.abs(self.path, folder) for folder in subfolders}
299-
for _, folder_ in dirtree.items():
301+
model_dirtree = {folder: self.abs(self.path, folder) for folder in subfolders}
302+
for _, folder_ in model_dirtree.items():
300303
os.makedirs(folder_, exist_ok=True)
301304

302-
if run_mode == "sequential":
303-
self.input_args = {
304-
win: Path(self.path, "input", self.args_file) for win in windows
305-
}
306-
self.input_cats = {
307-
win: Path(self.path, "input", self.input_cat) for win in windows
308-
}
309-
elif run_mode == "parallel":
310-
self.input_args = {
311-
win: Path(run_dir, win, "input", self.model_name, self.args_file)
312-
for win in windows
313-
}
314-
self.input_cats = {
315-
win: Path(run_dir, win, "input", self.model_name, self.input_cat)
316-
for win in windows
317-
}
305+
# Decide the base dir for *per-window* inputs (args + catalog)
306+
# - serial mode: under the model folder {model_name}/input
307+
# - parallel + run_dir: <run_dir>/<win>/input/<model_name>/
308+
# - parallel + tmp: <tmp_root>/floatcsep/<run_id>/<win>/input/<model_name>/
309+
def _window_input_dir(win: str) -> Path:
310+
if run_mode == "parallel":
311+
if stage_dir == "tmp":
312+
base_tmp = Path(tempfile.gettempdir())
313+
return base_tmp / "floatcsep" / run_id / win / "input" / self.model_name
314+
else:
315+
return Path(stage_dir, win, "input", self.model_name)
316+
else:
317+
return model_dirtree["input"]
318+
319+
# Build input/output maps
320+
if not prefix:
321+
prefix = self.model_name
322+
323+
for win in windows:
324+
input_dir = _window_input_dir(win)
325+
os.makedirs(input_dir, exist_ok=True)
326+
327+
self.input_args[win] = Path(input_dir, self.args_file)
328+
self.input_cats[win] = Path(input_dir, self.input_cat)
318329

319330
self.forecasts = {
320-
win: Path(dirtree["forecasts"], f"{prefix}_{win}.{self.fmt}") for win in windows
331+
win: Path(model_dirtree["forecasts"], f"{prefix}_{win}.{self.fmt}")
332+
for win in windows
321333
}
322334

335+
def get_input_dir(self, tstring: str) -> Path:
336+
"""
337+
Returns the directory that contains the per-window input files (args/catalog).
338+
"""
339+
340+
if tstring in self.input_args:
341+
return self.abs(self.input_args[tstring]).parent
342+
elif tstring in self.input_cats:
343+
return self.abs(self.input_cats[tstring]).parent
344+
raise KeyError(f"No input directory has been built for window '{tstring}'")
345+
346+
def get_args_template_path(self) -> Path:
347+
"""
348+
Path to the model’s canonical args template: <model.path>/input/<args_file>.
349+
Exists regardless of staging mode. This file should come with the source model
350+
"""
351+
if not self.args_file:
352+
raise ValueError("args_file is not set on the registry.")
353+
return self.abs(self.path, "input", Path(self.args_file).name)
354+
323355
def as_dict(self) -> dict:
324356
"""
325357

floatcsep/model.py

Lines changed: 78 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
from abc import ABC, abstractmethod
66
from datetime import datetime
7+
from pathlib import Path
78
from typing import List, Callable, Union, Sequence
89

910
import yaml
@@ -284,9 +285,11 @@ def __init__(
284285
self.build, self.name, self.registry.path.as_posix()
285286
)
286287

287-
def stage(self, time_windows=None, run_mode="sequential", run_dir="") -> None:
288+
def stage(
289+
self, time_windows=None, run_mode="sequential", stage_dir="results", run_id="run"
290+
) -> None:
288291
"""
289-
Core method to interface a model with the experiment.
292+
Retrieve model artifacts and Set up its interface with the experiment.
290293
291294
1) Get the model from filesystem, Zenodo or Git. Prepares the directory
292295
2) If source code, creates the computational environment (conda, venv or Docker)
@@ -306,7 +309,8 @@ def stage(self, time_windows=None, run_mode="sequential", run_dir="") -> None:
306309
model_class=self.__class__.__name__,
307310
prefix=self.__dict__.get("prefix", self.name),
308311
run_mode=run_mode,
309-
run_dir=run_dir,
312+
stage_dir=stage_dir,
313+
run_id=run_id,
310314
)
311315

312316
def get_source(self, zenodo_id: int = None, giturl: str = None, **kwargs) -> None:
@@ -406,73 +410,83 @@ def prepare_args(self, start: datetime, end: datetime, **kwargs) -> None:
406410
"""
407411
window_str = timewindow2str([start, end])
408412

409-
filepath = self.registry.get_args_key(window_str)
410-
fmt = os.path.splitext(filepath)[1]
411-
412-
if fmt == ".txt":
413-
414-
def replace_arg(arg, val, fp):
415-
with open(fp, "r") as filearg_:
416-
lines = filearg_.readlines()
417-
418-
pattern_exists = False
419-
for k, line in enumerate(lines):
420-
if line.startswith(arg):
421-
lines[k] = f"{arg} = {val}\n"
422-
pattern_exists = True
423-
break # assume there's only one occurrence of the key
424-
if not pattern_exists:
425-
lines.append(f"{arg} = {val}\n")
426-
with open(fp, "w") as file:
427-
file.writelines(lines)
428-
429-
replace_arg("start_date", start.isoformat(), filepath)
430-
replace_arg("end_date", end.isoformat(), filepath)
431-
for i, j in kwargs.items():
432-
replace_arg(i, j, filepath)
433-
434-
elif fmt == ".json":
435-
with open(filepath, "r") as file_:
436-
args = json.load(file_)
437-
args["start_date"] = start.isoformat()
438-
args["end_date"] = end.isoformat()
439-
440-
args.update(kwargs)
441-
442-
with open(filepath, "w") as file_:
443-
json.dump(args, file_, indent=2)
444-
445-
elif fmt == ".yml" or fmt == ".yaml":
446-
447-
def nested_update(dest: dict, src: dict, max_depth: int = 3, _level: int = 1):
448-
"""
449-
Recursively update dest with values from src down to max_depth levels.
450-
- If dest[k] and src[k] are both dicts, recurse (until max_depth).
451-
- Otherwise overwrite dest[k] with src[k].
452-
"""
453-
for key, val in src.items():
413+
dest_path = Path(self.registry.get_args_key(window_str))
414+
tpl_path = self.registry.get_args_template_path()
415+
suffix = tpl_path.suffix.lower()
416+
417+
if suffix == ".txt":
418+
419+
def load_kv(fp: Path) -> dict:
420+
data = {}
421+
if fp.exists():
422+
with open(fp, "r") as f:
423+
for line in f:
424+
line = line.strip()
425+
if not line or line.startswith("#"):
426+
continue
427+
if "=" in line:
428+
k, v = line.split("=", 1)
429+
data[k.strip()] = v.strip()
430+
return data
431+
432+
def dump_kv(fp: Path, data: dict) -> None:
433+
ordered_keys = []
434+
for k in ("start_date", "end_date"):
435+
if k in data:
436+
ordered_keys.append(k)
437+
ordered_keys += sorted(
438+
k for k in data.keys() if k not in ("start_date", "end_date")
439+
)
440+
441+
with open(fp, "w") as f:
442+
for k in ordered_keys:
443+
f.write(f"{k} = {data[k]}\n")
444+
445+
data = load_kv(tpl_path)
446+
data["start_date"] = start.isoformat()
447+
data["end_date"] = end.isoformat()
448+
for k, v in (kwargs or {}).items():
449+
data[k] = v
450+
dump_kv(dest_path, data)
451+
452+
elif suffix == ".json":
453+
base = {}
454+
if tpl_path.exists():
455+
with open(tpl_path, "r") as f:
456+
base = json.load(f) or {}
457+
base["start_date"] = start.isoformat()
458+
base["end_date"] = end.isoformat()
459+
base.update(kwargs or {})
460+
461+
with open(dest_path, "w") as f:
462+
json.dump(base, f, indent=2)
463+
464+
elif suffix in (".yml", ".yaml"):
465+
if tpl_path.exists():
466+
with open(tpl_path, "r") as f:
467+
data = yaml.safe_load(f) or {}
468+
else:
469+
data = {}
470+
471+
data["start_date"] = start.isoformat()
472+
data["end_date"] = end.isoformat()
473+
474+
def nested_update(dest: dict, src: dict, max_depth: int = 3, _lvl: int = 1):
475+
for key, val in (src or {}).items():
454476
if (
455-
_level < max_depth
477+
_lvl < max_depth
456478
and key in dest
457479
and isinstance(dest[key], dict)
458480
and isinstance(val, dict)
459481
):
460-
nested_update(dest[key], val, max_depth, _level + 1)
482+
nested_update(dest[key], val, max_depth, _lvl + 1)
461483
else:
462484
dest[key] = val
463485

464-
if not os.path.exists(filepath):
465-
template_file = os.path.join(
466-
self.registry.path, "input", self.registry.args_file
467-
)
468-
else:
469-
template_file = filepath
486+
nested_update(data, self.func_kwargs or {})
487+
nested_update(data, kwargs or {})
488+
with open(dest_path, "w") as f:
489+
yaml.safe_dump(data, f, indent=2)
470490

471-
with open(template_file, "r") as file_:
472-
args = yaml.safe_load(file_)
473-
args["start_date"] = start.isoformat()
474-
args["end_date"] = end.isoformat()
475-
476-
nested_update(args, self.func_kwargs)
477-
with open(filepath, "w") as file_:
478-
yaml.safe_dump(args, file_, indent=2)
491+
else:
492+
raise ValueError(f"Unsupported args file format: {suffix}")

floatcsep/postprocess/reporting.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ def generate_report(experiment, timewindow=-1):
9393
)
9494
for model in experiment.models:
9595
try:
96-
fig_path = experiment.registry.get_figure_key(timestr, f"{test.name}_{model.name}")
96+
fig_path = experiment.registry.get_figure_key(
97+
timestr, f"{test.name}_{model.name}"
98+
)
9799
width = test.plot_args[0].get("figsize", [4])[0] * 96
98100
report.add_figure(
99101
f"{test.name}: {model.name}",

0 commit comments

Comments
 (0)