From a2b4ab430c45355faf952bed886911a7f584529c Mon Sep 17 00:00:00 2001 From: Gustavo Hidalgo Date: Sat, 10 May 2025 17:49:51 -0400 Subject: [PATCH 01/14] Fix stac_geoparquet export --- datasets/stac-geoparquet/Dockerfile | 46 +- datasets/stac-geoparquet/README.md | 39 +- .../stac-geoparquet/pc_stac_geoparquet.py | 678 ++++++++++++++++-- datasets/stac-geoparquet/requirements.txt | 9 +- datasets/stac-geoparquet/test.ipynb | 434 +++++++++++ datasets/stac-geoparquet/workflow.yaml | 17 +- 6 files changed, 1125 insertions(+), 98 deletions(-) create mode 100644 datasets/stac-geoparquet/test.ipynb diff --git a/datasets/stac-geoparquet/Dockerfile b/datasets/stac-geoparquet/Dockerfile index 7fe782fb..df6f2f87 100644 --- a/datasets/stac-geoparquet/Dockerfile +++ b/datasets/stac-geoparquet/Dockerfile @@ -1,41 +1,23 @@ -FROM ubuntu:20.04 +FROM mcr.microsoft.com/azurelinux/base/python:3.12 # Setup timezone info ENV TZ=UTC ENV LC_ALL=C.UTF-8 ENV LANG=C.UTF-8 +ENV UV_SYSTEM_PYTHON=TRUE RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone -RUN apt-get update && apt-get install -y software-properties-common +RUN tdnf install build-essential jq unzip ca-certificates awk wget curl git azure-cli -y \ + && tdnf clean all -RUN add-apt-repository ppa:ubuntugis/ppa && \ - apt-get update && \ - apt-get install -y build-essential python3-dev python3-pip \ - jq unzip ca-certificates wget curl git && \ - apt-get autoremove && apt-get autoclean && apt-get clean - -RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10 - -# See https://github.com/mapbox/rasterio/issues/1289 -ENV CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt - -# Install Python 3.11 -RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Mambaforge-$(uname)-$(uname -m).sh" \ - && bash "Mambaforge-$(uname)-$(uname -m).sh" -b -p /opt/conda \ - && rm -rf "Mambaforge-$(uname)-$(uname -m).sh" - -ENV PATH /opt/conda/bin:$PATH -ENV LD_LIBRARY_PATH /opt/conda/lib/:$LD_LIBRARY_PATH - -RUN mamba install -y -c conda-forge python=3.11 gdal pip setuptools cython numpy - -RUN python -m pip install --upgrade pip +# RUN python3 -m pip install --upgrade pip +RUN pip install --upgrade uv # Install common packages COPY requirements-task-base.txt /tmp/requirements.txt -RUN python -m pip install --no-build-isolation -r /tmp/requirements.txt +RUN uv pip install --no-build-isolation -r /tmp/requirements.txt # # Copy and install packages @@ -43,30 +25,30 @@ RUN python -m pip install --no-build-isolation -r /tmp/requirements.txt COPY pctasks/core /opt/src/pctasks/core RUN cd /opt/src/pctasks/core && \ - pip install . + uv pip install . COPY pctasks/cli /opt/src/pctasks/cli RUN cd /opt/src/pctasks/cli && \ - pip install . + uv pip install . COPY pctasks/task /opt/src/pctasks/task RUN cd /opt/src/pctasks/task && \ - pip install . + uv pip install . COPY pctasks/client /opt/src/pctasks/client RUN cd /opt/src/pctasks/client && \ - pip install . + uv pip install . # COPY pctasks/ingest /opt/src/pctasks/ingest # RUN cd /opt/src/pctasks/ingest && \ -# pip install . +# uv pip install . # COPY pctasks/dataset /opt/src/pctasks/dataset # RUN cd /opt/src/pctasks/dataset && \ -# pip install . +# uv pip install . 
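+# Install the dataset-specific code and its pinned requirements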
COPY datasets/stac-geoparquet /opt/src/datasets/stac-geoparquet -RUN python3 -m pip install -r /opt/src/datasets/stac-geoparquet/requirements.txt +RUN uv pip install -r /opt/src/datasets/stac-geoparquet/requirements.txt # Setup Python Path to allow import of test modules ENV PYTHONPATH=/opt/src:$PYTHONPATH diff --git a/datasets/stac-geoparquet/README.md b/datasets/stac-geoparquet/README.md index ef293a83..7f385719 100644 --- a/datasets/stac-geoparquet/README.md +++ b/datasets/stac-geoparquet/README.md @@ -4,20 +4,29 @@ Generates the `stac-geoparquet` collection-level assets for the [Planetary Compu ## Container Images +Test the build with: ```shell -$ az acr build -r pccomponents -t pctasks-stac-geoparquet:latest -t pctasks-stac-geoparquet:2023.7.10.0 -f datasets/stac-geoparquet/Dockerfile . +docker build -t stac-geoparquet -f datasets/stac-geoparquet/Dockerfile . +``` + +Then publish to the ACR with: + +```shell +az acr build -r pccomponents -t pctasks-stac-geoparquet:latest -t pctasks-stac-geoparquet:2023.7.10.0 -f datasets/stac-geoparquet/Dockerfile . ``` ## Permissions This requires the following permissions: -* Storage Data Table Reader on the config tables (`pcapi/bluecollectoinconfig`, `pcapi/greencollectionconfig`) +* Storage Data Table Reader on the config tables (`pcapi/bluecollectionconfig`, `pcapi/greencollectionconfig`) * Storage Blob Data Contributor on the `pcstacitems` container. ## Arguments + By default, this workflow will generate geoparquet assets for all collections. If you want to select a subset of collections, you can use either: + 1. `extra_skip`: This will skip certain collections 1. `collections`: This will only generate geoparquet for the specified collection(s). @@ -25,6 +34,28 @@ If you want to select a subset of collections, you can use either: The workflow used for updates was registered with: +```shell +pctasks workflow update datasets/stac-geoparquet/workflow.yaml +``` + +It can be manually invoked with: + +```shell +pctasks workflow submit stac-geoparquet ``` -pctasks workflow update datasets/workflows/stac-geoparquet.yaml -``` \ No newline at end of file + +## Run Locally + +You can debug the geoparquet export locally like this: + +```shell +export STAC_GEOPARQUET_CONNECTION_INFO="secret" +export STAC_GEOPARQUET_TABLE_NAME="greencollectionconfig" +export STAC_GEOPARQUET_TABLE_ACCOUNT_URL="https://pcapi.table.core.windows.net" +export STAC_GEOPARQUET_STORAGE_OPTIONS_ACCOUNT_NAME="pcstacitems" + +python3 pc_stac_geoparquet.py --collection hls2-l30 +``` + +Apart from the Postgres connection string, you will need PIM activations for +`Storage Blob Data Contributor` to be able to write to the production storage account.
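+
+## Verifying the Output
+
+To spot-check an export, you can read the geoparquet assets back with
+`geopandas`, as `test.ipynb` does. This is a minimal sketch; the account and
+collection names below are placeholders for wherever your export was written:
+
+```python
+import geopandas
+from azure.identity import DefaultAzureCredential
+
+# Placeholder values: point these at the storage account the export wrote to.
+storage_options = {
+    "account_name": "pcstacitems",
+    "credential": DefaultAzureCredential(),
+}
+
+# Partitioned collections are written as a directory of part-*.parquet files,
+# which geopandas/pyarrow reads as a single dataset.
+df = geopandas.read_parquet("abfs://items/naip.parquet", storage_options=storage_options)
+print(len(df), "items")
+```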
diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index c9924cf3..0ab780b2 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -1,27 +1,514 @@ from __future__ import annotations import argparse +import collections.abc +import dataclasses +import datetime +import hashlib +import itertools +import json import logging import os -from typing import Union, Set +import time +import urllib +from typing import Any, Generator, Set, Union import azure.core.credentials import azure.data.tables import azure.identity -from stac_geoparquet import pc_runner -from pctasks.task.task import Task +import dateutil +import fsspec +import pandas as pd +import psycopg +import pystac +import requests +from stac_geoparquet.arrow import parse_stac_items_to_arrow, to_parquet +from stac_geoparquet.pgstac_reader import pgstac_to_iter + from pctasks.core.models.base import PCBaseModel from pctasks.core.models.task import FailedTaskResult, WaitTaskResult from pctasks.task.context import TaskContext - +from pctasks.task.task import Task +import tqdm.auto handler = logging.StreamHandler() handler.setFormatter(logging.Formatter("[%(levelname)s]:%(asctime)s: %(message)s")) -handler.setLevel(logging.INFO) +handler.setLevel(logging.DEBUG) logger = logging.getLogger(__name__) logger.addHandler(handler) -logger.setLevel(logging.INFO) +logger.setLevel(logging.DEBUG) + + +PARTITION_FREQUENCIES = { + "3dep-lidar-classification": "YS", + "3dep-lidar-copc": "YS", + "3dep-lidar-dsm": "YS", + "3dep-lidar-dtm": "YS", + "3dep-lidar-dtm-native": "YS", + "3dep-lidar-hag": "YS", + "3dep-lidar-intensity": "YS", + "3dep-lidar-pointsourceid": "YS", + "3dep-lidar-returns": "YS", + "3dep-seamless": None, + "alos-dem": None, + "alos-fnf-mosaic": "YS", + "alos-palsar-mosaic": "YS", + "aster-l1t": "YS", + "chloris-biomass": None, + "cil-gdpcir-cc-by": None, + "cil-gdpcir-cc-by-sa": None, + "cil-gdpcir-cc0": None, + "cop-dem-glo-30": None, + "cop-dem-glo-90": None, + "eclipse": None, + "ecmwf-forecast": "MS", + "era5-pds": None, + "esa-worldcover": None, + "fia": None, + "gap": None, + "gbif": None, + "gnatsgo-rasters": None, + "gnatsgo-tables": None, + "goes-cmi": "W-MON", + "hrea": None, + "io-lulc": None, + "io-lulc-9-class": None, + "jrc-gsw": None, + "landsat-c2-l1": "MS", + "landsat-c2-l2": "MS", + "mobi": None, + "modis-09A1-061": "MS", + "modis-09Q1-061": "MS", + "modis-10A1-061": "MS", + "modis-10A2-061": "MS", + "modis-11A1-061": "MS", + "modis-11A2-061": "MS", + "modis-13A1-061": "MS", + "modis-13Q1-061": "MS", + "modis-14A1-061": "MS", + "modis-14A2-061": "MS", + "modis-15A2H-061": "MS", + "modis-15A3H-061": "MS", + "modis-16A3GF-061": "MS", + "modis-17A2H-061": "MS", + "modis-17A2HGF-061": "MS", + "modis-17A3HGF-061": "MS", + "modis-21A2-061": "MS", + "modis-43A4-061": "MS", + "modis-64A1-061": "MS", + "mtbs": None, + "naip": "YS", + "nasa-nex-gddp-cmip6": None, + "nasadem": None, + "noaa-c-cap": None, + "nrcan-landcover": None, + "planet-nicfi-analytic": "YS", + "planet-nicfi-visual": "YS", + "sentinel-1-grd": "MS", + "sentinel-1-rtc": "MS", + "sentinel-2-l2a": "W-MON", + "us-census": None, +} + +SKIP = { + "daymet-daily-na", + "daymet-daily-pr", + "daymet-daily-hi", + "daymet-monthly-na", + "daymet-monthly-pr", + "daymet-monthly-hi", + "daymet-annual-na", + "daymet-annual-pr", + "daymet-annual-hi", + "terraclimate", + "gridmet", + "landsat-8-c2-l2", + "gpm-imerg-hhr", + "deltares-floods", + "goes-mcmip", + # 
errors + "cil-gdpcir-cc0", + "3dep-lidar-intensity", + "cil-gdpcir-cc-by", + "ecmwf-forecast", + "3dep-lidar-copc", + "era5-pds", + "3dep-lidar-classification", + "3dep-lidar-dtm-native", + "cil-gdpcir-cc-by-sa", +} + +def _pairwise( + iterable: collections.abc.Iterable, +) -> Any: + # pairwise('ABCDEFG') --> AB BC CD DE EF FG + a, b = itertools.tee(iterable) + next(b, None) + return zip(a, b) + +def query_collection_partitions_updated_after(conninfo: str, updated_after: datetime.datetime | None = None) -> Generator[Partition, None, None]: + with psycopg.connect(conninfo) as conn: + with conn.cursor(row_factory=psycopg.rows.class_row(Partition)) as cur: + q = """ + SELECT + collection, + CASE WHEN lower(partition_dtrange) = '-infinity' OR upper(partition_dtrange) = 'infinity' THEN + 'items.parquet' + ELSE + format( + 'items_%%s_%%s.parquet', + to_char(lower(partition_dtrange),'YYYYMMDD'), + to_char(upper(partition_dtrange),'YYYYMMDD') + ) + END AS partition, + lower(dtrange) as start, + upper(dtrange) as end, + last_updated + FROM partitions_view + """ + args = () + if updated_after is not None: + q += " WHERE last_updated >= %s" + args = (updated_after,) + q += " ORDER BY last_updated asc" + cur.execute(q, args) + for row in cur: + yield row + +def _build_output_path( + base_output_path: str, + part_number: int | None, + total: int | None, + start_datetime: datetime.datetime, + end_datetime: datetime.datetime, +) -> str: + a, b = start_datetime, end_datetime + base_output_path = base_output_path.rstrip("/") + + if part_number is not None and total is not None: + output_path = ( + f"{base_output_path}/part-{part_number:0{len(str(total * 10))}}_" + f"{a.isoformat()}_{b.isoformat()}.parquet" + ) + else: + token = hashlib.md5( + "".join([a.isoformat(), b.isoformat()]).encode() + ).hexdigest() + output_path = ( + f"{base_output_path}/part-{token}_{a.isoformat()}_{b.isoformat()}.parquet" + ) + return output_path + +@dataclasses.dataclass +class Partition: + collection: str + partition: str + start: datetime.datetime + end: datetime.datetime + last_updated: datetime.datetime + +def inject_links(item: dict[str, Any]) -> dict[str, Any]: + item["links"] = [ + { + "rel": "collection", + "type": "application/json", + "href": f"https://planetarycomputer.microsoft.com/api/stac/v1/collections/{item['collection']}", # noqa: E501 + }, + { + "rel": "parent", + "type": "application/json", + "href": f"https://planetarycomputer.microsoft.com/api/stac/v1/collections/{item['collection']}", # noqa: E501 + }, + { + "rel": "root", + "type": "application/json", + "href": "https://planetarycomputer.microsoft.com/api/stac/v1/", + }, + { + "rel": "self", + "type": "application/geo+json", + "href": f"https://planetarycomputer.microsoft.com/api/stac/v1/collections/{item['collection']}/items/{item['id']}", # noqa: E501 + }, + { + "rel": "preview", + "href": f"https://planetarycomputer.microsoft.com/api/data/v1/item/map?collection={item['collection']}&item={item['id']}", # noqa: E501 + "title": "Map of item", + "type": "text/html", + }, + ] + return item + + +def inject_assets(item: dict[str, Any], render_config: str | None) -> dict[str, Any]: + item["assets"]["tilejson"] = { + "href": ( + "https://planetarycomputer.microsoft.com/api/data/v1/item/tilejson.json?" 
+ f"collection={item['collection']}" + f"&item={item['id']}&{render_config}" + ), + "roles": ["tiles"], + "title": "TileJSON with default rendering", + "type": "application/json", + } + item["assets"]["rendered_preview"] = { + "href": ( + "https://planetarycomputer.microsoft.com/api/data/v1/item/preview.png?" + f"collection={item['collection']}" + f"&item={item['id']}&{render_config}" + ), + "rel": "preview", + "roles": ["overview"], + "title": "Rendered preview", + "type": "image/png", + } + return item + +@dataclasses.dataclass +class CollectionConfig: + """ + Additional collection-based configuration to inject, matching the + dynamic properties from the API. + """ + + collection_id: str + partition_frequency: str | None = None + stac_api: str = "https://planetarycomputer.microsoft.com/api/stac/v1" + should_inject_dynamic_properties: bool = True + render_config: str | None = None + + def __post_init__(self) -> None: + self._collection: pystac.Collection | None = None + + @property + def collection(self) -> pystac.Collection: + if self._collection is None: + self._collection = pystac.read_file( + f"{self.stac_api}/collections/{self.collection_id}" + ) # type: ignore + assert self._collection is not None + return self._collection + + def generate_endpoints( + self, since: datetime.datetime | None = None + ) -> list[tuple[datetime.datetime, datetime.datetime]]: + if self.partition_frequency is None: + raise ValueError("Set partition_frequency") + + start_datetime, end_datetime = self.collection.extent.temporal.intervals[0] + + # https://github.com/dateutil/dateutil/issues/349 + if start_datetime and start_datetime.tzinfo == dateutil.tz.tz.tzlocal(): + start_datetime = start_datetime.astimezone(datetime.timezone.utc) + + if end_datetime and end_datetime.tzinfo == dateutil.tz.tz.tzlocal(): + end_datetime = end_datetime.astimezone(datetime.timezone.utc) + + if end_datetime is None: + end_datetime = pd.Timestamp.utcnow() + + # we need to ensure that the `end_datetime` is past the end of the last partition + # to avoid missing out on the last partition of data. 
+ offset = pd.tseries.frequencies.to_offset(self.partition_frequency) + + if not offset.is_on_offset(start_datetime): + start_datetime = start_datetime - offset + + if not offset.is_on_offset(end_datetime): + end_datetime = end_datetime + offset + + idx = pd.date_range(start_datetime, end_datetime, freq=self.partition_frequency) + + if since: + idx = idx[idx >= since] + + pairs = _pairwise(idx) + return list(pairs) + + def export_partition( + self, + conninfo: str, + output_protocol: str, + output_path: str, + partition: Partition | None = None, + storage_options: dict[str, Any] | None = None, + rewrite: bool = False, + skip_empty_partitions: bool = False, + ) -> str | None: + fs = fsspec.filesystem(output_protocol, **storage_options) + if fs.exists(output_path) and not rewrite: + logger.debug("Path %s already exists.", output_path) + return output_path + if partition is not None: + items = pgstac_to_iter( + conninfo, collection=self.collection_id, start_datetime=partition.start, end_datetime=partition.end + ) + else: + items = pgstac_to_iter( + conninfo, collection=self.collection_id + ) + items = map(lambda i: inject_assets(inject_links(i), self.render_config), items) + arrow = parse_stac_items_to_arrow(items) + to_parquet(arrow, output_path, filesystem=fs) + return output_path + + def export_partition_for_endpoints( + self, + endpoints: tuple[datetime.datetime, datetime.datetime], + conninfo: str, + output_protocol: str, + output_path: str, + storage_options: dict[str, Any], + part_number: int | None = None, + total: int | None = None, + rewrite: bool = False, + skip_empty_partitions: bool = False, + ) -> str | None: + """ + Export results for a pair of endpoints. + """ + start, end = endpoints + partition_path = _build_output_path(output_path, part_number, total, start, end) + p = Partition( + collection=self.collection_id, + partition=partition_path, + start=start, + end=end, + last_updated=datetime.datetime.now(), + ) + return self.export_partition( + conninfo, + output_protocol, + partition_path, + partition=p, + storage_options=storage_options, + rewrite=rewrite, + skip_empty_partitions=skip_empty_partitions, + ) + + def export_exists( + self, + output_protocol: str, + output_path: str, + storage_options: dict[str, Any], + ) -> bool: + fs = fsspec.filesystem(output_protocol, **storage_options) + if output_protocol: + output_path = f"{output_protocol}://{output_path}" + return fs.exists(output_path) + + def export_collection( + self, + conninfo: str, + output_protocol: str, + output_path: str, + storage_options: dict[str, Any], + rewrite: bool = False, + skip_empty_partitions: bool = False, + ) -> list[str | None]: + + if not self.partition_frequency: + logger.info("Exporting single-partition collection %s", self.collection_id) + + results = [ + self.export_partition( + conninfo, + output_protocol, + output_path, + storage_options=storage_options) + ] + + else: + endpoints = self.generate_endpoints() + total = len(endpoints) + logger.info( + "Exporting %d partitions for collection %s", total, self.collection_id + ) + + results = [] + for i, endpoint in tqdm.auto.tqdm(enumerate(endpoints), total=total): + results.append( + self.export_partition_for_endpoints( + endpoints=endpoint, + conninfo=conninfo, + output_protocol=output_protocol, + output_path=output_path, + storage_options=storage_options, + rewrite=rewrite, + skip_empty_partitions=skip_empty_partitions, + part_number=i, + total=total, + ) + ) + + return results + +def build_render_config(render_params: dict[str, Any], assets: 
dict[str, Any]) -> str: + flat = [] + if assets: + for asset in assets: + flat.append(("assets", asset)) + + for k, v in render_params.items(): + if isinstance(v, list): + flat.extend([(k, v2) for v2 in v]) + else: + flat.append((k, v)) + return urllib.parse.urlencode(flat) + + +def generate_configs_from_storage_table( + table_client: azure.data.tables.TableClient, +) -> dict[str, CollectionConfig]: + configs = {} + for entity in table_client.list_entities(): + collection_id = entity["RowKey"] + data = json.loads(entity["Data"]) + + render_params = data["render_config"]["render_params"] + assets = data["render_config"]["assets"] + render_config = build_render_config(render_params, assets) + configs[collection_id] = CollectionConfig( + collection_id, render_config=render_config + ) + + return configs + + +def generate_configs_from_api(url: str) -> dict[str, CollectionConfig]: + configs = {} + r = requests.get(url) + r.raise_for_status() + + for collection in r.json()["collections"]: + partition_frequency = ( + collection["assets"] + .get("geoparquet-items", {}) + .get("msft:partition_info", {}) + .get("partition_frequency", None) + ) + + configs[collection["id"]] = CollectionConfig( + collection["id"], partition_frequency=partition_frequency + ) + + return configs + + +def merge_configs( + table_configs: dict[str, CollectionConfig], api_configs: dict[str, CollectionConfig] +) -> dict[str, CollectionConfig]: + # what a mess. Get partitioning config from the API, render from the table. + configs = {} + for k in table_configs.keys() | api_configs.keys(): + table_config = table_configs.get(k) + api_config = api_configs.get(k) + config = table_config or api_config + assert config + if api_config: + config.partition_frequency = api_config.partition_frequency + configs[k] = config + return configs class StacGeoparquetTaskInput(PCBaseModel): @@ -40,7 +527,6 @@ class StacGeoparquetTaskInput(PCBaseModel): class StacGeoparquetTaskOutput(PCBaseModel): n_failures: int - class StacGeoparquetTask(Task[StacGeoparquetTaskInput, StacGeoparquetTaskOutput]): _input_model = StacGeoparquetTaskInput _output_model = StacGeoparquetTaskOutput @@ -51,6 +537,7 @@ def get_required_environment_variables(self) -> list[str]: def run( self, input: StacGeoparquetTaskInput, context: TaskContext ) -> Union[StacGeoparquetTaskOutput, WaitTaskResult, FailedTaskResult]: + result = run( output_protocol=input.output_protocol, connection_info=input.connection_info, @@ -65,37 +552,7 @@ def run( return StacGeoparquetTaskOutput(n_failures=result) -SKIP = { - "daymet-daily-na", - "daymet-daily-pr", - "daymet-daily-hi", - "daymet-monthly-na", - "daymet-monthly-pr", - "daymet-monthly-hi", - "daymet-annual-na", - "daymet-annual-pr", - "daymet-annual-hi", - "terraclimate", - "gridmet", - "landsat-8-c2-l2", - "gpm-imerg-hhr", - "deltares-floods", - "goes-mcmip", - # errors - "cil-gdpcir-cc0", - "3dep-lidar-intensity", - "cil-gdpcir-cc-by", - "ecmwf-forecast", - "3dep-lidar-copc", - "era5-pds", - "3dep-lidar-classification", - "3dep-lidar-dtm-native", - "cil-gdpcir-cc-by-sa", -} - - -def run( - output_protocol: str = "abfs", +def list_planetary_computer_collection_configs( connection_info: str | None = None, table_credential: ( str @@ -111,8 +568,7 @@ def run( ) = None, extra_skip: Set[str] | None = None, collections: str | Set[str] | None = None, -) -> int: - # handle the arguments +) -> dict[str, CollectionConfig]: try: connection_info = ( connection_info or os.environ["STAC_GEOPARQUET_CONNECTION_INFO"] @@ -149,7 +605,8 @@ def run( 
table_name, credential=table_credential, ) - configs = pc_runner.get_configs(table_client) + logger.info(f"Connecting to table {table_name} at {table_account_url}") + configs = get_configs(table_client) if collections is None: configs = {k: v for k, v in configs.items() if k not in skip} @@ -158,28 +615,112 @@ def run( elif isinstance(collections, set): configs = {k: v for k, v in configs.items() if k in collections} + return configs + +def get_configs(table_client: azure.data.tables.TableClient) -> dict[str, CollectionConfig]: + table_configs = generate_configs_from_storage_table(table_client) + api_configs = generate_configs_from_api( + "https://planetarycomputer.microsoft.com/api/stac/v1/collections" + ) + configs = merge_configs(table_configs, api_configs) + for k, v in configs.items(): + if v.partition_frequency is None: + v.partition_frequency = PARTITION_FREQUENCIES.get(k) + + return configs + +def run( + output_protocol: str = "abfs", + connection_info: str | None = None, + table_credential: ( + str + | None + | azure.core.credentials.TokenCredential + | azure.core.credentials.AzureSasCredential + ) = None, + table_name: str | None = None, + table_account_url: str | None = None, + storage_options_account_name: str | None = None, + storage_options_credential: ( + str | None | azure.core.credentials.TokenCredential + ) = None, + extra_skip: Set[str] | None = None, + collections: str | Set[str] | None = None, + configs: dict[str, CollectionConfig] | None = None, +) -> int: + if configs is None: + configs = list_planetary_computer_collection_configs( + connection_info=connection_info, + table_credential=table_credential, + table_name=table_name, + table_account_url=table_account_url, + storage_options_account_name=storage_options_account_name, + storage_options_credential=storage_options_credential, + extra_skip=extra_skip, + collections=collections, + ) + try: + connection_info = ( + connection_info or os.environ["STAC_GEOPARQUET_CONNECTION_INFO"] + ) + except KeyError as e: + raise KeyError( + "STAC_GEOPARQUET_CONNECTION_INFO must be set if not explicitly provided" + ) from e + table_credential = table_credential or os.environ.get( + "STAC_GEOPARQUET_TABLE_CREDENTIAL", azure.identity.DefaultAzureCredential() + ) + assert table_credential is not None + table_name = table_name or os.environ["STAC_GEOPARQUET_TABLE_NAME"] + table_account_url = ( + table_account_url or os.environ["STAC_GEOPARQUET_TABLE_ACCOUNT_URL"] + ) + storage_options_account_name = ( + storage_options_account_name + or os.environ["STAC_GEOPARQUET_STORAGE_OPTIONS_ACCOUNT_NAME"] + ) + storage_options_credential = storage_options_credential or os.environ.get( + "STAC_GEOPARQUET_STORAGE_OPTIONS_CREDENTIAL", + azure.identity.DefaultAzureCredential(), + ) + storage_options = { "account_name": storage_options_account_name, "credential": storage_options_credential, } - def f(config: pc_runner.CollectionConfig) -> None: - config.export_collection( - connection_info, - output_protocol, - f"items/{config.collection_id}.parquet", - storage_options, - skip_empty_partitions=True, - ) - N = len(configs) success = [] failure = [] + one_year_ago = datetime.datetime.now() - datetime.timedelta(days=365) + collection_partitions = list(query_collection_partitions_updated_after(conninfo=connection_info, updated_after=one_year_ago)) + recent_collection_updates: dict[str, list[Partition]] = {} + for partition in collection_partitions: + recent_collection_updates.setdefault(partition.collection, []).append(partition) + logger.info(f"Found 
{len(collection_partitions)} partitions updated after {one_year_ago}") + for i, config in enumerate(configs.values(), 1): - logger.info(f"processing {config.collection_id} [{i}/{N}]") + output_path=f"items/{config.collection_id}.parquet" + if config.export_exists( + output_protocol=output_protocol, + output_path=output_path, + storage_options=storage_options, + ) and (config.collection_id not in recent_collection_updates): + logger.info(f"Existing collection export for {config.collection_id} has no updates since {one_year_ago}") + continue + logger.info(f"Processing {config.collection_id} [{i}/{N}]") try: - f(config) + t0 = time.monotonic() + config.export_collection( + connection_info, + output_protocol, + output_path, + storage_options, + skip_empty_partitions=True, + ) + t1 = time.monotonic() + logger.info(f"Completed {config.collection_id} [{i}/{N}] in {t1-t0:.2f}s") except Exception as e: failure.append((config.collection_id, e)) logger.exception(f"Failed processing {config.collection_id}") @@ -187,3 +728,36 @@ def f(config: pc_runner.CollectionConfig) -> None: success.append(config.collection_id) return len(failure) + +if __name__ == "__main__": + # Remove all handlers associated with the root logger object. + for h in logging.root.handlers[:]: + logging.root.removeHandler(h) + # Set up logging only for this file and stac_geoparquet package + logging.basicConfig(handlers=[handler], level=logging.DEBUG, force=True) + logging.getLogger().setLevel(logging.WARNING) + logger.setLevel(logging.DEBUG) + logging.getLogger("stac_geoparquet").setLevel(logging.DEBUG) + parser = argparse.ArgumentParser(description="Export STAC collection to GeoParquet.") + parser.add_argument( + "--collection", + type=str, + required=False, + help="The collection ID to export." 
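+        # e.g. --collection hls2-l30; when omitted, every collection config
+        # that is not in the skip list is exported.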
+ ) + args = parser.parse_args() + configs = list_planetary_computer_collection_configs( + connection_info=os.environ["STAC_GEOPARQUET_CONNECTION_INFO"], + table_credential=azure.identity.DefaultAzureCredential(), + table_name=os.environ["STAC_GEOPARQUET_TABLE_NAME"], + table_account_url=os.environ["STAC_GEOPARQUET_TABLE_ACCOUNT_URL"], + storage_options_account_name=os.environ["STAC_GEOPARQUET_STORAGE_OPTIONS_ACCOUNT_NAME"], + storage_options_credential=azure.identity.DefaultAzureCredential(), + extra_skip=SKIP, + collections=args.collection, + ) + n_failures = run(collections=args.collection, configs=configs) + if n_failures == 0: + logger.info("Export completed successfully.") + else: + logger.error(f"Export completed with {n_failures} failures.") diff --git a/datasets/stac-geoparquet/requirements.txt b/datasets/stac-geoparquet/requirements.txt index 1c53526b..9ee4f0b3 100644 --- a/datasets/stac-geoparquet/requirements.txt +++ b/datasets/stac-geoparquet/requirements.txt @@ -1,4 +1,5 @@ -stac-geoparquet[pgstac,pc]==0.2.1 -psycopg[binary,pool]==3.1.8 -azure-data-tables==12.4.2 -pypgstac==0.7.4 \ No newline at end of file +# stac-geoparquet[pgstac,pc]==0.6.0 +git+https://github.com/stac-utils/stac-geoparquet.git@30aa59fde4d2df30b1011340b41af2419887916c#egg=stac-geoparquet[pgstac,pc] +psycopg[binary,pool]==3.2.6 +azure-data-tables==12.5.0 +pypgstac==0.8.6 \ No newline at end of file diff --git a/datasets/stac-geoparquet/test.ipynb b/datasets/stac-geoparquet/test.ipynb new file mode 100644 index 00000000..4c6e4667 --- /dev/null +++ b/datasets/stac-geoparquet/test.ipynb @@ -0,0 +1,434 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "89454fb1", + "metadata": {}, + "source": [ + "# Test Notebook\n", + "Use this to simulate use " + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "1d4b6b68", + "metadata": {}, + "outputs": [], + "source": [ + "import geopandas\n", + "from azure.identity import DefaultAzureCredential\n", + "\n", + "account = \"guhidalgogclite\"\n", + "container = \"items\"\n", + "storage_options = {\n", + " \"account_name\": account,\n", + " \"credential\": DefaultAzureCredential()\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "3404f46e", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
assetscollectiongeometryidlinksstac_extensionsstac_versiontypedatetimegsdnaip:statenaip:yearproj:bboxproj:epsgproj:shapeproj:transform
0{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-99.93345 34.93482, -99.93423 35.003...ok_m_3409901_nw_14_1_20100425[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2010-04-25 00:00:00+00:001.0ok2010[408377.0, 3866212.0, 414752.0, 3873800.0]26914[7588, 6375][1.0, 0.0, 408377.0, 0.0, -1.0, 3873800.0, 0.0...
1{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-99.871 34.87231, -99.87173 34.9407,...ok_m_3409901_se_14_1_20100425[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2010-04-25 00:00:00+00:001.0ok2010[414020.0, 3859229.0, 420395.0, 3866814.0]26914[7585, 6375][1.0, 0.0, 414020.0, 0.0, -1.0, 3866814.0, 0.0...
2{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-99.93346 34.87232, -99.93423 34.940...ok_m_3409901_sw_14_1_20100425[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2010-04-25 00:00:00+00:001.0ok2010[408308.0, 3859281.0, 414687.0, 3866869.0]26914[7588, 6379][1.0, 0.0, 408308.0, 0.0, -1.0, 3866869.0, 0.0...
3{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-99.74611 34.9348, -99.74672 35.0031...ok_m_3409902_ne_14_1_20100425[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2010-04-25 00:00:00+00:001.0ok2010[425500.0, 3866067.0, 431862.0, 3873645.0]26914[7578, 6362][1.0, 0.0, 425500.0, 0.0, -1.0, 3873645.0, 0.0...
4{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-99.80855 34.93481, -99.80922 35.003...ok_m_3409902_nw_14_1_20100425[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2010-04-25 00:00:00+00:001.0ok2010[419793.0, 3866112.0, 426159.0, 3873693.0]26914[7581, 6366][1.0, 0.0, 419793.0, 0.0, -1.0, 3873693.0, 0.0...
...................................................
235639{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-104.30931 29.49682, -104.30884 29.5...tx_m_2910430_sw_13_1_20120917[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2012-09-17 00:00:00+00:001.0tx2012[560230.0, 3263230.0, 566950.0, 3270820.0]26913[7590, 6720][1.0, 0.0, 560230.0, 0.0, -1.0, 3270820.0, 0.0...
235640{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-104.18429 29.55929, -104.18374 29.6...tx_m_2910431_nw_13_1_20120917[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2012-09-17 00:00:00+00:001.0tx2012[572290.0, 3270230.0, 579020.0, 3277820.0]26913[7590, 6730][1.0, 0.0, 572290.0, 0.0, -1.0, 3277820.0, 0.0...
235641{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-103.99682 29.30919, -103.99615 29.3...tx_m_2910448_ne_13_1_20120917[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2012-09-17 00:00:00+00:001.0tx2012[590670.0, 3242660.0, 597420.0, 3250260.0]26913[7600, 6750][1.0, 0.0, 590670.0, 0.0, -1.0, 3250260.0, 0.0...
235642{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-82.49543 35.93484, -82.49672 36.003...tn_m_3508204_ne_17_1_20120923_20121015[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2012-09-23 00:00:00+00:001.0tn2012[358759.0, 3977754.0, 365105.0, 3985378.0]26917[7624, 6346][1.0, 0.0, 358759.0, 0.0, -1.0, 3985378.0, 0.0...
235643{'image': {'eo:bands': [{'common_name': 'red',...naipPOLYGON ((-82.55787 35.93484, -82.55922 36.003...tn_m_3508204_nw_17_1_20120923_20121015[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/eo/v1.0.0/s...1.0.0Feature2012-09-23 00:00:00+00:001.0tn2012[353120.0, 3977843.0, 359472.0, 3985470.0]26917[7627, 6352][1.0, 0.0, 353120.0, 0.0, -1.0, 3985470.0, 0.0...
\n", + "

235644 rows × 16 columns

\n", + "
" + ], + "text/plain": [ + " assets collection \\\n", + "0 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "1 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "2 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "3 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "4 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "... ... ... \n", + "235639 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "235640 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "235641 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "235642 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "235643 {'image': {'eo:bands': [{'common_name': 'red',... naip \n", + "\n", + " geometry \\\n", + "0 POLYGON ((-99.93345 34.93482, -99.93423 35.003... \n", + "1 POLYGON ((-99.871 34.87231, -99.87173 34.9407,... \n", + "2 POLYGON ((-99.93346 34.87232, -99.93423 34.940... \n", + "3 POLYGON ((-99.74611 34.9348, -99.74672 35.0031... \n", + "4 POLYGON ((-99.80855 34.93481, -99.80922 35.003... \n", + "... ... \n", + "235639 POLYGON ((-104.30931 29.49682, -104.30884 29.5... \n", + "235640 POLYGON ((-104.18429 29.55929, -104.18374 29.6... \n", + "235641 POLYGON ((-103.99682 29.30919, -103.99615 29.3... \n", + "235642 POLYGON ((-82.49543 35.93484, -82.49672 36.003... \n", + "235643 POLYGON ((-82.55787 35.93484, -82.55922 36.003... \n", + "\n", + " id \\\n", + "0 ok_m_3409901_nw_14_1_20100425 \n", + "1 ok_m_3409901_se_14_1_20100425 \n", + "2 ok_m_3409901_sw_14_1_20100425 \n", + "3 ok_m_3409902_ne_14_1_20100425 \n", + "4 ok_m_3409902_nw_14_1_20100425 \n", + "... ... \n", + "235639 tx_m_2910430_sw_13_1_20120917 \n", + "235640 tx_m_2910431_nw_13_1_20120917 \n", + "235641 tx_m_2910448_ne_13_1_20120917 \n", + "235642 tn_m_3508204_ne_17_1_20120923_20121015 \n", + "235643 tn_m_3508204_nw_17_1_20120923_20121015 \n", + "\n", + " links \\\n", + "0 [{'href': 'https://planetarycomputer.microsoft... \n", + "1 [{'href': 'https://planetarycomputer.microsoft... \n", + "2 [{'href': 'https://planetarycomputer.microsoft... \n", + "3 [{'href': 'https://planetarycomputer.microsoft... \n", + "4 [{'href': 'https://planetarycomputer.microsoft... \n", + "... ... \n", + "235639 [{'href': 'https://planetarycomputer.microsoft... \n", + "235640 [{'href': 'https://planetarycomputer.microsoft... \n", + "235641 [{'href': 'https://planetarycomputer.microsoft... \n", + "235642 [{'href': 'https://planetarycomputer.microsoft... \n", + "235643 [{'href': 'https://planetarycomputer.microsoft... \n", + "\n", + " stac_extensions stac_version \\\n", + "0 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "1 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "2 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "3 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "4 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "... ... ... \n", + "235639 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "235640 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "235641 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "235642 [https://stac-extensions.github.io/eo/v1.0.0/s... 1.0.0 \n", + "235643 [https://stac-extensions.github.io/eo/v1.0.0/s... 
1.0.0 \n", + "\n", + " type datetime gsd naip:state naip:year \\\n", + "0 Feature 2010-04-25 00:00:00+00:00 1.0 ok 2010 \n", + "1 Feature 2010-04-25 00:00:00+00:00 1.0 ok 2010 \n", + "2 Feature 2010-04-25 00:00:00+00:00 1.0 ok 2010 \n", + "3 Feature 2010-04-25 00:00:00+00:00 1.0 ok 2010 \n", + "4 Feature 2010-04-25 00:00:00+00:00 1.0 ok 2010 \n", + "... ... ... ... ... ... \n", + "235639 Feature 2012-09-17 00:00:00+00:00 1.0 tx 2012 \n", + "235640 Feature 2012-09-17 00:00:00+00:00 1.0 tx 2012 \n", + "235641 Feature 2012-09-17 00:00:00+00:00 1.0 tx 2012 \n", + "235642 Feature 2012-09-23 00:00:00+00:00 1.0 tn 2012 \n", + "235643 Feature 2012-09-23 00:00:00+00:00 1.0 tn 2012 \n", + "\n", + " proj:bbox proj:epsg proj:shape \\\n", + "0 [408377.0, 3866212.0, 414752.0, 3873800.0] 26914 [7588, 6375] \n", + "1 [414020.0, 3859229.0, 420395.0, 3866814.0] 26914 [7585, 6375] \n", + "2 [408308.0, 3859281.0, 414687.0, 3866869.0] 26914 [7588, 6379] \n", + "3 [425500.0, 3866067.0, 431862.0, 3873645.0] 26914 [7578, 6362] \n", + "4 [419793.0, 3866112.0, 426159.0, 3873693.0] 26914 [7581, 6366] \n", + "... ... ... ... \n", + "235639 [560230.0, 3263230.0, 566950.0, 3270820.0] 26913 [7590, 6720] \n", + "235640 [572290.0, 3270230.0, 579020.0, 3277820.0] 26913 [7590, 6730] \n", + "235641 [590670.0, 3242660.0, 597420.0, 3250260.0] 26913 [7600, 6750] \n", + "235642 [358759.0, 3977754.0, 365105.0, 3985378.0] 26917 [7624, 6346] \n", + "235643 [353120.0, 3977843.0, 359472.0, 3985470.0] 26917 [7627, 6352] \n", + "\n", + " proj:transform \n", + "0 [1.0, 0.0, 408377.0, 0.0, -1.0, 3873800.0, 0.0... \n", + "1 [1.0, 0.0, 414020.0, 0.0, -1.0, 3866814.0, 0.0... \n", + "2 [1.0, 0.0, 408308.0, 0.0, -1.0, 3866869.0, 0.0... \n", + "3 [1.0, 0.0, 425500.0, 0.0, -1.0, 3873645.0, 0.0... \n", + "4 [1.0, 0.0, 419793.0, 0.0, -1.0, 3873693.0, 0.0... \n", + "... ... \n", + "235639 [1.0, 0.0, 560230.0, 0.0, -1.0, 3270820.0, 0.0... \n", + "235640 [1.0, 0.0, 572290.0, 0.0, -1.0, 3277820.0, 0.0... \n", + "235641 [1.0, 0.0, 590670.0, 0.0, -1.0, 3250260.0, 0.0... \n", + "235642 [1.0, 0.0, 358759.0, 0.0, -1.0, 3985378.0, 0.0... \n", + "235643 [1.0, 0.0, 353120.0, 0.0, -1.0, 3985470.0, 0.0... 
\n", + "\n", + "[235644 rows x 16 columns]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Partitioned NAIP\n", + "collection = \"naip\"\n", + "df = geopandas.read_parquet(\n", + " f\"abfs://{container}/{collection}\", storage_options=storage_options\n", + ")\n", + "df" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pctasks312", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/datasets/stac-geoparquet/workflow.yaml b/datasets/stac-geoparquet/workflow.yaml index 24a1efda..7601cca0 100644 --- a/datasets/stac-geoparquet/workflow.yaml +++ b/datasets/stac-geoparquet/workflow.yaml @@ -1,20 +1,25 @@ name: stac-geoparquet -dataset: microsoft/stac-geoparquet +dataset: stac-geoparquet id: stac-geoparquet jobs: - stac: + geoparquet: tasks: - - id: create - image: pccomponents.azurecr.io/pctasks-stac-geoparquet:2023.7.10.1 + - id: update + image: pccomponents.azurecr.io/pctasks-stac-geoparquet:guhidalgo code: src: ${{ local.path(pc_stac_geoparquet.py) }} + requirements: ${{ local.path(./requirements.txt) }} task: pc_stac_geoparquet:StacGeoparquetTask + # tags: + # # temporarily set this when creating the geoparquet files + # # for a new collection to avoid OOM kills + # batch_pool_id: tsk_db_gr args: table_account_url: "https://pcapi.table.core.windows.net" - table_name: "bluecollectionconfig" + table_name: "greencollectionconfig" storage_options_account_name: "pcstacitems" - # collections: "io-lulc-annual-v02" # Set if you want to generate only one geoparquet file + # collections: "hls2-l30" # Set if you want to generate only one geoparquet file extra_skip: - "chesapeake-lc-13" - "chesapeake-lc-7" From 2beeae58abdd653226f6f227e35b9bd4baf8b8e0 Mon Sep 17 00:00:00 2001 From: Gustavo Hidalgo Date: Sat, 10 May 2025 17:48:05 -0400 Subject: [PATCH 02/14] Fix HLS2 collection metadata --- .../hls2/collection/hls2-l30/template.json | 317 +++++++++++++++ .../hls2/collection/hls2-s30/template.json | 365 ++++++++++++++++++ 2 files changed, 682 insertions(+) create mode 100644 datasets/hls2/collection/hls2-l30/template.json create mode 100644 datasets/hls2/collection/hls2-s30/template.json diff --git a/datasets/hls2/collection/hls2-l30/template.json b/datasets/hls2/collection/hls2-l30/template.json new file mode 100644 index 00000000..f32b0863 --- /dev/null +++ b/datasets/hls2/collection/hls2-l30/template.json @@ -0,0 +1,317 @@ +{ + "stac_version": "1.0.0", + "type": "Collection", + "id": "hls2-l30", + "title": "Harmonized Landsat Sentinel-2 (HLS) Version 2.0, Landsat Data", + "description": "{{ collection.description }}", + "license": "proprietary", + "links": [ + { + "rel": "license", + "href": "https://lpdaac.usgs.gov/data/data-citation-and-policies/", + "title": "LP DAAC - Data Citation and Policies" + } + ], + "stac_extensions": [ + "https://stac-extensions.github.io/item-assets/v1.0.0/schema.json", + "https://stac-extensions.github.io/table/v1.2.0/schema.json", + "https://stac-extensions.github.io/eo/v1.0.0/schema.json", + "https://stac-extensions.github.io/projection/v1.0.0/schema.json", + "https://stac-extensions.github.io/view/v1.0.0/schema.json", + 
"https://stac-extensions.github.io/scientific/v1.0.0/schema.json" + ], + "keywords": [ + "Sentinel", + "Landsat", + "HLS", + "Satellite", + "Global", + "Imagery" + ], + "msft:short_description": "Harmonized Landsat Sentinel-2 (HLS) Version 2.0, Landsat Data", + "msft:storage_account": "hls2euwest", + "msft:group_id": "hls2", + "msft:container": "hls2", + "msft:region": "westeurope", + "providers": [ + { + "name": "LP DAAC", + "roles": [ + "producer", + "licensor" + ], + "url": "https://doi.org/10.5067/HLS/HLSL30.002" + }, + { + "name": "Microsoft", + "roles": [ + "host" + ], + "url": "https://planetarycomputer.microsoft.com" + } + ], + "assets": { + "thumbnail": { + "type": "image/webp", + "href": "https://ai4edatasetspublicassets.blob.core.windows.net/assets/pc_thumbnails/hls2-l30.webp", + "title": "HLS2 Landsat Collection Thumbnail" + }, + "geoparquet-items": { + "href": "abfs://items/hls2-l30.parquet", + "type": "application/x-parquet", + "roles": [ + "stac-items" + ], + "title": "GeoParquet STAC items", + "description": "Snapshot of the collection's STAC items exported to GeoParquet format.", + "msft:partition_info": { + "is_partitioned": true, + "partition_frequency": "W-MON" + }, + "table:storage_options": { + "account_name": "pcstacitems" + } + } + }, + "summaries": { + "platform": [ + "Landsat 8", + "Landsat 9" + ], + "gsd": [ + 30 + ] + }, + "item_assets": { + "B01": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B01", + "common_name": "coastal", + "center_wavelength": 0.48, + "full_width_half_max": 0.02 + } + ], + "roles": [ + "data" + ], + "title": "Coastal Aerosol" + }, + "B02": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B02", + "common_name": "blue", + "center_wavelength": 0.44, + "full_width_half_max": 0.06 + } + ], + "roles": [ + "data" + ], + "title": "Blue" + }, + "B03": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B03", + "common_name": "green", + "center_wavelength": 0.56, + "full_width_half_max": 0.06 + } + ], + "roles": [ + "data" + ], + "title": "Green" + }, + "B04": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B04", + "common_name": "red", + "center_wavelength": 0.65, + "full_width_half_max": 0.04 + } + ], + "roles": [ + "data" + ], + "title": "Red" + }, + "B05": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B05", + "common_name": "nir", + "center_wavelength": 0.86, + "full_width_half_max": 0.03 + } + ], + "roles": [ + "data" + ], + "title": "NIR Narrow" + }, + "B06": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B06", + "common_name": "swir16", + "center_wavelength": 1.6, + "full_width_half_max": 0.08 + } + ], + "roles": [ + "data" + ], + "title": "SWIR 1" + }, + "B07": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B07", + "common_name": "swir22", + "center_wavelength": 2.2, + "full_width_half_max": 0.2 + } + ], + "roles": [ + "data" + ], + "title": "SWIR 2" + }, + "B09": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B09", + "common_name": "cirrus", + "center_wavelength": 1.37, + "full_width_half_max": 0.02 + } + ], + "roles": [ + "data" + ], + "title": "Cirrus" + }, + "B10": { + "type": "image/tiff; 
application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B10", + "common_name": "lwir11", + "center_wavelength": 10.9, + "full_width_half_max": 0.8 + } + ], + "roles": [ + "data" + ], + "title": "Thermal Infrared 1" + }, + "B11": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B11", + "common_name": "lwir12", + "center_wavelength": 12.0, + "full_width_half_max": 1.0 + } + ], + "roles": [ + "data" + ], + "title": "Thermal Infrared 2" + }, + "Fmask": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "Fmask" + } + ], + "roles": [ + "data" + ], + "title": "Fmask" + }, + "SZA": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "SZA" + } + ], + "roles": [ + "data" + ], + "title": "SZA" + }, + "SAA": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "SAA" + } + ], + "roles": [ + "data" + ], + "title": "SAA" + }, + "VZA": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "VZA" + } + ], + "roles": [ + "data" + ], + "title": "VZA" + }, + "VAA": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "VAA" + } + ], + "roles": [ + "data" + ], + "title": "VAA" + } + }, + "extent": { + "spatial": { + "bbox": [ + [ + -180, + -90, + 180, + 90 + ] + ] + }, + "temporal": { + "interval": [ + [ + "2020-01-01T00:00:00Z", + null + ] + ] + } + } +} \ No newline at end of file diff --git a/datasets/hls2/collection/hls2-s30/template.json b/datasets/hls2/collection/hls2-s30/template.json new file mode 100644 index 00000000..04f78bee --- /dev/null +++ b/datasets/hls2/collection/hls2-s30/template.json @@ -0,0 +1,365 @@ +{ + "stac_version": "1.0.0", + "type": "Collection", + "id": "hls2-s30", + "title": "Harmonized Landsat Sentinel-2 (HLS) Version 2.0, Sentinel-2 Data", + "description": "{{ collection.description }}", + "license": "proprietary", + "links": [ + { + "rel": "license", + "href": "https://lpdaac.usgs.gov/data/data-citation-and-policies/", + "title": "LP DAAC - Data Citation and Policies" + } + ], + "stac_extensions": [ + "https://stac-extensions.github.io/item-assets/v1.0.0/schema.json", + "https://stac-extensions.github.io/table/v1.2.0/schema.json", + "https://stac-extensions.github.io/eo/v1.0.0/schema.json", + "https://stac-extensions.github.io/projection/v1.0.0/schema.json", + "https://stac-extensions.github.io/view/v1.0.0/schema.json", + "https://stac-extensions.github.io/scientific/v1.0.0/schema.json" + ], + "keywords": [ + "Sentinel", + "Landsat", + "HLS", + "Satellite", + "Global", + "Imagery" + ], + "msft:short_description": "Harmonized Landsat Sentinel-2 (HLS) Version 2.0, Sentinel-2 Data", + "msft:storage_account": "hls2euwest", + "msft:container": "hls2", + "msft:group_id": "hls2", + "msft:region": "westeurope", + "providers": [ + { + "name": "ESA", + "roles": [ + "producer" + ], + "url": "https://www.esa.int/Applications/Observing_the_Earth/Copernicus/Sentinel-2" + }, + { + "name": "LP DAAC", + "roles": [ + "producer", + "licensor" + ], + "url": "https://doi.org/10.5067/HLS/HLSL30.002" + }, + { + "name": "Microsoft", + "roles": [ + "host" + ], + "url": "https://planetarycomputer.microsoft.com" + } + ], + "assets": { + "thumbnail": { + "type": "image/webp", + "href": "https://ai4edatasetspublicassets.blob.core.windows.net/assets/pc_thumbnails/hls2-s30.webp", + "title": 
"HLS2 Sentinel Collection Thumbnail" + }, + "geoparquet-items": { + "href": "abfs://items/hls2-s30.parquet", + "type": "application/x-parquet", + "roles": [ + "stac-items" + ], + "title": "GeoParquet STAC items", + "description": "Snapshot of the collection's STAC items exported to GeoParquet format.", + "msft:partition_info": { + "is_partitioned": true, + "partition_frequency": "W-MON" + }, + "table:storage_options": { + "account_name": "pcstacitems" + } + } + }, + "summaries": { + "platform": [ + "Sentinel-2A", + "Sentinel-2B", + "Sentinel-2C" + ], + "gsd": [ + 30 + ] + }, + "item_assets": { + "B01": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B01", + "common_name": "coastal", + "center_wavelength": 0.4439, + "full_width_half_max": 0.027 + } + ], + "roles": [ + "data" + ], + "title": "Coastal Aerosol" + }, + "B02": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B02", + "common_name": "blue", + "center_wavelength": 0.4966, + "full_width_half_max": 0.098 + } + ], + "roles": [ + "data" + ], + "title": "Blue" + }, + "B03": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B03", + "common_name": "green", + "center_wavelength": 0.56, + "full_width_half_max": 0.045 + } + ], + "roles": [ + "data" + ], + "title": "Green" + }, + "B04": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B04", + "common_name": "red", + "center_wavelength": 0.6645, + "full_width_half_max": 0.038 + } + ], + "roles": [ + "data" + ], + "title": "Red" + }, + "B05": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B05", + "center_wavelength": 0.7039, + "full_width_half_max": 0.019 + } + ], + "roles": [ + "data" + ], + "title": "Red-Edge 1" + }, + "B06": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B06", + "center_wavelength": 0.7402, + "full_width_half_max": 0.018 + } + ], + "roles": [ + "data" + ], + "title": "Red-Edge 2" + }, + "B07": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B07", + "center_wavelength": 0.7825, + "full_width_half_max": 0.028 + } + ], + "roles": [ + "data" + ], + "title": "Red-Edge 3" + }, + "B08": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B08", + "common_name": "nir", + "center_wavelength": 0.8351, + "full_width_half_max": 0.145 + } + ], + "roles": [ + "data" + ], + "title": "NIR Broad" + }, + "B8A": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B8A", + "center_wavelength": 0.8648, + "full_width_half_max": 0.033 + } + ], + "roles": [ + "data" + ], + "title": "NIR Narrow" + }, + "B09": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B09", + "center_wavelength": 0.945, + "full_width_half_max": 0.026 + } + ], + "roles": [ + "data" + ], + "title": "Water Vapor" + }, + "B10": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B10", + "common_name": "cirrus", + "center_wavelength": 1.3735, + "full_width_half_max": 0.075 + } + ], + "roles": [ + "data" + ], + "title": "Cirrus" + }, + "B11": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B11", + 
"common_name": "swir16", + "center_wavelength": 1.6137, + "full_width_half_max": 0.143 + } + ], + "roles": [ + "data" + ], + "title": "SWIR 1" + }, + "B12": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "B12", + "common_name": "swir22", + "center_wavelength": 2.22024, + "full_width_half_max": 0.242 + } + ], + "roles": [ + "data" + ], + "title": "SWIR 2" + }, + "Fmask": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "Fmask" + } + ], + "roles": [ + "data" + ], + "title": "Fmask" + }, + "SZA": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "SZA" + } + ], + "roles": [ + "data" + ], + "title": "SZA" + }, + "SAA": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "SAA" + } + ], + "roles": [ + "data" + ], + "title": "SAA" + }, + "VZA": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "VZA" + } + ], + "roles": [ + "data" + ], + "title": "VZA" + }, + "VAA": { + "type": "image/tiff; application=geotiff; profile=cloud-optimized", + "eo:bands": [ + { + "name": "VAA" + } + ], + "roles": [ + "data" + ], + "title": "VAA" + } + }, + "extent": { + "spatial": { + "bbox": [ + [ + -180, + -90, + 180, + 90 + ] + ] + }, + "temporal": { + "interval": [ + [ + "2020-01-01T00:00:00Z", + null + ] + ] + } + } +} \ No newline at end of file From 27694f8ab92dbe148a4575ce0624e5ae0b5b1d70 Mon Sep 17 00:00:00 2001 From: Gustavo Hidalgo Date: Mon, 12 May 2025 19:08:21 -0400 Subject: [PATCH 03/14] Update to latest stac-geoparquet --- .../stac-geoparquet/pc_stac_geoparquet.py | 103 +++++++----------- datasets/stac-geoparquet/requirements.txt | 2 +- 2 files changed, 42 insertions(+), 63 deletions(-) diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index 0ab780b2..ef574065 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -24,7 +24,7 @@ import pystac import requests from stac_geoparquet.arrow import parse_stac_items_to_arrow, to_parquet -from stac_geoparquet.pgstac_reader import pgstac_to_iter +from stac_geoparquet.pgstac_reader import pgstac_to_iter, get_pgstac_partitions, Partition, pgstac_to_parquet, pgstac_to_arrow from pctasks.core.models.base import PCBaseModel from pctasks.core.models.task import FailedTaskResult, WaitTaskResult @@ -147,35 +147,6 @@ def _pairwise( next(b, None) return zip(a, b) -def query_collection_partitions_updated_after(conninfo: str, updated_after: datetime.datetime | None = None) -> Generator[Partition, None, None]: - with psycopg.connect(conninfo) as conn: - with conn.cursor(row_factory=psycopg.rows.class_row(Partition)) as cur: - q = """ - SELECT - collection, - CASE WHEN lower(partition_dtrange) = '-infinity' OR upper(partition_dtrange) = 'infinity' THEN - 'items.parquet' - ELSE - format( - 'items_%%s_%%s.parquet', - to_char(lower(partition_dtrange),'YYYYMMDD'), - to_char(upper(partition_dtrange),'YYYYMMDD') - ) - END AS partition, - lower(dtrange) as start, - upper(dtrange) as end, - last_updated - FROM partitions_view - """ - args = () - if updated_after is not None: - q += " WHERE last_updated >= %s" - args = (updated_after,) - q += " ORDER BY last_updated asc" - cur.execute(q, args) - for row in cur: - yield row - def _build_output_path( base_output_path: str, part_number: int | None, @@ -200,14 
+171,6 @@ def _build_output_path( ) return output_path -@dataclasses.dataclass -class Partition: - collection: str - partition: str - start: datetime.datetime - end: datetime.datetime - last_updated: datetime.datetime - def inject_links(item: dict[str, Any]) -> dict[str, Any]: item["links"] = [ { @@ -264,6 +227,24 @@ def inject_assets(item: dict[str, Any], render_config: str | None) -> dict[str, } return item +def naip_year_to_int(item: dict[str, Any]) -> dict[str, Any]: + """Convert the year to an integer.""" + if "naip:year" in item["properties"] and isinstance(item["properties"]["naip:year"], str): + item["properties"]["naip:year"] = int(item["properties"]["naip:year"]) + return item + +def clean_item(item: dict[str, Any], render_config: str | None) -> dict[str, Any]: + """Clean items by making sure that naip:year is an int and injecting links and assets.""" + item = inject_links(inject_assets(item, render_config)) + + if "proj:epsg" in item["properties"] and not item["properties"]["proj:epsg"]: + # This cannot be null + item["properties"]["proj:epsg"] = "" + + if item["collection"] == "naip": + item = naip_year_to_int(item) + return item + @dataclasses.dataclass class CollectionConfig: """ @@ -330,28 +311,33 @@ def export_partition( conninfo: str, output_protocol: str, output_path: str, - partition: Partition | None = None, + start_datetime: datetime.datetime | None = None, + end_datetime: datetime.datetime | None = None, storage_options: dict[str, Any] | None = None, rewrite: bool = False, - skip_empty_partitions: bool = False, ) -> str | None: - fs = fsspec.filesystem(output_protocol, **storage_options) + # pass + fs = fsspec.filesystem(output_protocol, **storage_options) # type: ignore if fs.exists(output_path) and not rewrite: logger.debug("Path %s already exists.", output_path) return output_path - if partition is not None: - items = pgstac_to_iter( - conninfo, collection=self.collection_id, start_datetime=partition.start, end_datetime=partition.end - ) - else: - items = pgstac_to_iter( - conninfo, collection=self.collection_id - ) - items = map(lambda i: inject_assets(inject_links(i), self.render_config), items) - arrow = parse_stac_items_to_arrow(items) - to_parquet(arrow, output_path, filesystem=fs) + + def _row_func(item: dict[str, Any]) -> dict[str, Any]: + return clean_item(item, self.render_config) + arrow = pgstac_to_arrow( + conninfo=conninfo, + collection=self.collection_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + row_func=_row_func, + ) + to_parquet( + arrow, + output_path, + filesystem=fs) return output_path + def export_partition_for_endpoints( self, endpoints: tuple[datetime.datetime, datetime.datetime], @@ -369,21 +355,14 @@ def export_partition_for_endpoints( """ start, end = endpoints partition_path = _build_output_path(output_path, part_number, total, start, end) - p = Partition( - collection=self.collection_id, - partition=partition_path, - start=start, - end=end, - last_updated=datetime.datetime.now(), - ) return self.export_partition( conninfo, output_protocol, partition_path, - partition=p, + start_datetime=start, + end_datetime=end, storage_options=storage_options, rewrite=rewrite, - skip_empty_partitions=skip_empty_partitions, ) def export_exists( @@ -694,7 +673,7 @@ def run( failure = [] one_year_ago = datetime.datetime.now() - datetime.timedelta(days=365) - collection_partitions = list(query_collection_partitions_updated_after(conninfo=connection_info, updated_after=one_year_ago)) + collection_partitions = 
list(get_pgstac_partitions(conninfo=connection_info, updated_after=one_year_ago)) recent_collection_updates: dict[str, list[Partition]] = {} for partition in collection_partitions: recent_collection_updates.setdefault(partition.collection, []).append(partition) diff --git a/datasets/stac-geoparquet/requirements.txt b/datasets/stac-geoparquet/requirements.txt index 9ee4f0b3..cb12ccc0 100644 --- a/datasets/stac-geoparquet/requirements.txt +++ b/datasets/stac-geoparquet/requirements.txt @@ -1,5 +1,5 @@ # stac-geoparquet[pgstac,pc]==0.6.0 -git+https://github.com/stac-utils/stac-geoparquet.git@30aa59fde4d2df30b1011340b41af2419887916c#egg=stac-geoparquet[pgstac,pc] +git+https://github.com/stac-utils/stac-geoparquet.git@fda217f821795397ae28b6426fdb62a616a92a1d#egg=stac-geoparquet[pgstac,pc] psycopg[binary,pool]==3.2.6 azure-data-tables==12.5.0 pypgstac==0.8.6 \ No newline at end of file From 31da4a65df2267e5272958c79c0ecdbef1f15535 Mon Sep 17 00:00:00 2001 From: Gustavo Hidalgo Date: Tue, 13 May 2025 12:03:13 -0400 Subject: [PATCH 04/14] Consume latest version now without cast --- .../stac-geoparquet/pc_stac_geoparquet.py | 45 +++++++++++-------- datasets/stac-geoparquet/requirements.txt | 2 +- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index ef574065..b3c7e4b1 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -11,7 +11,7 @@ import os import time import urllib -from typing import Any, Generator, Set, Union +from typing import Any, Set, Union import azure.core.credentials import azure.data.tables @@ -20,11 +20,10 @@ import dateutil import fsspec import pandas as pd -import psycopg import pystac import requests -from stac_geoparquet.arrow import parse_stac_items_to_arrow, to_parquet -from stac_geoparquet.pgstac_reader import pgstac_to_iter, get_pgstac_partitions, Partition, pgstac_to_parquet, pgstac_to_arrow +from stac_geoparquet.arrow import to_parquet +from stac_geoparquet.pgstac_reader import get_pgstac_partitions, Partition, pgstac_to_arrow, pgstac_to_iter from pctasks.core.models.base import PCBaseModel from pctasks.core.models.task import FailedTaskResult, WaitTaskResult @@ -324,20 +323,28 @@ def export_partition( def _row_func(item: dict[str, Any]) -> dict[str, Any]: return clean_item(item, self.render_config) - arrow = pgstac_to_arrow( - conninfo=conninfo, - collection=self.collection_id, - start_datetime=start_datetime, - end_datetime=end_datetime, - row_func=_row_func, - ) - to_parquet( - arrow, - output_path, - filesystem=fs) + if any( + pgstac_to_iter( + conninfo=conninfo, + collection=self.collection_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + row_func=_row_func, + ) + ): + arrow = pgstac_to_arrow( + conninfo=conninfo, + collection=self.collection_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + row_func=_row_func, + ) + to_parquet( + arrow, + output_path, + filesystem=fs) return output_path - def export_partition_for_endpoints( self, endpoints: tuple[datetime.datetime, datetime.datetime], @@ -364,7 +371,7 @@ def export_partition_for_endpoints( storage_options=storage_options, rewrite=rewrite, ) - + def export_exists( self, output_protocol: str, @@ -385,10 +392,10 @@ def export_collection( rewrite: bool = False, skip_empty_partitions: bool = False, ) -> list[str | None]: - + if not self.partition_frequency: logger.info("Exporting single-partition collection %s", 
self.collection_id) - + results = [ self.export_partition( conninfo, diff --git a/datasets/stac-geoparquet/requirements.txt b/datasets/stac-geoparquet/requirements.txt index cb12ccc0..d1541946 100644 --- a/datasets/stac-geoparquet/requirements.txt +++ b/datasets/stac-geoparquet/requirements.txt @@ -1,5 +1,5 @@ # stac-geoparquet[pgstac,pc]==0.6.0 -git+https://github.com/stac-utils/stac-geoparquet.git@fda217f821795397ae28b6426fdb62a616a92a1d#egg=stac-geoparquet[pgstac,pc] +git+https://github.com/stac-utils/stac-geoparquet.git@89c71d17a4d9e7cde677a40d3eefa775b6b407eb#egg=stac-geoparquet[pgstac,pc] psycopg[binary,pool]==3.2.6 azure-data-tables==12.5.0 pypgstac==0.8.6 \ No newline at end of file From e06d72762acb6268f47f4a63d69d2fcf6aecbe0c Mon Sep 17 00:00:00 2001 From: Gustavo Hidalgo Date: Tue, 13 May 2025 20:01:30 -0400 Subject: [PATCH 05/14] Use pgstac partitioning for partitioned collections --- .../stac-geoparquet/pc_stac_geoparquet.py | 76 +++++++++++++++---- 1 file changed, 63 insertions(+), 13 deletions(-) diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index b3c7e4b1..72790237 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -383,12 +383,30 @@ def export_exists( output_path = f"{output_protocol}://{output_path}" return fs.exists(output_path) + def _partition_needs_to_be_rewritten( + self, + output_protocol: str, + output_path: str, + storage_options: dict[str, Any], + partition: Partition, + ) -> bool: + fs = fsspec.filesystem(output_protocol, **storage_options) + if output_protocol: + output_path = f"{output_protocol}://{output_path}" + if not fs.exists(output_path): + return True + file_info = fs.info(output_path) + file_modified_time = datetime.datetime.fromtimestamp(file_info["last_modified"]) + partition_modified_time = partition.last_updated + return file_modified_time < partition_modified_time + def export_collection( self, conninfo: str, output_protocol: str, output_path: str, storage_options: dict[str, Any], + pgstac_partitions: dict[str, list[Partition]], rewrite: bool = False, skip_empty_partitions: bool = False, ) -> list[str | None]: @@ -404,11 +422,11 @@ def export_collection( storage_options=storage_options) ] - else: + elif self.partition_frequency and len(pgstac_partitions[self.collection_id]) == 1: endpoints = self.generate_endpoints() total = len(endpoints) logger.info( - "Exporting %d partitions for collection %s", total, self.collection_id + "Exporting %d partitions for collection %s with frequency %s", total, self.collection_id, self.partition_frequency ) results = [] @@ -426,6 +444,45 @@ def export_collection( total=total, ) ) + else: + partitions = pgstac_partitions[self.collection_id] + total = len(partitions) + # some collections are not partitioned in pgstac, some are. + # If a collection is not partition in pgstac, then we will apply the partitioning scheme of the STAC collection + # In pgstac, you always have to opt-into a partitioning scheme, + # either None/Monthly/Yearly in the collections table. + # Ideal size is 10M to 20M rows per partition, but that it dataset dependent. 
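For context, a minimal sketch of how these pgstac partition records can be inspected before an export run; the `Partition` fields (`collection`, `start`, `end`, `last_updated`) and the `get_pgstac_partitions` signature mirror those used elsewhere in this patch, while the connection string is a placeholder:

```python
from stac_geoparquet.pgstac_reader import Partition, get_pgstac_partitions

conninfo = "postgresql://user:password@host:5432/pgstac"  # placeholder DSN

# One Partition record per pgstac partition: a monthly-partitioned
# collection yields one record per month, an unpartitioned collection a
# single record covering its whole datetime range.
by_collection: dict[str, list[Partition]] = {}
for p in get_pgstac_partitions(conninfo=conninfo):
    by_collection.setdefault(p.collection, []).append(p)

for collection, parts in sorted(by_collection.items()):
    newest = max(part.last_updated for part in parts)
    print(f"{collection}: {len(parts)} partition(s), last updated {newest}")
```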
+ logger.info( + "Exporting %d partitions for collection %s using pgstac partitions", total, self.collection_id + ) + + results = [] + for i, partition in tqdm.auto.tqdm(enumerate(partitions), total=total): + partition_path = _build_output_path(output_path, i, total, partition.start, partition.end) + if self._partition_needs_to_be_rewritten( + output_protocol=output_protocol, + output_path=partition_path, + storage_options=storage_options, + partition=partition, + ): + results.append( + self.export_partition( + conninfo=conninfo, + output_protocol=output_protocol, + output_path=partition_path, + start_datetime=partition.start, + end_datetime=partition.end, + storage_options=storage_options, + rewrite=rewrite + ) + ) + else: + logger.info( + "Partition %s already exists and was last updated at %s, skipping", + partition_path, + partition.last_updated, + ) + results.append(partition_path) return results @@ -679,23 +736,14 @@ def run( success = [] failure = [] - one_year_ago = datetime.datetime.now() - datetime.timedelta(days=365) - collection_partitions = list(get_pgstac_partitions(conninfo=connection_info, updated_after=one_year_ago)) + collection_partitions = list(get_pgstac_partitions(conninfo=connection_info)) recent_collection_updates: dict[str, list[Partition]] = {} for partition in collection_partitions: recent_collection_updates.setdefault(partition.collection, []).append(partition) - logger.info(f"Found {len(collection_partitions)} partitions updated after {one_year_ago}") + logger.info(f"Found {len(collection_partitions)} pgstac partitions") for i, config in enumerate(configs.values(), 1): output_path=f"items/{config.collection_id}.parquet" - if config.export_exists( - output_protocol=output_protocol, - output_path=output_path, - storage_options=storage_options, - ) and (config.collection_id not in recent_collection_updates): - logger.info(f"Existing collection export for {config.collection_id} has no updates since {one_year_ago}") - continue - logger.info(f"Processing {config.collection_id} [{i}/{N}]") try: t0 = time.monotonic() config.export_collection( @@ -703,7 +751,9 @@ def run( output_protocol, output_path, storage_options, + pgstac_partitions=recent_collection_updates, skip_empty_partitions=True, + rewrite=True ) t1 = time.monotonic() logger.info(f"Completed {config.collection_id} [{i}/{N}] in {t1-t0:.2f}s") From 5117cc5316994482866a8452f17c50cfc123a9a5 Mon Sep 17 00:00:00 2001 From: Gustavo Hidalgo Date: Mon, 4 Aug 2025 11:48:03 -0400 Subject: [PATCH 06/14] save --- datasets/stac-geoparquet/test.ipynb | 632 +++++++++++++++++++++++++++- 1 file changed, 629 insertions(+), 3 deletions(-) diff --git a/datasets/stac-geoparquet/test.ipynb b/datasets/stac-geoparquet/test.ipynb index 4c6e4667..e409ce6d 100644 --- a/datasets/stac-geoparquet/test.ipynb +++ b/datasets/stac-geoparquet/test.ipynb @@ -11,7 +11,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 1, "id": "1d4b6b68", "metadata": {}, "outputs": [], @@ -19,7 +19,7 @@ "import geopandas\n", "from azure.identity import DefaultAzureCredential\n", "\n", - "account = \"guhidalgogclite\"\n", + "account = \"pcstacitems\"\n", "container = \"items\"\n", "storage_options = {\n", " \"account_name\": account,\n", @@ -29,7 +29,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "id": "3404f46e", "metadata": {}, "outputs": [ @@ -408,6 +408,632 @@ ")\n", "df" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20869535", + "metadata": {}, + "outputs": [ + { + "data": { 
+ "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
assetscollectiongeometryidlinksstac_extensionsstac_versiontypedatetimeend_datetimegsdproj:bboxproj:epsgproj:shapeproj:transformproj:wkt2start_datetime
0{'biomass': {'file:size': 20568072, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2017[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2017-01-01 00:00:00+00:002017-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2016-07-31 00:00:00+00:00
1{'biomass': {'file:size': 20581566, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2018[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2018-01-01 00:00:00+00:002018-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2017-07-31 00:00:00+00:00
2{'biomass': {'file:size': 20524826, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2012[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2012-01-01 00:00:00+00:002012-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2011-07-31 00:00:00+00:00
3{'biomass': {'file:size': 20527688, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2013[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2013-01-01 00:00:00+00:002013-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2012-07-31 00:00:00+00:00
4{'biomass': {'file:size': 20485203, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2007[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2007-01-01 00:00:00+00:002007-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2006-07-31 00:00:00+00:00
5{'biomass': {'file:size': 20489507, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2008[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2008-01-01 00:00:00+00:002008-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2007-07-31 00:00:00+00:00
6{'biomass': {'file:size': 20495912, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2004[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2004-01-01 00:00:00+00:002004-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2003-07-31 00:00:00+00:00
7{'biomass': {'file:size': 20502386, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2003[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2003-01-01 00:00:00+00:002003-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2002-07-31 00:00:00+00:00
8{'biomass': {'file:size': 20496575, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2009[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2009-01-01 00:00:00+00:002009-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2008-07-31 00:00:00+00:00
9{'biomass': {'file:size': 20482098, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2005[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2005-01-01 00:00:00+00:002005-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2004-07-31 00:00:00+00:00
10{'biomass': {'file:size': 20533390, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2014[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2014-01-01 00:00:00+00:002014-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2013-07-31 00:00:00+00:00
11{'biomass': {'file:size': 20609463, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2019[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2019-01-01 00:00:00+00:002019-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2018-07-31 00:00:00+00:00
12{'biomass': {'file:size': 20484635, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2006[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2006-01-01 00:00:00+00:002006-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2005-07-31 00:00:00+00:00
13{'biomass': {'file:size': 20508844, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2010[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2010-01-01 00:00:00+00:002010-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2009-07-31 00:00:00+00:00
14{'biomass': {'file:size': 20521439, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2011[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2011-01-01 00:00:00+00:002011-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2010-07-31 00:00:00+00:00
15{'biomass': {'file:size': 20542835, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2015[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2015-01-01 00:00:00+00:002015-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2014-07-31 00:00:00+00:00
16{'biomass': {'file:size': 20556718, 'href': 'h...chloris-biomassPOLYGON ((-179.95 90, -179.95 -60, 179.95 -60,...chloris_biomass_50km_2016[{'href': 'https://planetarycomputer.microsoft...[https://stac-extensions.github.io/file/v2.0.0...1.0.0Feature2016-01-01 00:00:00+00:002016-07-31 00:00:00+00:004633[-20015109.354, -6671703.11790004, 20015109.35...[3600, 8640][4633.12716525001, 0.0, -20015109.354, 0.0, -4...PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU...2015-07-31 00:00:00+00:00
\n", + "
" + ], + "text/plain": [ + " assets collection \\\n", + "0 {'biomass': {'file:size': 20568072, 'href': 'h... chloris-biomass \n", + "1 {'biomass': {'file:size': 20581566, 'href': 'h... chloris-biomass \n", + "2 {'biomass': {'file:size': 20524826, 'href': 'h... chloris-biomass \n", + "3 {'biomass': {'file:size': 20527688, 'href': 'h... chloris-biomass \n", + "4 {'biomass': {'file:size': 20485203, 'href': 'h... chloris-biomass \n", + "5 {'biomass': {'file:size': 20489507, 'href': 'h... chloris-biomass \n", + "6 {'biomass': {'file:size': 20495912, 'href': 'h... chloris-biomass \n", + "7 {'biomass': {'file:size': 20502386, 'href': 'h... chloris-biomass \n", + "8 {'biomass': {'file:size': 20496575, 'href': 'h... chloris-biomass \n", + "9 {'biomass': {'file:size': 20482098, 'href': 'h... chloris-biomass \n", + "10 {'biomass': {'file:size': 20533390, 'href': 'h... chloris-biomass \n", + "11 {'biomass': {'file:size': 20609463, 'href': 'h... chloris-biomass \n", + "12 {'biomass': {'file:size': 20484635, 'href': 'h... chloris-biomass \n", + "13 {'biomass': {'file:size': 20508844, 'href': 'h... chloris-biomass \n", + "14 {'biomass': {'file:size': 20521439, 'href': 'h... chloris-biomass \n", + "15 {'biomass': {'file:size': 20542835, 'href': 'h... chloris-biomass \n", + "16 {'biomass': {'file:size': 20556718, 'href': 'h... chloris-biomass \n", + "\n", + " geometry \\\n", + "0 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "1 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "2 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "3 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "4 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "5 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "6 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "7 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "8 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "9 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "10 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "11 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "12 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "13 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "14 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "15 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "16 POLYGON ((-179.95 90, -179.95 -60, 179.95 -60,... \n", + "\n", + " id \\\n", + "0 chloris_biomass_50km_2017 \n", + "1 chloris_biomass_50km_2018 \n", + "2 chloris_biomass_50km_2012 \n", + "3 chloris_biomass_50km_2013 \n", + "4 chloris_biomass_50km_2007 \n", + "5 chloris_biomass_50km_2008 \n", + "6 chloris_biomass_50km_2004 \n", + "7 chloris_biomass_50km_2003 \n", + "8 chloris_biomass_50km_2009 \n", + "9 chloris_biomass_50km_2005 \n", + "10 chloris_biomass_50km_2014 \n", + "11 chloris_biomass_50km_2019 \n", + "12 chloris_biomass_50km_2006 \n", + "13 chloris_biomass_50km_2010 \n", + "14 chloris_biomass_50km_2011 \n", + "15 chloris_biomass_50km_2015 \n", + "16 chloris_biomass_50km_2016 \n", + "\n", + " links \\\n", + "0 [{'href': 'https://planetarycomputer.microsoft... \n", + "1 [{'href': 'https://planetarycomputer.microsoft... \n", + "2 [{'href': 'https://planetarycomputer.microsoft... \n", + "3 [{'href': 'https://planetarycomputer.microsoft... \n", + "4 [{'href': 'https://planetarycomputer.microsoft... \n", + "5 [{'href': 'https://planetarycomputer.microsoft... \n", + "6 [{'href': 'https://planetarycomputer.microsoft... \n", + "7 [{'href': 'https://planetarycomputer.microsoft... 
\n", + "8 [{'href': 'https://planetarycomputer.microsoft... \n", + "9 [{'href': 'https://planetarycomputer.microsoft... \n", + "10 [{'href': 'https://planetarycomputer.microsoft... \n", + "11 [{'href': 'https://planetarycomputer.microsoft... \n", + "12 [{'href': 'https://planetarycomputer.microsoft... \n", + "13 [{'href': 'https://planetarycomputer.microsoft... \n", + "14 [{'href': 'https://planetarycomputer.microsoft... \n", + "15 [{'href': 'https://planetarycomputer.microsoft... \n", + "16 [{'href': 'https://planetarycomputer.microsoft... \n", + "\n", + " stac_extensions stac_version type \\\n", + "0 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "1 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "2 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "3 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "4 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "5 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "6 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "7 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "8 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "9 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "10 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "11 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "12 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "13 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "14 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "15 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "16 [https://stac-extensions.github.io/file/v2.0.0... 1.0.0 Feature \n", + "\n", + " datetime end_datetime gsd \\\n", + "0 2017-01-01 00:00:00+00:00 2017-07-31 00:00:00+00:00 4633 \n", + "1 2018-01-01 00:00:00+00:00 2018-07-31 00:00:00+00:00 4633 \n", + "2 2012-01-01 00:00:00+00:00 2012-07-31 00:00:00+00:00 4633 \n", + "3 2013-01-01 00:00:00+00:00 2013-07-31 00:00:00+00:00 4633 \n", + "4 2007-01-01 00:00:00+00:00 2007-07-31 00:00:00+00:00 4633 \n", + "5 2008-01-01 00:00:00+00:00 2008-07-31 00:00:00+00:00 4633 \n", + "6 2004-01-01 00:00:00+00:00 2004-07-31 00:00:00+00:00 4633 \n", + "7 2003-01-01 00:00:00+00:00 2003-07-31 00:00:00+00:00 4633 \n", + "8 2009-01-01 00:00:00+00:00 2009-07-31 00:00:00+00:00 4633 \n", + "9 2005-01-01 00:00:00+00:00 2005-07-31 00:00:00+00:00 4633 \n", + "10 2014-01-01 00:00:00+00:00 2014-07-31 00:00:00+00:00 4633 \n", + "11 2019-01-01 00:00:00+00:00 2019-07-31 00:00:00+00:00 4633 \n", + "12 2006-01-01 00:00:00+00:00 2006-07-31 00:00:00+00:00 4633 \n", + "13 2010-01-01 00:00:00+00:00 2010-07-31 00:00:00+00:00 4633 \n", + "14 2011-01-01 00:00:00+00:00 2011-07-31 00:00:00+00:00 4633 \n", + "15 2015-01-01 00:00:00+00:00 2015-07-31 00:00:00+00:00 4633 \n", + "16 2016-01-01 00:00:00+00:00 2016-07-31 00:00:00+00:00 4633 \n", + "\n", + " proj:bbox proj:epsg proj:shape \\\n", + "0 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "1 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "2 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "3 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "4 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "5 [-20015109.354, -6671703.11790004, 20015109.35... 
[3600, 8640] \n", + "6 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "7 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "8 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "9 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "10 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "11 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "12 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "13 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "14 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "15 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "16 [-20015109.354, -6671703.11790004, 20015109.35... [3600, 8640] \n", + "\n", + " proj:transform \\\n", + "0 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "1 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "2 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "3 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "4 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "5 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "6 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "7 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "8 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "9 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "10 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "11 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "12 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "13 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "14 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "15 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "16 [4633.12716525001, 0.0, -20015109.354, 0.0, -4... \n", + "\n", + " proj:wkt2 \\\n", + "0 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "1 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "2 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "3 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "4 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "5 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "6 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "7 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "8 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "9 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "10 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "11 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "12 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "13 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "14 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "15 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... \n", + "16 PROJCS[\"unnamed\",GEOGCS[\"unnamed ellipse\",DATU... 
\n", + "\n", + " start_datetime \n", + "0 2016-07-31 00:00:00+00:00 \n", + "1 2017-07-31 00:00:00+00:00 \n", + "2 2011-07-31 00:00:00+00:00 \n", + "3 2012-07-31 00:00:00+00:00 \n", + "4 2006-07-31 00:00:00+00:00 \n", + "5 2007-07-31 00:00:00+00:00 \n", + "6 2003-07-31 00:00:00+00:00 \n", + "7 2002-07-31 00:00:00+00:00 \n", + "8 2008-07-31 00:00:00+00:00 \n", + "9 2004-07-31 00:00:00+00:00 \n", + "10 2013-07-31 00:00:00+00:00 \n", + "11 2018-07-31 00:00:00+00:00 \n", + "12 2005-07-31 00:00:00+00:00 \n", + "13 2009-07-31 00:00:00+00:00 \n", + "14 2010-07-31 00:00:00+00:00 \n", + "15 2014-07-31 00:00:00+00:00 \n", + "16 2015-07-31 00:00:00+00:00 " + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Unpartitioned mobi\n", + "collection = \"io-biodiversity\"\n", + "df = geopandas.read_parquet(\n", + " f\"abfs://{container}/{collection}.parquet\", storage_options=storage_options\n", + ")\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fbc097b6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ceb42bf3", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { From 27a3ac4275949060a678b1f3d2bde31160d9c19b Mon Sep 17 00:00:00 2001 From: Liam Morrison Date: Tue, 7 Oct 2025 09:42:05 -0700 Subject: [PATCH 07/14] update stac-geoparquet partitioned export and usage of stac-geoparquet --- .../stac-geoparquet/pc_stac_geoparquet.py | 39 +++++++++++++------ datasets/stac-geoparquet/requirements.txt | 8 +++- datasets/stac-geoparquet/workflow.yaml | 4 +- datasets/stac-geoparquet/workflow_test.yaml | 4 +- 4 files changed, 37 insertions(+), 18 deletions(-) diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index 72790237..d3ed9c17 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -30,6 +30,7 @@ from pctasks.task.context import TaskContext from pctasks.task.task import Task import tqdm.auto +import tempfile handler = logging.StreamHandler() handler.setFormatter(logging.Formatter("[%(levelname)s]:%(asctime)s: %(message)s")) @@ -38,6 +39,7 @@ logger.addHandler(handler) logger.setLevel(logging.DEBUG) +CHUNK_SIZE = 8192 PARTITION_FREQUENCIES = { "3dep-lidar-classification": "YS", @@ -332,17 +334,22 @@ def _row_func(item: dict[str, Any]) -> dict[str, Any]: row_func=_row_func, ) ): - arrow = pgstac_to_arrow( - conninfo=conninfo, - collection=self.collection_id, - start_datetime=start_datetime, - end_datetime=end_datetime, - row_func=_row_func, - ) - to_parquet( - arrow, - output_path, - filesystem=fs) + logger.info(f"Running parquet export with chunk size of {CHUNK_SIZE}") + with tempfile.TemporaryDirectory() as tmpdir: + arrow = pgstac_to_arrow( + conninfo=conninfo, + collection=self.collection_id, + start_datetime=start_datetime, + end_datetime=end_datetime, + row_func=_row_func, + schema="ChunksToDisk", + tmpdir=tmpdir, + chunk_size=CHUNK_SIZE + ) + to_parquet( + arrow, + output_path, + filesystem=fs) return output_path def export_partition_for_endpoints( @@ -396,7 +403,15 @@ def _partition_needs_to_be_rewritten( if not fs.exists(output_path): return True file_info = fs.info(output_path) - file_modified_time = datetime.datetime.fromtimestamp(file_info["last_modified"]) + + 
# Handle case where last_modified is already a datetime object or a timestamp + last_modified = file_info["last_modified"] + if isinstance(last_modified, datetime.datetime): + file_modified_time = last_modified + else: + # Assume it's a timestamp (int/float) + file_modified_time = datetime.datetime.fromtimestamp(last_modified) + partition_modified_time = partition.last_updated return file_modified_time < partition_modified_time diff --git a/datasets/stac-geoparquet/requirements.txt b/datasets/stac-geoparquet/requirements.txt index d1541946..a6be2c32 100644 --- a/datasets/stac-geoparquet/requirements.txt +++ b/datasets/stac-geoparquet/requirements.txt @@ -1,5 +1,9 @@ # stac-geoparquet[pgstac,pc]==0.6.0 -git+https://github.com/stac-utils/stac-geoparquet.git@89c71d17a4d9e7cde677a40d3eefa775b6b407eb#egg=stac-geoparquet[pgstac,pc] +git+https://github.com/liamlego/stac-geoparquet.git@47aff71bc7eb674b1f2ffa38a1f1496c59cf7319#egg=stac-geoparquet[pgstac,pc] psycopg[binary,pool]==3.2.6 azure-data-tables==12.5.0 -pypgstac==0.8.6 \ No newline at end of file +pypgstac==0.8.6 +fsspec==2025.9.0 +tqdm==4.67.1 +adlfs==2025.8.0 +azure-datalake-store==0.0.53 diff --git a/datasets/stac-geoparquet/workflow.yaml b/datasets/stac-geoparquet/workflow.yaml index 7601cca0..7b4d4f26 100644 --- a/datasets/stac-geoparquet/workflow.yaml +++ b/datasets/stac-geoparquet/workflow.yaml @@ -6,7 +6,7 @@ jobs: geoparquet: tasks: - id: update - image: pccomponents.azurecr.io/pctasks-stac-geoparquet:guhidalgo + image: pccomponents.azurecr.io/pctasks-stac-geoparquet:2025.10.1.1 code: src: ${{ local.path(pc_stac_geoparquet.py) }} requirements: ${{ local.path(./requirements.txt) }} @@ -19,7 +19,7 @@ jobs: table_account_url: "https://pcapi.table.core.windows.net" table_name: "greencollectionconfig" storage_options_account_name: "pcstacitems" - # collections: "hls2-l30" # Set if you want to generate only one geoparquet file + collections: "sentinel-2-l2a" # Set if you want to generate only one geoparquet file extra_skip: - "chesapeake-lc-13" - "chesapeake-lc-7" diff --git a/datasets/stac-geoparquet/workflow_test.yaml b/datasets/stac-geoparquet/workflow_test.yaml index 14d13c19..07e0b724 100644 --- a/datasets/stac-geoparquet/workflow_test.yaml +++ b/datasets/stac-geoparquet/workflow_test.yaml @@ -6,7 +6,7 @@ jobs: stac: tasks: - id: create - image: pccomponentstest.azurecr.io/pctasks-stac-geoparquet:2023.7.10.1 + image: pccomponentstest.azurecr.io/pctasks-stac-geoparquet:2025.9.25.0 code: src: ${{ local.path(pc_stac_geoparquet.py) }} task: pc_stac_geoparquet:StacGeoparquetTask @@ -14,7 +14,7 @@ jobs: table_account_url: "https://pctapisstagingsa.table.core.windows.net" table_name: "collectionconfig" storage_options_account_name: "pcstacitems" - collections: "io-lulc-annual-v02" + collections: "sentinel-2-l2a" environment: APPLICATIONINSIGHTS_CONNECTION_STRING: ${{ secrets.task-application-insights-connection-string }} STAC_GEOPARQUET_CONNECTION_INFO: ${{secrets.pgstac-connection-string}} From 3885deb1a3d3a61a084cbcc3c38c7bde9959382a Mon Sep 17 00:00:00 2001 From: Liam Morrison Date: Tue, 28 Oct 2025 14:54:45 -0700 Subject: [PATCH 08/14] run off of bitners commit --- datasets/stac-geoparquet/requirements.txt | 2 +- datasets/stac-geoparquet/workflow.yaml | 2 +- datasets/stac-geoparquet/workflow_test.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datasets/stac-geoparquet/requirements.txt b/datasets/stac-geoparquet/requirements.txt index a6be2c32..98834a5c 100644 --- a/datasets/stac-geoparquet/requirements.txt +++ 
b/datasets/stac-geoparquet/requirements.txt @@ -1,5 +1,5 @@ # stac-geoparquet[pgstac,pc]==0.6.0 -git+https://github.com/liamlego/stac-geoparquet.git@47aff71bc7eb674b1f2ffa38a1f1496c59cf7319#egg=stac-geoparquet[pgstac,pc] +git+https://github.com/stac-utils/stac-geoparquet.git@3efa35bdd9d467334f260395d5edc9347fe45efb#egg=stac-geoparquet[pgstac,pc] psycopg[binary,pool]==3.2.6 azure-data-tables==12.5.0 pypgstac==0.8.6 diff --git a/datasets/stac-geoparquet/workflow.yaml b/datasets/stac-geoparquet/workflow.yaml index 7b4d4f26..8597e3c3 100644 --- a/datasets/stac-geoparquet/workflow.yaml +++ b/datasets/stac-geoparquet/workflow.yaml @@ -6,7 +6,7 @@ jobs: geoparquet: tasks: - id: update - image: pccomponents.azurecr.io/pctasks-stac-geoparquet:2025.10.1.1 + image: pccomponents.azurecr.io/pctasks-stac-geoparquet:2025.10.28.0 code: src: ${{ local.path(pc_stac_geoparquet.py) }} requirements: ${{ local.path(./requirements.txt) }} diff --git a/datasets/stac-geoparquet/workflow_test.yaml b/datasets/stac-geoparquet/workflow_test.yaml index 07e0b724..2223d546 100644 --- a/datasets/stac-geoparquet/workflow_test.yaml +++ b/datasets/stac-geoparquet/workflow_test.yaml @@ -6,7 +6,7 @@ jobs: stac: tasks: - id: create - image: pccomponentstest.azurecr.io/pctasks-stac-geoparquet:2025.9.25.0 + image: pccomponentstest.azurecr.io/pctasks-stac-geoparquet:2025.10.28.0 code: src: ${{ local.path(pc_stac_geoparquet.py) }} task: pc_stac_geoparquet:StacGeoparquetTask From a47dbb85adeaabaa83ee4e3d514f06f518b46bbf Mon Sep 17 00:00:00 2001 From: Liam Morrison Date: Tue, 28 Oct 2025 15:15:16 -0700 Subject: [PATCH 09/14] update to not use pgstac_to_arrow --- .../stac-geoparquet/pc_stac_geoparquet.py | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index d3ed9c17..08060597 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -23,7 +23,11 @@ import pystac import requests from stac_geoparquet.arrow import to_parquet -from stac_geoparquet.pgstac_reader import get_pgstac_partitions, Partition, pgstac_to_arrow, pgstac_to_iter +from stac_geoparquet.pgstac_reader import ( + get_pgstac_partitions, + Partition, + pgstac_to_iter +) from pctasks.core.models.base import PCBaseModel from pctasks.core.models.task import FailedTaskResult, WaitTaskResult @@ -31,6 +35,10 @@ from pctasks.task.task import Task import tqdm.auto import tempfile +from stac_geoparquet.arrow import ( + parse_stac_items_to_arrow, +) + handler = logging.StreamHandler() handler.setFormatter(logging.Formatter("[%(levelname)s]:%(asctime)s: %(message)s")) @@ -336,16 +344,17 @@ def _row_func(item: dict[str, Any]) -> dict[str, Any]: ): logger.info(f"Running parquet export with chunk size of {CHUNK_SIZE}") with tempfile.TemporaryDirectory() as tmpdir: - arrow = pgstac_to_arrow( - conninfo=conninfo, + items = pgstac_to_iter( + conninfo, collection=self.collection_id, start_datetime=start_datetime, end_datetime=end_datetime, + cursor_itersize=CHUNK_SIZE, row_func=_row_func, - schema="ChunksToDisk", - tmpdir=tmpdir, - chunk_size=CHUNK_SIZE ) + + arrow = parse_stac_items_to_arrow(items, chunk_size=CHUNK_SIZE, schema="ChunksToDisk") + to_parquet( arrow, output_path, From b17ed50d6e225322d8f5291c68ccea0b3a7abdc7 Mon Sep 17 00:00:00 2001 From: Liam Morrison Date: Tue, 28 Oct 2025 15:44:19 -0700 Subject: [PATCH 10/14] ensure passing of tmpdir --- datasets/stac-geoparquet/pc_stac_geoparquet.py | 
7 ++++++- datasets/stac-geoparquet/workflow.yaml | 2 +- datasets/stac-geoparquet/workflow_test.yaml | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index 08060597..2bb1df33 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -353,7 +353,12 @@ def _row_func(item: dict[str, Any]) -> dict[str, Any]: row_func=_row_func, ) - arrow = parse_stac_items_to_arrow(items, chunk_size=CHUNK_SIZE, schema="ChunksToDisk") + arrow = parse_stac_items_to_arrow( + items, + chunk_size=CHUNK_SIZE, + schema="ChunksToDisk", + tmpdir=tmpdir, + ) to_parquet( arrow, diff --git a/datasets/stac-geoparquet/workflow.yaml b/datasets/stac-geoparquet/workflow.yaml index 8597e3c3..3b6c9281 100644 --- a/datasets/stac-geoparquet/workflow.yaml +++ b/datasets/stac-geoparquet/workflow.yaml @@ -6,7 +6,7 @@ jobs: geoparquet: tasks: - id: update - image: pccomponents.azurecr.io/pctasks-stac-geoparquet:2025.10.28.0 + image: pccomponents.azurecr.io/pctasks-stac-geoparquet:2025.10.28.2 code: src: ${{ local.path(pc_stac_geoparquet.py) }} requirements: ${{ local.path(./requirements.txt) }} diff --git a/datasets/stac-geoparquet/workflow_test.yaml b/datasets/stac-geoparquet/workflow_test.yaml index 2223d546..4a4d394b 100644 --- a/datasets/stac-geoparquet/workflow_test.yaml +++ b/datasets/stac-geoparquet/workflow_test.yaml @@ -6,7 +6,7 @@ jobs: stac: tasks: - id: create - image: pccomponentstest.azurecr.io/pctasks-stac-geoparquet:2025.10.28.0 + image: pccomponentstest.azurecr.io/pctasks-stac-geoparquet:2025.10.28.2 code: src: ${{ local.path(pc_stac_geoparquet.py) }} task: pc_stac_geoparquet:StacGeoparquetTask From 5f837e6435e752fa97e78f2d52ae47d2370d2cf2 Mon Sep 17 00:00:00 2001 From: Liam Morrison Date: Thu, 30 Oct 2025 14:04:07 -0700 Subject: [PATCH 11/14] some changes based on latest main from stac-geoparquet --- datasets/stac-geoparquet/pc_stac_geoparquet.py | 14 +++++--------- datasets/stac-geoparquet/requirements.txt | 4 +++- datasets/stac-geoparquet/workflow.yaml | 2 +- datasets/stac-geoparquet/workflow_test.yaml | 2 +- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index 2bb1df33..aa6416a4 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -26,7 +26,8 @@ from stac_geoparquet.pgstac_reader import ( get_pgstac_partitions, Partition, - pgstac_to_iter + pgstac_to_arrow, + pgstac_to_iter, ) from pctasks.core.models.base import PCBaseModel @@ -344,20 +345,15 @@ def _row_func(item: dict[str, Any]) -> dict[str, Any]: ): logger.info(f"Running parquet export with chunk size of {CHUNK_SIZE}") with tempfile.TemporaryDirectory() as tmpdir: - items = pgstac_to_iter( - conninfo, + arrow = pgstac_to_arrow( + conninfo=conninfo, collection=self.collection_id, start_datetime=start_datetime, end_datetime=end_datetime, - cursor_itersize=CHUNK_SIZE, row_func=_row_func, - ) - - arrow = parse_stac_items_to_arrow( - items, - chunk_size=CHUNK_SIZE, schema="ChunksToDisk", tmpdir=tmpdir, + chunk_size=CHUNK_SIZE ) to_parquet( diff --git a/datasets/stac-geoparquet/requirements.txt b/datasets/stac-geoparquet/requirements.txt index 98834a5c..3efc9534 100644 --- a/datasets/stac-geoparquet/requirements.txt +++ b/datasets/stac-geoparquet/requirements.txt @@ -1,5 +1,5 @@ # stac-geoparquet[pgstac,pc]==0.6.0 
-git+https://github.com/stac-utils/stac-geoparquet.git@3efa35bdd9d467334f260395d5edc9347fe45efb#egg=stac-geoparquet[pgstac,pc] +git+https://github.com/stac-utils/stac-geoparquet.git@c3b7c324ad0a2ef3c09d3d8c246817c943075fe1#egg=stac-geoparquet[pgstac,pc] psycopg[binary,pool]==3.2.6 azure-data-tables==12.5.0 pypgstac==0.8.6 @@ -7,3 +7,5 @@ fsspec==2025.9.0 tqdm==4.67.1 adlfs==2025.8.0 azure-datalake-store==0.0.53 +pyarrow==21.0.0 +psutil diff --git a/datasets/stac-geoparquet/workflow.yaml b/datasets/stac-geoparquet/workflow.yaml index 3b6c9281..5f9cc67d 100644 --- a/datasets/stac-geoparquet/workflow.yaml +++ b/datasets/stac-geoparquet/workflow.yaml @@ -6,7 +6,7 @@ jobs: geoparquet: tasks: - id: update - image: pccomponents.azurecr.io/pctasks-stac-geoparquet:2025.10.28.2 + image: pccomponents.azurecr.io/pctasks-stac-geoparquet:2025.10.28.5 code: src: ${{ local.path(pc_stac_geoparquet.py) }} requirements: ${{ local.path(./requirements.txt) }} diff --git a/datasets/stac-geoparquet/workflow_test.yaml b/datasets/stac-geoparquet/workflow_test.yaml index 4a4d394b..abd21125 100644 --- a/datasets/stac-geoparquet/workflow_test.yaml +++ b/datasets/stac-geoparquet/workflow_test.yaml @@ -6,7 +6,7 @@ jobs: stac: tasks: - id: create - image: pccomponentstest.azurecr.io/pctasks-stac-geoparquet:2025.10.28.2 + image: pccomponentstest.azurecr.io/pctasks-stac-geoparquet:2025.10.28.5 code: src: ${{ local.path(pc_stac_geoparquet.py) }} task: pc_stac_geoparquet:StacGeoparquetTask From a9d5528d49c7be36a05acf2f6e1d41b378b8823b Mon Sep 17 00:00:00 2001 From: Liam Morrison Date: Fri, 31 Oct 2025 11:09:22 -0700 Subject: [PATCH 12/14] export of all collections --- datasets/stac-geoparquet/workflow.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datasets/stac-geoparquet/workflow.yaml b/datasets/stac-geoparquet/workflow.yaml index 5f9cc67d..d5344b0e 100644 --- a/datasets/stac-geoparquet/workflow.yaml +++ b/datasets/stac-geoparquet/workflow.yaml @@ -19,7 +19,7 @@ jobs: table_account_url: "https://pcapi.table.core.windows.net" table_name: "greencollectionconfig" storage_options_account_name: "pcstacitems" - collections: "sentinel-2-l2a" # Set if you want to generate only one geoparquet file + # collections: "sentinel-2-l2a" # Set if you want to generate only one geoparquet file extra_skip: - "chesapeake-lc-13" - "chesapeake-lc-7" From 64184dc10ce5b871d6a5840e8260985ace76da68 Mon Sep 17 00:00:00 2001 From: Liam Morrison Date: Fri, 31 Oct 2025 11:11:48 -0700 Subject: [PATCH 13/14] remove comments from dockerfile --- datasets/stac-geoparquet/Dockerfile | 8 -------- 1 file changed, 8 deletions(-) diff --git a/datasets/stac-geoparquet/Dockerfile b/datasets/stac-geoparquet/Dockerfile index df6f2f87..f68deb0c 100644 --- a/datasets/stac-geoparquet/Dockerfile +++ b/datasets/stac-geoparquet/Dockerfile @@ -39,14 +39,6 @@ COPY pctasks/client /opt/src/pctasks/client RUN cd /opt/src/pctasks/client && \ uv pip install . -# COPY pctasks/ingest /opt/src/pctasks/ingest -# RUN cd /opt/src/pctasks/ingest && \ -# uv pip install . - -# COPY pctasks/dataset /opt/src/pctasks/dataset -# RUN cd /opt/src/pctasks/dataset && \ -# uv pip install . 
- COPY datasets/stac-geoparquet /opt/src/datasets/stac-geoparquet RUN uv pip install -r /opt/src/datasets/stac-geoparquet/requirements.txt From af045284f7bda61f04be71d3f4805b28159368c4 Mon Sep 17 00:00:00 2001 From: liamlego <91280476+liamlego@users.noreply.github.com> Date: Fri, 31 Oct 2025 11:39:17 -0700 Subject: [PATCH 14/14] Apply suggestions from code review committing copilot suggestions Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- datasets/stac-geoparquet/pc_stac_geoparquet.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/datasets/stac-geoparquet/pc_stac_geoparquet.py b/datasets/stac-geoparquet/pc_stac_geoparquet.py index 0e626493..39991cb0 100644 --- a/datasets/stac-geoparquet/pc_stac_geoparquet.py +++ b/datasets/stac-geoparquet/pc_stac_geoparquet.py @@ -238,7 +238,7 @@ def inject_assets(item: dict[str, Any], render_config: str | None) -> dict[str, def naip_year_to_int(item: dict[str, Any]) -> dict[str, Any]: """Convert the year to an integer.""" if "naip:year" in item["properties"] and isinstance(item["properties"]["naip:year"], str): - item["properties"]["naip:year"] = int(item["properties"]["naip:year"]) + item["properties"]["naip:year"] = int(item["properties"]["naip:year"]) return item def clean_item(item: dict[str, Any], render_config: str | None) -> dict[str, Any]: @@ -737,10 +737,6 @@ def run( "STAC_GEOPARQUET_TABLE_CREDENTIAL", azure.identity.DefaultAzureCredential() ) assert table_credential is not None - table_name = table_name or os.environ["STAC_GEOPARQUET_TABLE_NAME"] - table_account_url = ( - table_account_url or os.environ["STAC_GEOPARQUET_TABLE_ACCOUNT_URL"] - ) storage_options_account_name = ( storage_options_account_name or os.environ["STAC_GEOPARQUET_STORAGE_OPTIONS_ACCOUNT_NAME"] @@ -766,7 +762,7 @@ def run( logger.info(f"Found {len(collection_partitions)} pgstac partitions") for i, config in enumerate(configs.values(), 1): - output_path=f"items/{config.collection_id}.parquet" + output_path = f"items/{config.collection_id}.parquet" try: t0 = time.monotonic() config.export_collection(
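
For reference, a minimal end-to-end sketch of the export path these patches converge on: stream items out of pgstac in fixed-size chunks, spill Arrow batches to a temporary directory, and write a single geoparquet file to blob storage. The `pgstac_to_arrow` keywords and the `to_parquet(..., filesystem=...)` call mirror the pinned stac-geoparquet revision used above; the connection string and collection name are placeholders:

```python
import tempfile

import fsspec
from stac_geoparquet.arrow import to_parquet
from stac_geoparquet.pgstac_reader import pgstac_to_arrow

conninfo = "postgresql://user:password@host:5432/pgstac"  # placeholder DSN
fs = fsspec.filesystem("abfs", account_name="pcstacitems")

with tempfile.TemporaryDirectory() as tmpdir:
    # Stream the collection out of pgstac in 8192-item chunks; the
    # "ChunksToDisk" schema strategy spills each Arrow batch to tmpdir so
    # large collections never have to fit in memory at once.
    arrow = pgstac_to_arrow(
        conninfo=conninfo,
        collection="io-biodiversity",
        schema="ChunksToDisk",
        tmpdir=tmpdir,
        chunk_size=8192,
    )
    to_parquet(arrow, "items/io-biodiversity.parquet", filesystem=fs)
```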