From abc73061c86f15581bbc9e1570c0dc4b0ccfbb58 Mon Sep 17 00:00:00 2001
From: Erin Howard
Date: Tue, 26 Aug 2025 19:48:02 -0700
Subject: [PATCH] Add and update template QC curation scripts.

---
 bin.src/curate_templates.py                 | 352 +++++++++++++++++---
 bin.src/make_curated_template_collection.py | 275 +++++++++++++++
 2 files changed, 575 insertions(+), 52 deletions(-)
 create mode 100644 bin.src/make_curated_template_collection.py

diff --git a/bin.src/curate_templates.py b/bin.src/curate_templates.py
index 47e75645..34869833 100755
--- a/bin.src/curate_templates.py
+++ b/bin.src/curate_templates.py
@@ -21,38 +21,42 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.


-"""Curate template_coadd datasets for LSSTCam with a new TAGGED collection.
+"""Curate template_coadd datasets for LSSTCam.

-This script copies a selected set of template_coadd datasets to a new location
-at a butler repository, and tag them to a new collection.
-
-The input collections are ideally collections with only vetted templates.
-
-The scripts leaves a ecsv file used by butler ingest-files.
-
-This script provides a short-term workaround to store duplicated templates
-with a special prefix, before DM-50699 provides a better mechanism.
+This script performs a curation cut on a set of template_coadd datasets and
+prepares them for manual vetting.
 """

 import argparse
 import logging
+import os
 import sys

-from astropy.table import Table
+from astropy.table import Table, vstack

 from lsst.daf.butler import Butler, CollectionType
-from lsst.daf.butler.script import ingest_files


 def _make_parser():
     parser = argparse.ArgumentParser()
     parser.add_argument(
         "repo",
-        help="An existing data repository.",
+        help="An existing data repository containing the input collections.",
+    )
+    parser.add_argument(
+        "tag",
+        help="A Jira ticket number for the new template collection name.",
+    )
+    parser.add_argument(
+        "release_num",
+        help="The two-digit release number for the given tag.",
     )
     parser.add_argument(
-        "collections",
-        help="The input collections to search for template_coadd datasets.",
+        "--collections",
+        action="extend",
+        nargs="+",
+        required=True,
+        help="The input collections to search for template_coadd and coadd_depth_table datasets.",
     )
     parser.add_argument(
         "--where",
         default="",
@@ -60,62 +64,306 @@
         help="A string expression to select datasets in the input collections.",
     )
     parser.add_argument(
-        "tag",
-        help="A Jira ticket number for the new template collection name.",
+        "--records_path",
+        required=False,
+        default="",
+        help="An absolute path to a directory in which to save records.",
+    )
+    parser.add_argument(
+        "--stat_records",
+        required=False,
+        help="An output table file with accepted/rejected stats from the curation cut."
+        " Default is release_{release_num}_stat_records.csv.",
     )
     parser.add_argument(
-        "--records",
+        "--filter_by",
         required=False,
-        default="records.ecsv",
-        help="An output table file with records of selected template files."
-        " The file can be used by butler ingest-files.",
+        default="depth_above_threshold_3",
+        help="The coadd_depth_table column to use for filtering."
+        " Default is depth_above_threshold_3.",
+    )
+    parser.add_argument(
+        "--cutoff",
+        required=False,
+        default=95,
+        type=int,
+        help="The curation process will filter out anything below this cutoff."
+        " Default is 95.",
     )
     return parser


+def select_ref(drefs, tract, patch, band, dtype="template_coadd"):
+    """Select a single dataset ref, warning and returning None if none exist,
+    and preferring the most recent run when duplicates exist."""
+    if not drefs:
+        logging.warning(f"No {dtype} found for tract {tract}, patch {patch}, band {band}. Skipping.")
+        return None
+    if len(drefs) > 1:
+        return sorted(drefs, key=lambda ref: ref.run)[-1]
+    return drefs[0]
+
+
+def make_threshold_cuts(butler, template_coadds, n_images, tracts, filter_by, cutoff):
+    """Select template_coadd and template_coadd_n_image datasets that pass a depth threshold.
+
+    Parameters
+    ----------
+    butler : Butler
+        Butler instance used to fetch the coadd depth table.
+    template_coadds : list of DatasetRef
+        Candidate template_coadd references to filter.
+    n_images : list of DatasetRef
+        Candidate template_coadd_n_image references to filter.
+    tracts : iterable of int
+        Tract IDs to evaluate.
+    filter_by : str
+        Column name in coadd_depth_table used for thresholding.
+    cutoff : int
+        Minimum depth value required for a patch/band to be accepted.
+
+    Returns
+    -------
+    accepted_drefs : list of DatasetRef
+        Template coadd dataset refs that passed the threshold.
+    rejected_drefs : list of DatasetRef
+        Template coadd dataset refs that did not pass the threshold.
+    accepted_n_image_refs : list of DatasetRef
+        Corresponding template_coadd_n_image dataset refs for the accepted coadds.
+    """
+    accepted_drefs = []
+    accepted_n_image_refs = []
+    rejected_drefs = []
+
+    for tract in tracts:
+        coadd_depth_table = butler.get('coadd_depth_table', tract=tract)
+        mask = (coadd_depth_table[filter_by] > cutoff)
+
+        # --- Handle accepted patches/bands ---
+        accepted_coadds = coadd_depth_table[mask]
+        for patch_band in accepted_coadds['patch', 'band']:
+            patch = patch_band[0]
+            band = patch_band[1]
+
+            # Find matching template_coadd references for this tract/patch/band.
+            dref = [d for d in template_coadds
+                    if d.dataId['tract'] == tract
+                    and d.dataId['patch'] == patch
+                    and d.dataId['band'] == band
+                    ]
+            # Find matching template_coadd_n_image references for this tract/patch/band.
+            n_image_dref = [d for d in n_images
+                            if d.dataId['tract'] == tract
+                            and d.dataId['patch'] == patch
+                            and d.dataId['band'] == band
+                            ]
+
+            # Skip if no template_coadd is found; if duplicates exist,
+            # select_ref keeps the one from the most recent run.
+            ref = select_ref(dref, tract, patch, band)
+            if ref is None:
+                continue
+            accepted_drefs.append(ref)
+
+            # Likewise for the corresponding template_coadd_n_image.
+            n_image_ref = select_ref(n_image_dref, tract, patch, band,
+                                     dtype="template_coadd_n_image")
+            if n_image_ref is None:
+                continue
+            accepted_n_image_refs.append(n_image_ref)
+
+        # --- Handle rejected patches/bands ---
+        rejected_coadds = coadd_depth_table[~mask]
+        for patch_band in rejected_coadds['patch', 'band']:
+            patch = patch_band[0]
+            band = patch_band[1]
+
+            # Find matching template_coadd references for this tract/patch/band.
+            dref = [d for d in template_coadds
+                    if d.dataId['tract'] == tract
+                    and d.dataId['patch'] == patch
+                    and d.dataId['band'] == band
+                    ]
+
+            # Skip if no template_coadd is found; duplicates again resolve
+            # to the most recent run.
+            ref = select_ref(dref, tract, patch, band)
+            if ref is None:
+                continue
+            rejected_drefs.append(ref)
+
+    return accepted_drefs, rejected_drefs, accepted_n_image_refs
+
+
+def run_stats(accepted_drefs, rejected_drefs, tracts, stats_records_file):
+    """Compute per-tract and per-band accepted/rejected statistics and save to CSV.
+
+    Parameters
+    ----------
+    accepted_drefs : list of DatasetRef
+        Template coadd references that passed curation.
+    rejected_drefs : list of DatasetRef
+        Template coadd references that failed curation.
+    tracts : iterable of int
+        Tract IDs to include in the stats.
+    stats_records_file : str
+        Path to save the resulting CSV file.
+    """
+    bands = ['u', 'g', 'r', 'i', 'z', 'y']
+
+    # Build the accepted table.
+    if accepted_drefs:
+        accepted = Table(
+            {
+                'tract': [int(r.dataId['tract']) for r in accepted_drefs],
+                'patch': [int(r.dataId['patch']) for r in accepted_drefs],
+                'band': [str(r.dataId['band']) for r in accepted_drefs],
+                'status': ['accepted'] * len(accepted_drefs)
+            }
+        )
+    else:
+        accepted = Table(names=('tract', 'patch', 'band', 'status'),
+                         dtype=('int', 'int', 'str', 'str'))
+
+    # Build the rejected table.
+    if rejected_drefs:
+        rejected = Table(
+            {
+                'tract': [int(r.dataId['tract']) for r in rejected_drefs],
+                'patch': [int(r.dataId['patch']) for r in rejected_drefs],
+                'band': [str(r.dataId['band']) for r in rejected_drefs],
+                'status': ['rejected'] * len(rejected_drefs)
+            }
+        )
+    else:
+        rejected = Table(names=('tract', 'patch', 'band', 'status'),
+                         dtype=('int', 'int', 'str', 'str'))
+
+    # Combine the tables and group by tract and band.
+    all_refs = vstack([accepted, rejected])
+    grouped = all_refs.group_by(['tract', 'band'])
+
+    # Prepare the output table.
+    stat_table_data = {'tract': []}
+    for band in bands:
+        stat_table_data[f'{band}_num_accepted'] = []
+        stat_table_data[f'{band}_percent_accepted'] = []
+
+    for tract in tracts:
+        stat_table_data['tract'].append(tract)
+        for band in bands:
+            mask = (grouped['tract'] == tract) & (grouped['band'] == band)
+            subset = grouped[mask]
+            n_total = len(subset)
+            n_accepted = (subset['status'] == 'accepted').sum() if n_total > 0 else 0
+            percent = (n_accepted / n_total * 100) if n_total > 0 else float('nan')
+            stat_table_data[f'{band}_num_accepted'].append(f"{n_accepted} / {n_total}")
+            stat_table_data[f'{band}_percent_accepted'].append(percent)
+
+    # Create the final table.
+    stat_table = Table(stat_table_data)
+    stat_table.write(stats_records_file, format='ascii.csv', overwrite=True)
+
+
 def main():
     logging.basicConfig(level=logging.INFO, stream=sys.stdout)

+    # Hide spurious numexpr startup messages by setting its thread-count env var.
+    os.environ["NUMEXPR_MAX_THREADS"] = "8"
+
     args = _make_parser().parse_args()
-    butler = Butler(args.repo, writeable=True)
+    butler = Butler(args.repo, collections=args.collections)
+    butler_write = Butler(args.repo, writeable=True)
+
+    # Create the directory for records if needed.
+    directory = args.records_path
+    if directory:
+        if not os.path.exists(directory):
+            os.makedirs(directory)
+
+    # Set the stat_records default based on release_num if not provided.
+    if args.stat_records is None:
+        args.stat_records = f"release_{args.release_num}_stat_records.csv"

-    tagged_collection = "LSSTCam/templates/" + args.tag
-    registered = butler.collections.register(
+    # Create the tagged collection; abort if it already exists.
+    tagged_collection = f"LSSTCam/templates/candidates/{args.tag}/release_{args.release_num}"
+    logging.info(f"Creating tagged collection {tagged_collection}.")
+    registered = butler_write.collections.register(
         tagged_collection, type=CollectionType.TAGGED
     )
     if not registered:
         logging.error(f"Collection {tagged_collection} already exists. Aborting.")
         sys.exit(1)

-    refs = butler.query_datasets(
-        "template_coadd", collections=args.collections, where=args.where, limit=None
-    )
-    logging.info(f"Found {len(refs)} template_coadd datasets in {args.collections}.")
-
-    columns = ("filename", "band", "skymap", "tract", "patch")
-    data = Table(names=columns, dtype=("str", "str", "str", "int", "int"))
-    for ref in refs:
-        uri_path = butler.getURI(ref).geturl()
-        data_id_values = tuple(ref.dataId[col] for col in columns[1:])
-        data.add_row((uri_path, *data_id_values))
-    data.write(args.records, overwrite=True)
-    logging.info(f"Data records written to {args.records}.")
-
-    run_collection = tagged_collection + "/run"
-    ingest_files(
-        args.repo,
-        "template_coadd",
-        run_collection,
-        table_file=args.records,
-        data_id=("instrument=LSSTCam",),
-        transfer="copy",
-    )
+    logging.info("Collecting coadd_depth_table, template_coadd, and template_coadd_n_image refs.")
+    coadd_depth_table_refs = butler.query_datasets("coadd_depth_table", where=args.where, limit=None)
+    if not coadd_depth_table_refs:
+        logging.error("No coadd_depth_table datasets found in the given collections.")
+        sys.exit(1)

-    refs = butler.query_datasets(
-        "template_coadd", collections=run_collection, limit=None
-    )
-    logging.info(f"Associating {len(refs)} datasets to {tagged_collection}.")
-    butler.registry.associate(tagged_collection, refs)
+    # Get the set of relevant tracts.
+    tracts = {item.dataId['tract'] for item in coadd_depth_table_refs}
+
+    # Amend the where argument to restrict refs to the relevant tracts.
+    tracts_str = ",".join(str(t) for t in tracts)
+    tract_restriction = f"tract IN ({tracts_str})"
+    args.where = f"({args.where}) AND ({tract_restriction})" if args.where else tract_restriction
+
+    # Get the relevant template_coadd and template_coadd_n_image refs.
+    coadd_refs = butler.query_datasets("template_coadd", where=args.where, limit=None)
+    if not coadd_refs:
+        logging.error("No template_coadd datasets found in the given collections.")
+        sys.exit(1)
+    n_image_refs = butler.query_datasets("template_coadd_n_image", where=args.where, limit=None)
+    if not n_image_refs:
+        logging.error("No template_coadd_n_image datasets found in the given collections.")
+        sys.exit(1)
+    logging.info(f"Found {len(coadd_refs)} template_coadd datasets with coadd_depth_tables "
+                 f"in {args.collections}.")
+
+    # Filter out template_coadds that don't meet the cutoff and record the results.
+    logging.info("Starting curation.")
+    accepted_drefs, rejected_drefs, accepted_n_image_refs = make_threshold_cuts(
+        butler, coadd_refs, n_image_refs, tracts, args.filter_by, args.cutoff
+    )
+    logging.info(f"Curation complete. Accepted {len(accepted_drefs)} out of {len(coadd_refs)}"
+                 f" template_coadd datasets in {args.collections}.")
+
+    # Run accepted/rejected statistics and save them to record.
+    logging.info("Starting stat generation.")
+    stats_records_file = os.path.join(directory, args.stat_records)
+    run_stats(accepted_drefs, rejected_drefs, tracts, stats_records_file)
+    logging.info("Stat generation complete. Accepted/rejected stat records written to"
+                 f" {stats_records_file}.")
+
+    # Associate the accepted template_coadds and template_coadd_n_images to the tagged collection.
+    logging.info(f"Associating {len(accepted_drefs)} template_coadds and "
+                 f"{len(accepted_n_image_refs)} template_coadd_n_images to {tagged_collection}.")
+    butler_write.registry.associate(tagged_collection, accepted_drefs)
+    butler_write.registry.associate(tagged_collection, accepted_n_image_refs)
+    logging.info("Association complete.")


 if __name__ == "__main__":
     main()
diff --git a/bin.src/make_curated_template_collection.py b/bin.src/make_curated_template_collection.py
new file mode 100644
index 00000000..b1446736
--- /dev/null
+++ b/bin.src/make_curated_template_collection.py
@@ -0,0 +1,275 @@
+#!/usr/bin/env python
+# This file is part of prompt_processing.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+
+"""Make new template_coadd TAGGED and RUN collections from curated template_coadds.
+
+This script filters and copies a curated set of template_coadd datasets to a new
+location in a butler repository, and tags them to a new collection.
+
+The input collections are ideally collections with only curated templates.
+
+The script leaves an ecsv file used by butler ingest-files.
+"""
+
+import argparse
+import logging
+import os
+import sys
+
+from astropy.io import ascii
+from astropy.table import Table
+
+from lsst.daf.butler import Butler, CollectionType
+from lsst.daf.butler.script import ingest_files
+
+
+def _make_parser():
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "src_repo",
+        help="An existing data repository containing the input collections.",
+    )
+    parser.add_argument(
+        "tag",
+        help="The Jira ticket number associated with the vetted template collection."
+        " Will be used to construct the new collection version name.",
+    )
+    parser.add_argument(
+        "version",
+        help="A version tag following the convention YYYY-MM-DD.",
+    )
+    parser.add_argument(
+        "--dest_repo",
+        default="embargo",
+        help="The repository to which to copy the datasets."
+        " Default is embargo.",
+    )
+    parser.add_argument(
+        "--collections",
+        action="extend",
+        nargs="+",
+        required=True,
+        help="The input collections containing curated template_coadd datasets.",
+    )
+    parser.add_argument(
+        "--existing_collection",
+        action="extend",
+        nargs="+",
+        help="The existing template collection. Optional.",
+    )
+    parser.add_argument(
+        "--dupe_handler",
+        choices=("drop", "replace"),
+        default="drop",
+        help="How to handle duplicates between the new and existing templates."
+        " 'drop' keeps the existing template and drops the new one;"
+        " 'replace' replaces the existing template with the new one.",
+    )
+    parser.add_argument(
+        "--where",
+        default="",
+        help="A string expression to select datasets in the input collections.",
+    )
+    parser.add_argument(
+        "--reject",
+        required=False,
+        default="",
+        help="An input table file listing data IDs to exclude from the final dataset."
+        " The table must include 'tract', 'patch', and 'band' columns.",
+    )
+    parser.add_argument(
+        "--records_path",
+        required=False,
+        default="",
+        help="A path to a directory in which to save records.",
+    )
+    parser.add_argument(
+        "--records",
+        required=False,
+        help="An output table file with records of selected template files."
+        " The file can be used by butler ingest-files."
+        " Default is {version}_records.ecsv.",
+    )
+    return parser
+
+
+def make_key(ref):
+    """Build a hashable (dataset type name, data ID values) key for a ref."""
+    keys = sorted(ref.datasetType.dimensions.names)
+    return (ref.datasetType.name, tuple(ref.dataId[k] for k in keys))
+
+
+def handle_duplicates(butler, old_collections, new_refs, dupe_handler):
+    """Resolve duplicate template_coadd refs between old and new collections.
+
+    Parameters
+    ----------
+    butler : Butler
+        Butler instance pointing at the source repo.
+    old_collections : str or list
+        Collections to query for old template_coadds.
+    new_refs : list of DatasetRef
+        Candidate new refs to deduplicate.
+    dupe_handler : {"drop", "replace"}
+        Strategy for resolving duplicates:
+        - "drop": keep old, drop new.
+        - "replace": keep new, drop old.
+
+    Returns
+    -------
+    new_ref_list : list of DatasetRef
+        Deduplicated new refs.
+    old_ref_list : list of DatasetRef
+        Deduplicated old refs.
+    """
+    # Query the old refs.
+    old_refs = butler.query_datasets("template_coadd", collections=old_collections, find_first=True)
+
+    new_ref_map = {}
+    old_ref_map = {}
+
+    if dupe_handler.lower() == "drop":
+        for ref in new_refs:
+            new_ref_map[make_key(ref)] = ref
+        for ref in old_refs:
+            key = make_key(ref)
+            old_ref_map[key] = ref
+            new_ref_map.pop(key, None)  # remove the duped new ref
+        logging.info(f"Dropped {len(new_refs)-len(new_ref_map)} duplicates of old coadds.")
+
+    elif dupe_handler.lower() == "replace":
+        for ref in old_refs:
+            old_ref_map[make_key(ref)] = ref
+        for ref in new_refs:
+            key = make_key(ref)
+            new_ref_map[key] = ref
+            old_ref_map.pop(key, None)  # remove the duped old ref
+        logging.info(f"Replaced {len(old_refs)-len(old_ref_map)} duplicates with new coadds.")
+
+    else:
+        raise ValueError(f"Unknown dupe_handler: {dupe_handler!r}. Options are 'drop' and 'replace'.")
+
+    return list(new_ref_map.values()), list(old_ref_map.values())
+
+
+def main():
+    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
+
+    args = _make_parser().parse_args()
+    butler = Butler(args.src_repo)
+    butler_write = Butler(args.dest_repo, writeable=True)
+
+    # Create the directory for records if needed.
+    directory = args.records_path
+    if directory:
+        if not os.path.exists(directory):
+            os.makedirs(directory)
+
+    # Set the records default based on version if not provided, and save
+    # records inside records_path, mirroring curate_templates.py.
+    if args.records is None:
+        args.records = f"{args.version}_records.ecsv"
+    args.records = os.path.join(directory, args.records)
+
+    # Create the tagged collection; abort if it already exists.
+    tagged_collection = f"LSSTCam/templates/{args.tag}/{args.version}"
+    registered = butler_write.collections.register(
+        tagged_collection, type=CollectionType.TAGGED
+    )
+    if not registered:
+        logging.error(f"Collection {tagged_collection} already exists. Aborting.")
+        sys.exit(1)
+    else:
+        logging.info(f"Collection {tagged_collection} registered.")
+
+    refs = butler.query_datasets(
+        "template_coadd", collections=args.collections, where=args.where, limit=None
+    )
+    if not refs:
+        logging.error("No template_coadd datasets found in the given collections.")
+        sys.exit(1)
+    logging.info(f"Found {len(refs)} template_coadd datasets in {args.collections}.")
+
+    if args.reject != "":
+        # Filter out rejected templates from the curated collection.
+        reject_table = ascii.read(args.reject)
+        rejected_keys = {
+            (row["tract"], row["patch"], row["band"]) for row in reject_table
+        }
+        filtered_refs = [
+            ref for ref in refs
+            if (ref.dataId["tract"], ref.dataId["patch"], ref.dataId["band"]) not in rejected_keys
+        ]
+        logging.info(f"Filtered out {len(refs)-len(filtered_refs)} template_coadds "
+                     "that didn't pass vetting.")
+    else:
+        filtered_refs = refs
+
+    # Handle duplicates.
+    if args.existing_collection:
+        filtered_refs, old_refs = handle_duplicates(
+            butler, args.existing_collection, filtered_refs, args.dupe_handler
+        )
+    else:
+        logging.info("No existing_collection specified; skipping duplicate handling.")
+        old_refs = []
+
+    if len(filtered_refs) == 0:
+        logging.info("No template_coadds remain after filtering.")
+    else:
+        # Generate a record of the vetted, curated templates.
+        logging.info("Begin generating record of vetted curated templates.")
+        columns = ("filename", "band", "skymap", "tract", "patch")
+        data = Table(names=columns, dtype=("str", "str", "str", "int", "int"))
+        for ref in filtered_refs:
+            uri_path = butler.getURI(ref).geturl()
+            data_id_values = tuple(ref.dataId[col] for col in columns[1:])
+            data.add_row((uri_path, *data_id_values))
+        data.write(args.records, overwrite=True)
+        logging.info(f"Intermediate data records written to {args.records}.")
+
+        # Copy the URIs to the new location.
+        logging.info("Begin copying template_coadds to new location.")
+        run_collection = tagged_collection + "/run"
+        ingest_files(
+            args.dest_repo,
+            "template_coadd",
+            run_collection,
+            table_file=args.records,
+            data_id=("instrument=LSSTCam",),
+            transfer="copy",
+        )
+        logging.info("Copying complete.")
+
+        # Combine with the existing refs and associate to the new collection.
+        logging.info("Begin combining new template_coadds with existing template_coadds.")
+        # The run collection was just created in the destination repo, so query it there.
+        new_refs = butler_write.query_datasets("template_coadd", collections=run_collection, limit=None)
+        combined_refs = new_refs + old_refs
+        logging.info("Combining complete.")
+
+        logging.info(f"Associating {len(combined_refs)} datasets ({len(new_refs)} new) "
+                     f"to {tagged_collection}.")
+        butler_write.registry.associate(tagged_collection, combined_refs)
+        logging.info("Association complete.")
+
+
+if __name__ == "__main__":
+    main()
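
Usage note (not part of the patch): a minimal sketch of how the two scripts might be
invoked together, assuming they are run from the package root. The repository paths,
ticket number, release number, collection names, and file names below are illustrative
placeholders, not values taken from the patch:

    # Curation cut: tag candidate templates whose coadd_depth_table value passes the cutoff.
    python bin.src/curate_templates.py /repo/embargo DM-12345 01 \
        --collections LSSTCam/runs/templates/w_2025_30 \
        --filter_by depth_above_threshold_3 --cutoff 95 \
        --records_path /tmp/template_qc

    # After manual vetting: copy the curated templates and tag the final collection.
    python bin.src/make_curated_template_collection.py /repo/embargo DM-12345 2025-08-26 \
        --collections LSSTCam/templates/candidates/DM-12345/release_01 \
        --reject /tmp/template_qc/rejected_ids.csv \
        --dupe_handler drop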