[WIP] Async FP #308
base: master
Conversation
Signed-off-by: zjgemi <[email protected]>
Signed-off-by: zjgemi <[email protected]>
for more information, see https://pre-commit.ci
📝 Walkthrough

Adds configurable asynchronous FP handling, controlled by `fp.async_ratio`: when the ratio is greater than zero, a fraction of the selected configurations is routed to a separate asynchronous prep-run-fp path.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant User
    participant Submit as submit.workflow_concurrent_learning
    participant CL as ConcurrentLearningBlock
    participant Sel as SelectConfs (ConfSelectorFrames)
    participant FP as prep-run-fp (sync)
    participant AFP as prep-run-fp (async)
    participant Col as CollectData
    Note over Submit: Read fp.async_ratio -> async_fp=(ratio>0)
    Submit->>CL: make_concurrent_learning_op(async_fp)
    CL->>Sel: select(trajs, model_devis, async_ratio)
    Sel-->>CL: confs, async_confs, report
    CL->>FP: run on confs
    alt async_fp enabled
        CL->>AFP: run on async_confs
        AFP-->>CL: async_labeled_data
    end
    FP-->>CL: labeled_data
    CL->>Col: labeled_data + async_labeled_data?
    Col-->>CL: iter_data
    CL-->>Submit: outputs include async_confs
```

```mermaid
sequenceDiagram
    participant Flow as ConcurrentLearningLoop/_dpgen
    participant Block as block step
    participant Next as next step
    Note over Flow: `async_confs` added as optional InputArtifact
    Flow->>Block: artifacts {..., async_confs}
    Block-->>Next: artifacts {..., async_confs}
    Next-->>Flow: propagate for subsequent iterations
```
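For orientation, a minimal sketch of where the new knob would sit in a dpgen2 input config, written here as a Python dict. Only `fp.task_max` and `fp.async_ratio` are taken from this PR; the surrounding structure and values are assumptions for illustration.

```python
# Hypothetical config fragment (illustration only, not a complete dpgen2 input).
config_fragment = {
    "fp": {
        "task_max": 100,      # existing cap on FP tasks per iteration
        "async_ratio": 0.2,   # new knob: route ~20% of selected confs to the async prep-run-fp path
        # async_ratio == 0.0 (the default) keeps the fully synchronous behavior
    },
}

# The submit logic reads the ratio and enables the async path when it is positive,
# i.e. async_fp = (config_fragment["fp"].get("async_ratio", 0) > 0).
```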
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested reviewers
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
dpgen2/exploration/selector/conf_selector.py (1)

34-41: Add docstring to ConfSelector.select and align all implementations to the triple-return contract

The abstract declaration in dpgen2/exploration/selector/conf_selector.py must document the three outputs, and every subclass (including tests) must update its return-type annotation to Tuple[List[Path], List[Path], ExplorationReport] to match the actual values returned.

• In dpgen2/exploration/selector/conf_selector.py, amend the select stub:

```diff
@@ class ConfSelector(ABC):
     @abstractmethod
     def select(
         self,
         trajs: Union[List[Path], List[HDF5Dataset]],
         model_devis: Union[List[Path], List[HDF5Dataset]],
         type_map: Optional[List[str]] = None,
         optional_outputs: Optional[List[Path]] = None,
-    ) -> Tuple[List[Path], List[Path], ExplorationReport]:
-        pass
+    ) -> Tuple[List[Path], List[Path], ExplorationReport]:
+        """
+        Returns
+        -------
+        confs : List[Path]
+            Selected configurations for the standard FP path.
+        async_confs : List[Path]
+            Selected configurations for the asynchronous FP path. May be empty when disabled.
+        report : ExplorationReport
+            Detailed report of the exploration process.
+        """
+        pass
```

• In tests/mocked_ops.py, both mock selectors must update their return-type annotations:

```diff
@@ class MockedConfSelector(ConfSelector):
-    def select(...) -> Tuple[List[Path], ExplorationReport]:
+    def select(...) -> Tuple[List[Path], List[Path], ExplorationReport]:
@@ class MockedAsyncConfSelector(ConfSelector):
-    def select(...) -> Tuple[List[Path], ExplorationReport]:
+    def select(...) -> Tuple[List[Path], List[Path], ExplorationReport]:
```

• Double-check any other ConfSelector implementations (e.g., in dpgen2/exploration/selector/*) to ensure their signatures and return statements consistently reflect the three-item contract.
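For call sites, the practical consequence is unpacking three values instead of two. A self-contained toy sketch of the contract follows; the classes below are illustrative stand-ins, not dpgen2 code.

```python
from pathlib import Path
from typing import List, Tuple


class FakeReport:
    """Illustrative stand-in for ExplorationReport."""
    converged = False


class FakeSelector:
    """Toy selector honoring the (confs, async_confs, report) triple-return contract."""

    def select(
        self,
        trajs: List[Path],
        model_devis: List[Path],
    ) -> Tuple[List[Path], List[Path], FakeReport]:
        # A real selector would split the selected frames according to async_ratio here.
        return [Path("confs")], [], FakeReport()


# Callers now unpack three values; async_confs may be empty when async FP is disabled.
confs, async_confs, report = FakeSelector().select([], [])
print(confs, async_confs, report.converged)
```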
tests/test_block_cl.py (1)

59-77: Fix missing import causing NameError: add MockedAsyncConfSelector to imports

The async test uses MockedAsyncConfSelector (Line 499) but it isn't imported here, causing the unit test failure reported by CI.

Apply this diff to import the symbol:

```diff
 from mocked_ops import (
     MockedCollectData,
     MockedCollectDataCheckOptParam,
     MockedConfSelector,
+    MockedAsyncConfSelector,
     MockedExplorationReport,
     MockedExplorationTaskGroup,
     MockedPrepDPTrain,
     MockedPrepVasp,
     MockedRunDPTrain,
     MockedRunDPTrainCheckOptParam,
     MockedRunLmp,
     MockedRunVasp,
     MockedSelectConfs,
     make_mocked_init_data,
     make_mocked_init_models,
     mocked_incar_template,
     mocked_numb_models,
     mocked_numb_select,
     mocked_template_script,
 )
```
🧹 Nitpick comments (13)
dpgen2/entrypoint/args.py (1)

470-470: Clarify semantics and bounds for async_ratio

Please make the doc explicit that async_ratio is a fraction in [0.0, 1.0] controlling the proportion of selected configurations routed to the async FP path. Right now it's ambiguous.

Apply this doc tweak:

```diff
-    doc_async_ratio = "Configuration ratio for async fp"
+    doc_async_ratio = (
+        "Fraction [0.0, 1.0] of selected configurations routed to the asynchronous "
+        "FP path. 0.0 disables async FP; 1.0 routes all selected configurations."
+    )
```

dpgen2/op/collect_data.py (1)
71-90: Update execute() docstring to document async_labeled_data

The docstring omits the new optional async input. Please add it for discoverability.

```diff
@@
         - `labeled_data`: (`Artifact(List[Path])`) The paths of labeled data generated by FP tasks of the current iteration.
+        - `async_labeled_data`: (`Artifact(List[Path])`, optional) The paths of labeled data generated by asynchronous FP tasks of the current iteration.
         - `iter_data`: (`Artifact(List[Path])`) The data paths previous iterations.
```

dpgen2/op/select_confs.py (1)
71-80: Fix return docs: pluralize confs and add async_confs

Docs still say "conf" and don't mention async_confs.

```diff
@@
-        - `report`: (`ExplorationReport`) The report on the exploration.
-        - `conf`: (`Artifact(List[Path])`) The selected configurations.
+        - `report`: (`ExplorationReport`) The report on the exploration.
+        - `confs`: (`Artifact(List[Path])`) The selected configurations.
+        - `async_confs`: (`Artifact(List[Path])`) The configurations routed to the async FP path
+          (may be an empty list when async_ratio == 0).
```

dpgen2/flow/dpgen_loop.py (1)
467-478: Optional: expose async_confs at the loop/concurrent-learning outputs for observability

If downstream consumers (e.g., resubmission tooling) need access to accumulated async_confs, consider adding an OutputArtifact on ConcurrentLearningLoop and ConcurrentLearning and forwarding the latest value similarly to models/iter_data via if_expression. This keeps the external interface symmetric.

tests/exploration/test_conf_selector_frame.py (1)
196-216: Clean up async_confs directory in tearDown to avoid test pollution

select() writes an async_confs directory. Tear down should remove it, like confs, to keep tests hermetic.

Apply this diff to extend teardown:

```diff
     def tearDown(self):
         for ii in ["foo.dump", "bar.dump", "foo.md", "bar.md"]:
             if Path(ii).is_file():
                 os.remove(ii)
-        for ii in ["confs"]:
+        for ii in ["confs", "async_confs"]:
             if Path(ii).is_dir():
                 shutil.rmtree(ii)
```

tests/mocked_ops.py (2)
870-893: Align MockedConfSelector.select signature with the new abstract API

The abstract ConfSelector.select now returns (confs, async_confs, report). Update the annotation accordingly to avoid confusion and future type-check issues.

Apply this diff:

```diff
     def select(
         self,
         trajs: List[Path],
         model_devis: List[Path],
         type_map: List[str] = None,
         optional_outputs: Optional[List[Path]] = None,
-    ) -> Tuple[List[Path], ExplorationReport]:
+    ) -> Tuple[List[Path], List[Path], ExplorationReport]:
         confs = []
         ...
         return confs, [], report
```
895-915: Fix MockedAsyncConfSelector.select return annotation (currently mismatched)

The function returns three values but the type hint advertises two. Update to the triple tuple to match the base class and actual return value.

Apply this diff:

```diff
 class MockedAsyncConfSelector(ConfSelector):
     def __init__(
         self,
         conv_accuracy: float = 0.9,
     ):
         self.conv_accuracy = conv_accuracy

     def select(
         self,
         trajs: List[Path],
         model_devis: List[Path],
         type_map: List[str] = None,
         optional_outputs: Optional[List[Path]] = None,
-    ) -> Tuple[List[Path], ExplorationReport]:
+    ) -> Tuple[List[Path], List[Path], ExplorationReport]:
         fname = Path("confs")
         fname.write_text("conf of confs")
         fname = Path("async_confs")
         fname.write_text("conf of async_confs")
         report = MockedExplorationReport(conv_accuracy=self.conv_accuracy)
         return [Path("confs")], [Path("async_confs")], report
```

dpgen2/entrypoint/submit.py (1)
310-333: Backward-compatible default for fp.async_ratio

Direct indexing will KeyError on older configs lacking fp.async_ratio. Use .get(..., 0) to default to synchronous behavior.

Apply these diffs:

```diff
 def make_naive_exploration_scheduler_without_conf(config, explore_style):
     model_devi_jobs = config["explore"]["stages"]
     fp_task_max = config["fp"]["task_max"]
-    fp_async_ratio = config["fp"]["async_ratio"]
+    fp_async_ratio = config["fp"].get("async_ratio", 0)
     ...
         selector = ConfSelectorFrames(
             render,
             report,
             fp_task_max,
             conf_filters,
             fp_async_ratio,
         )
```

```diff
 def make_lmp_naive_exploration_scheduler(config):
     ...
     fp_task_max = config["fp"]["task_max"]
-    fp_async_ratio = config["fp"]["async_ratio"]
+    fp_async_ratio = config["fp"].get("async_ratio", 0)
     ...
         selector = ConfSelectorFrames(
             render,
             report,
             fp_task_max,
             conf_filters,
             fp_async_ratio,
         )
```

```diff
 def workflow_concurrent_learning(
     config: Dict,
 ) -> Step:
     ...
-    fp_async_ratio = config["fp"]["async_ratio"]
+    fp_async_ratio = config["fp"].get("async_ratio", 0)
```

Also applies to: 368-395, 497-504
dpgen2/exploration/selector/conf_selector_frame.py (3)

86-90: Clarify return contract for async_confs possibly being empty

Doc currently states "list only has one item," which isn't true when no async selection occurs. Clarify to "zero or one item" to match runtime behavior.

Apply this diff:

```diff
-        async_confs : List[Path]
-            The selected confgurations for async fp, stored in a folder in deepmd/npy format, can be parsed as dpdata.MultiSystems. The `list` only has one item.
+        async_confs : List[Path]
+            The selected configurations for async FP, stored in a folder in deepmd/npy format, can be parsed as dpdata.MultiSystems.
+            This list has either zero or one item depending on the split result.
```
111-119: Handle 100% async split and empty write for ms to avoid failures in to_deepmd_npy

When async_ratio == 1.0 (or all frames randomly fall into the async bucket), ms can be empty. Depending on dpdata.MultiSystems semantics, ms.to_deepmd_npy(out_path) may error, and downstream steps expecting non-empty confs may break.

- Verify dpdata.MultiSystems.to_deepmd_npy supports empty MultiSystems without error.
- If not, either:
  - return an empty confs list to signal "no non-async confs," and conditionally skip the normal FP step in the block (see block.py suggestion), or
  - ensure at least one frame remains in the non-async set when async_ratio < 1.0.

Proposed minimal guard:

```diff
         out_path = Path("confs")
         out_path.mkdir(exist_ok=True)
-        ms.to_deepmd_npy(out_path)  # type: ignore
+        if len(ms) > 0:
+            ms.to_deepmd_npy(out_path)  # type: ignore
```

And adjust the return to reflect emptiness:

```diff
-        return [out_path], async_confs, copy.deepcopy(self.report)
+        confs = [out_path] if len(ms) > 0 else []
+        return confs, async_confs, copy.deepcopy(self.report)
```

Follow-up: see coupled change to conditionally schedule the normal prep-run-fp step in block.py.
127-140: Make split_multisystems robust (type hints, clamped ratio, stable ordering)

- Clamp ratio to [0, 1] defensively.
- Preserve frame order in unselected_indices for reproducibility.
- Add type hints to improve readability and tooling.

Apply this diff:

```diff
-def split_multisystems(ms, ratio):
-    selected_ms = dpdata.MultiSystems()
-    unselected_ms = dpdata.MultiSystems()
-    for s in ms:
-        nsel = math.floor(len(s) * ratio)
-        if random.random() < len(s) * ratio - nsel:
-            nsel += 1
-        selected_indices = random.sample(range(len(s)), nsel)
-        unselected_indices = list(set(range(len(s))).difference(selected_indices))
-        if len(selected_indices) > 0:
-            selected_ms.append(s.sub_system(selected_indices))
-        if len(unselected_indices) > 0:
-            unselected_ms.append(s.sub_system(unselected_indices))
-    return selected_ms, unselected_ms
+from typing import Tuple
+
+def split_multisystems(
+    ms: dpdata.MultiSystems, ratio: float
+) -> Tuple[dpdata.MultiSystems, dpdata.MultiSystems]:
+    ratio = max(0.0, min(1.0, float(ratio)))
+    selected_ms = dpdata.MultiSystems()
+    unselected_ms = dpdata.MultiSystems()
+    for s in ms:
+        n = len(s)
+        nsel = math.floor(n * ratio)
+        # randomized rounding to meet expectation exactly on average
+        if random.random() < n * ratio - nsel:
+            nsel += 1
+        if nsel > 0:
+            selected_indices = sorted(random.sample(range(n), nsel))
+        else:
+            selected_indices = []
+        selected_set = set(selected_indices)
+        # preserve original ordering for unselected indices
+        unselected_indices = [i for i in range(n) if i not in selected_set]
+        if selected_indices:
+            selected_ms.append(s.sub_system(selected_indices))
+        if unselected_indices:
+            unselected_ms.append(s.sub_system(unselected_indices))
+    return selected_ms, unselected_ms
```

Optional: expose a seed or RNG to make the split reproducible across runs.
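For intuition on the randomized rounding used above, a standalone sketch (illustrative, not dpgen2 code) showing that the expected number of frames routed to the async bucket equals n_frames * ratio, and that seeding an RNG makes the split reproducible:

```python
import math
import random


def pick_count(n_frames: int, ratio: float, rng: random.Random) -> int:
    """Randomized rounding: floor(n*ratio), bumped by 1 with probability equal to the fractional part."""
    nsel = math.floor(n_frames * ratio)
    if rng.random() < n_frames * ratio - nsel:
        nsel += 1
    return nsel


rng = random.Random(42)  # a fixed seed makes the split reproducible across runs
counts = [pick_count(10, 0.25, rng) for _ in range(100_000)]
print(sum(counts) / len(counts))  # close to 2.5, i.e. 10 * 0.25 on average
```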
dpgen2/superop/block.py (2)

277-301: Archive async_confs from select_confs to ensure artifact persistence

PythonOPTemplate currently archives only "confs". If "async_confs" must be persisted (e.g., downloaded or passed via artifact store), include it in output_artifact_archive as well.

Apply this diff:

```diff
     select_confs = Step(
         name=name + "-select-confs",
         template=PythonOPTemplate(
             select_confs_op,
-            output_artifact_archive={"confs": None},
+            output_artifact_archive={"confs": None, "async_confs": None},
             python_packages=upload_python_packages,
             **select_confs_template_config,
         ),
```

Please confirm whether dflow requires explicit listing for all output artifacts to be archived. If not required, this change is still harmless and makes intent explicit.
143-151: Expose async prep-run-fp key for query tooling (consistency with keys contract)

If your dflow_query logic relies on ConcurrentLearningBlock.keys, consider including the async variant so users/tools can discover it uniformly.

Apply this diff:

```diff
         self._my_keys = ["select-confs", "collect-data"]
         self._keys = (
             prep_run_dp_train_op.keys
             + prep_run_explore_op.keys
             + self._my_keys[:1]
             + prep_run_fp_op.keys
             + self._my_keys[1:2]
         )
+        if async_fp:
+            # Advertise async prep-run-fp key variant for discoverability
+            self._keys = self._keys + [f"{k}-async" for k in prep_run_fp_op.keys]
```

If dflow_query was explicitly updated to synthesize "-async" keys, you can skip this; otherwise this keeps the API self-descriptive.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (12)
- dpgen2/entrypoint/args.py (2 hunks)
- dpgen2/entrypoint/submit.py (9 hunks)
- dpgen2/exploration/selector/conf_selector.py (1 hunks)
- dpgen2/exploration/selector/conf_selector_frame.py (4 hunks)
- dpgen2/flow/dpgen_loop.py (5 hunks)
- dpgen2/op/collect_data.py (2 hunks)
- dpgen2/op/select_confs.py (3 hunks)
- dpgen2/superop/block.py (8 hunks)
- dpgen2/utils/dflow_query.py (1 hunks)
- tests/exploration/test_conf_selector_frame.py (5 hunks)
- tests/mocked_ops.py (3 hunks)
- tests/test_block_cl.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
- dpgen2/exploration/selector/conf_selector.py (1)
  - dpgen2/exploration/report/report.py (1)
    - ExplorationReport (18-102)
- dpgen2/op/select_confs.py (3)
  - dpgen2/exploration/selector/conf_selector_frame.py (1)
    - select (61-124)
  - tests/mocked_ops.py (2)
    - select (864-892)
    - select (902-914)
  - dpgen2/exploration/selector/conf_selector.py (1)
    - select (34-41)
- tests/exploration/test_conf_selector_frame.py (3)
  - dpgen2/exploration/selector/conf_selector_frame.py (1)
    - select (61-124)
  - tests/mocked_ops.py (2)
    - select (864-892)
    - select (902-914)
  - dpgen2/exploration/render/traj_render_lammps.py (1)
    - TrajRenderLammps (40-134)
- tests/test_block_cl.py (4)
  - tests/mocked_ops.py (7)
    - MockedPrepVasp (432-473)
    - MockedRunVasp (476-528)
    - make_mocked_init_models (109-115)
    - make_mocked_init_data (118-124)
    - MockedAsyncConfSelector (895-914)
    - MockedSelectConfs (917-940)
    - MockedCollectData (645-675)
  - dpgen2/superop/prep_run_fp.py (2)
    - PrepRunFp (49-125)
    - keys (124-125)
  - dpgen2/fp/vasp_input.py (1)
    - VaspInputs (23-110)
  - dpgen2/superop/block.py (2)
    - ConcurrentLearningBlock (87-190)
    - keys (189-190)
- dpgen2/exploration/selector/conf_selector_frame.py (3)
  - tests/mocked_ops.py (2)
    - select (864-892)
    - select (902-914)
  - dpgen2/exploration/selector/conf_selector.py (1)
    - select (34-41)
  - dpgen2/exploration/report/report.py (1)
    - ExplorationReport (18-102)
- tests/mocked_ops.py (2)
  - dpgen2/exploration/selector/conf_selector.py (2)
    - ConfSelector (30-41)
    - select (34-41)
  - dpgen2/exploration/selector/conf_selector_frame.py (1)
    - select (61-124)
🪛 Ruff (0.12.2)
tests/test_block_cl.py
499-499: Undefined name MockedAsyncConfSelector (F821)
🪛 GitHub Actions: Python unit-tests
tests/test_block_cl.py
[error] 499-499: NameError: name 'MockedAsyncConfSelector' is not defined during test setup (dpgen2/dpgen2/tests/test_block_cl.py:499).
🔇 Additional comments (15)
dpgen2/entrypoint/args.py (1)
489-495: Consider validating async_ratio ∈ [0, 1]

Given it's a ratio, values outside [0, 1] should be rejected or clamped before downstream logic uses it. If dargs lacks built-in range validators here, we can add a lightweight check where the value is consumed (e.g., in the selector or block). For this file, the doc update above is sufficient, but please ensure a runtime guard exists downstream.

Would you like me to open a follow-up patch adding a guard where async_ratio is used to split MultiSystems?
dpgen2/utils/dflow_query.py (1)
41-41: Verify async-sliced step patterns

No existing keys matching the "async-…-slice" form were found in the codebase, but absence of evidence is not evidence of absence. Please manually confirm whether any async-sliced step keys are ever generated or consumed at runtime. If you do need to support them, apply the following updates in dpgen2/utils/dflow_query.py:

• In matched_step_key(), extend the async match to include a slice suffix:

```diff
@@ def matched_step_key(all_keys: List[str], step_keys: Optional[List[str]] = None,):
-            or re.match(f"iter-[0-9]*-async--{jj}", kk)
+            or re.match(f"iter-[0-9]*-async--{jj}", kk)
+            or re.match(f"iter-[0-9]*-async--{jj}-[0-9]*", kk)
```

• In find_slice_ranges(), allow an optional "-async" when detecting slices:

```diff
@@ def find_slice_ranges(keys: List[str], sliced_subkey: str,):
-        if re.match(f"iter-[0-9]*--{sliced_subkey}-[0-9]*", ii):
+        if re.match(f"iter-[0-9]*(-async)?--{sliced_subkey}-[0-9]*", ii):
@@ def find_slice_ranges(keys: List[str], sliced_subkey: str,):
-        if not re.match(f"iter-[0-9]*--{sliced_subkey}-[0-9]*", ii):
+        if not re.match(f"iter-[0-9]*(-async)?--{sliced_subkey}-[0-9]*", ii):
```

Please review whether any async-sliced steps exist or could be introduced, and apply these changes if needed.
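A quick standalone check of how the widened pattern behaves; the key strings below are hypothetical examples of the iter-* key naming, not taken from a real workflow:

```python
import re

# Hypothetical step keys (illustration only).
keys = [
    "iter-000000--prep-run-fp-0000",        # synchronous, sliced
    "iter-000000-async--prep-run-fp",       # async, unsliced
    "iter-000000-async--prep-run-fp-0003",  # async, sliced
]

sliced_subkey = "prep-run-fp"
for kk in keys:
    current = bool(re.match(f"iter-[0-9]*--{sliced_subkey}-[0-9]*", kk))
    widened = bool(re.match(f"iter-[0-9]*(-async)?--{sliced_subkey}-[0-9]*", kk))
    print(kk, current, widened)
# Only the widened pattern picks up the async, sliced key; both match the synchronous one.
```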
dpgen2/op/select_confs.py (2)
52-53: Confirm dflow semantics for empty artifacts; consider making async_confs optional

When async_ratio == 0, implementations return an empty list for async_confs. Some dflow setups treat empty Artifact(List[Path]) as invalid. If that's the case here, mark async_confs optional.

If needed, apply:

```diff
-                "async_confs": Artifact(List[Path]),
+                "async_confs": Artifact(List[Path], optional=True),
```
92-104: LGTM: triple return wiring and propagation

Unpacking (confs, async_confs, report) and exposing async_confs in outputs matches the new ConfSelector API and keeps backward compatibility for consumers of confs/report.

tests/test_block_cl.py (1)

448-601: Async FP E2E test looks solid and exercises the new data path

Good coverage: validates the async prep-run-fp step output and the final async_confs artifact on the block step. The assertions match the mocked FP behavior.
dpgen2/flow/dpgen_loop.py (2)
191-192: Threading async_confs into loop inputs is correct

Adding async_confs as an optional InputArtifact in both ConcurrentLearningLoop and ConcurrentLearning is consistent and backward-compatible.

Also applies to: 283-284

381-382: Correctly wiring async_confs through block and next steps

Passing inputs.async_confs to the block, and the block's async_confs to the next iteration ensures persistence of async inflow across iterations.

Also applies to: 454-455, 559-560

tests/exploration/test_conf_selector_frame.py (1)

102-105: Signature adaptation to triple return (confs, async_confs, report) is good

Using the throwaway placeholder for async_confs in existing tests maintains backward compatibility while adopting the new API.
Also applies to: 127-130, 153-156, 183-185
tests/mocked_ops.py (1)
926-939: MockedSelectConfs now emits async_confs as expected

Good adaptation of the mocked operator to the extended select() contract and outputs schema.

dpgen2/entrypoint/submit.py (3)

156-174: Public API addition async_fp is clear and well-scoped

Exposing async_fp on make_concurrent_learning_op is the right place to control the block behavior.

262-273: Correctly forwarding async_fp to ConcurrentLearningBlock

This ensures the block can enable the async pipeline based on config.

672-676: Providing an empty async_confs input artifact is sensible

Seeding the DPGEN step with an empty async_confs ensures the loop's optional artifact wiring remains valid even when not used at iteration 0.

dpgen2/superop/block.py (3)

99-101: LGTM: async_fp flag adds an opt-in path without breaking defaults

Default False preserves existing behavior. Good backward compatibility.

119-129: Inputs/outputs for async_confs are well introduced

Adding async_confs as an optional input and as an output aligns with the new async flow.

355-357: LGTM: wiring async_confs to block outputs

Output artifact correctly sources from select_confs, enabling loop propagation.
```python
        async_ratio: float = 0.0,
    ):
        self.max_numb_sel = max_numb_sel
        self.conf_filters = conf_filters
        self.traj_render = traj_render
        self.report = report
        self.async_ratio = async_ratio
```
🛠️ Refactor suggestion
Validate async_ratio in constructor to prevent out-of-bounds selection and runtime errors
Negative or >1 ratios can lead to ValueError in random.sample and undefined split behavior. Guard early.
Apply this diff:
```diff
     def __init__(
         self,
         traj_render: TrajRender,
         report: ExplorationReport,
         max_numb_sel: Optional[int] = None,
         conf_filters: Optional[ConfFilters] = None,
-        async_ratio: float = 0.0,
+        async_ratio: float = 0.0,
     ):
         self.max_numb_sel = max_numb_sel
         self.conf_filters = conf_filters
         self.traj_render = traj_render
         self.report = report
-        self.async_ratio = async_ratio
+        # Validate [0, 1] to keep downstream splitting safe
+        if not (0.0 <= async_ratio <= 1.0):
+            raise ValueError(f"async_ratio must be within [0, 1], got {async_ratio}")
+        self.async_ratio = float(async_ratio)
```
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion
        async_ratio: float = 0.0,
    ):
        self.max_numb_sel = max_numb_sel
        self.conf_filters = conf_filters
        self.traj_render = traj_render
        self.report = report
        # Validate [0, 1] to keep downstream splitting safe
        if not (0.0 <= async_ratio <= 1.0):
            raise ValueError(f"async_ratio must be within [0, 1], got {async_ratio}")
        self.async_ratio = float(async_ratio)
```
🤖 Prompt for AI Agents
In dpgen2/exploration/selector/conf_selector_frame.py around lines 53 to 60,
validate the async_ratio constructor argument to ensure it is within [0.0, 1.0];
if async_ratio < 0.0 or async_ratio > 1.0 raise a ValueError with a clear
message indicating the invalid value and expected range so invalid ratios cannot
later cause random.sample or splitting errors.
"async_labeled_data": Artifact(List[Path], optional=True), | ||
"iter_data": Artifact(List[Path]), |
Guard optional async_labeled_data to avoid KeyError; simplify concatenation
Accessing ip["async_labeled_data"] raises KeyError when the optional artifact is absent. Use .get() and default to an empty list. Then drop the inner “or []” in the concatenation.
Apply:
```diff
@@ class CollectData(OP):
-                "async_labeled_data": Artifact(List[Path], optional=True),
+                "async_labeled_data": Artifact(List[Path], optional=True),
@@ def execute(self, ip: OPIO) -> OPIO:
-        async_labeled_data = ip["async_labeled_data"]
+        async_labeled_data = ip.get("async_labeled_data") or []
@@ def execute(self, ip: OPIO) -> OPIO:
-        for ii in labeled_data + (async_labeled_data or []):
+        for ii in labeled_data + async_labeled_data:
```
Also applies to: 95-100
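A minimal illustration of the combined pattern (plain dicts here; treating the OPIO input as dict-like is an assumption of this sketch):

```python
# Hypothetical inputs: one missing the optional key, one carrying it as None.
ip_missing = {"labeled_data": ["labeled_0"], "iter_data": []}
ip_none = {"labeled_data": ["labeled_0"], "async_labeled_data": None, "iter_data": []}

for ip in (ip_missing, ip_none):
    # `.get(...) or []` yields [] both when the key is absent and when it is present but None
    async_labeled_data = ip.get("async_labeled_data") or []
    print(ip["labeled_data"] + async_labeled_data)  # -> ['labeled_0'] in both cases
```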
🤖 Prompt for AI Agents
In dpgen2/op/collect_data.py around lines 53-54 (and similarly lines 95-100),
the code indexes the optional artifact ip["async_labeled_data"] which raises
KeyError when absent; change those accesses to ip.get("async_labeled_data", [])
so missing keys yield an empty list, and simplify concatenations by removing the
inner "or []" (i.e., concatenate using ip.get("async_labeled_data", []) +
ip["iter_data"] or similar) to avoid redundant fallbacks.
```python
    if async_fp:
        async_fp_step = Step(
            name=name + "-async-prep-run-fp",
            template=prep_run_fp_op,
            parameters={
                "block_id": f"{block_steps.inputs.parameters['block_id']}-async",
                "fp_config": block_steps.inputs.parameters["fp_config"],
                "type_map": block_steps.inputs.parameters["type_map"],
            },
            artifacts={
                "confs": block_steps.inputs.artifacts["async_confs"],
            },
            key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
        )
        block_steps.add([prep_run_dp_train, async_fp_step])
        async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
    else:
        block_steps.add(prep_run_dp_train)
        async_labeled_data = None
```
🛠️ Refactor suggestion
Gate the async FP step when no async_confs are provided to avoid runtime failures
If async_fp is True but the block receives no async_confs (typical for the first iteration), PrepRunFp may receive a missing/empty artifact and fail. Use an Argo “when” guard leveraging argo_len so the step runs only when input exists.
Apply this diff:
```diff
-    if async_fp:
-        async_fp_step = Step(
+    if async_fp:
+        async_fp_step = Step(
             name=name + "-async-prep-run-fp",
             template=prep_run_fp_op,
             parameters={
                 "block_id": f"{block_steps.inputs.parameters['block_id']}-async",
                 "fp_config": block_steps.inputs.parameters["fp_config"],
                 "type_map": block_steps.inputs.parameters["type_map"],
             },
             artifacts={
                 "confs": block_steps.inputs.artifacts["async_confs"],
             },
+            # Run only when upstream provided async_confs
+            when=argo_len(block_steps.inputs.artifacts["async_confs"]) > 0,
             key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
         )
         block_steps.add([prep_run_dp_train, async_fp_step])
         async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
     else:
```
Follow-up: if you adopt the conf_selector change to return [] when confs are empty, consider adding a similar guard to the normal prep-run-fp step keyed by the length of select_confs.outputs.artifacts["confs"] to keep the DAG robust when the split is 100% async.
📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion
    if async_fp:
        async_fp_step = Step(
            name=name + "-async-prep-run-fp",
            template=prep_run_fp_op,
            parameters={
                "block_id": f"{block_steps.inputs.parameters['block_id']}-async",
                "fp_config": block_steps.inputs.parameters["fp_config"],
                "type_map": block_steps.inputs.parameters["type_map"],
            },
            artifacts={
                "confs": block_steps.inputs.artifacts["async_confs"],
            },
            # Run only when upstream provided async_confs
            when=argo_len(block_steps.inputs.artifacts["async_confs"]) > 0,
            key=f"{block_steps.inputs.parameters['block_id']}-async--prep-run-fp",
        )
        block_steps.add([prep_run_dp_train, async_fp_step])
        async_labeled_data = async_fp_step.outputs.artifacts["labeled_data"]
    else:
        block_steps.add(prep_run_dp_train)
        async_labeled_data = None
```
🤖 Prompt for AI Agents
In dpgen2/superop/block.py around lines 239-257, the async FP Step is created
and added unconditionally when async_fp is True which can cause PrepRunFp to
fail if the artifact async_confs is missing/empty; guard the async_fp_step with
an Argo "when" condition that checks
argo_len(block_steps.inputs.artifacts['async_confs']) > 0 so the step only
executes when async_confs exists and has items (apply the same condition to its
key/name logic as needed), and as a follow-up consider adding a similar
argo_len-based "when" guard to the normal prep-run-fp step using
select_confs.outputs.artifacts['confs'] to protect against 100% async splits.
Signed-off-by: zjgemi <[email protected]>
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master     #308      +/-   ##
==========================================
+ Coverage   84.22%   84.28%   +0.05%
==========================================
  Files         104      104
  Lines        6112     6148      +36
==========================================
+ Hits         5148     5182      +34
- Misses        964      966       +2
```

☔ View full report in Codecov by Sentry.
Summary by CodeRabbit
New Features
Improvements
Tests