Stactools-Pipelines is a large-scale, turnkey processing framework for creating STAC metadata and cloud-optimized formats for data stored on S3 using stactools packages. Its goal is to make it easier for teams to develop scalable metadata pipelines by reducing the infrastructure boilerplate they need to build themselves.
- Python>=3.12
- Docker
- tox
- awscli
- An IAM role with sufficient permissions for creating, destroying and modifying the relevant stack resources.
This framework expects that a stactools package has already been created for your dataset of interest and that the dataset is stored on S3. Check out stactools-packages to find out if a package already exists or how to create your own.
This is a template repository. You can follow these instructions to create your organization's own repo from the template. Once you have created your own repository from the template, you can develop your own pipelines.
To install for pipeline development run
pip install .["dev"]
An example pipeline structure for Sentinel 1 is included in the repo here to use as a reference for developing your own pipeline.
To develop a new pipeline, create a directory in `pipelines` using a simple name for your pipeline's dataset.
At a minimum you need to include
- `app.py` A Lambda application with a `handler` function defined which consumes an `SQSEvent`, creates a STAC Item and posts it to the `ingestor`.
- `requirements.txt` Contains `app.py`'s dependencies so a container image can be built.
- `config.yaml` Your pipeline's configuration settings.
- `test_app.py` A `pytest` based unit test file which exercises `app.py`.
- `collection.py` A Lambda application with a `handler` function which creates a STAC Collection and posts it to the `ingestor` (a minimal sketch follows this list).
- `test_collection.py` A `pytest` based unit test file which exercises `collection.py`.
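For orientation, here is a minimal, hypothetical sketch of a `collection.py`. The collection metadata, the `INGESTOR_URL` environment variable, and the `/collections` path are illustrative assumptions rather than the framework's actual API; your stactools package may already provide a `create_collection` function you can import instead.

```python
# A minimal, hypothetical collection.py sketch. The environment variable name,
# the /collections path and the collection metadata are assumptions.
import json
import os
from datetime import datetime, timezone

import pystac
import requests


def create_collection() -> pystac.Collection:
    # Build (or import from your stactools package) the STAC Collection
    # describing your dataset.
    return pystac.Collection(
        id="my-dataset",
        description="Example collection for my dataset",
        extent=pystac.Extent(
            spatial=pystac.SpatialExtent([[-180.0, -90.0, 180.0, 90.0]]),
            temporal=pystac.TemporalExtent(
                [[datetime(2014, 1, 1, tzinfo=timezone.utc), None]]
            ),
        ),
        license="proprietary",
    )


def handler(event, context):
    # Post the Collection JSON to the ingestor API.
    collection = create_collection()
    response = requests.post(
        f"{os.environ['INGESTOR_URL']}/collections",
        data=json.dumps(collection.to_dict()),
        headers={"Content-Type": "application/json"},
    )
    response.raise_for_status()
    return response.status_code
```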
If you have pre-existing granules in your bucket and your bucket has S3 Inventory enabled, you can also optionally include functions which read the inventory to process these pre-existing granules. These include
- `historic.py` A Lambda application which queries rows from the bucket's inventory file loaded into an Athena table (a minimal sketch follows this list).
- `test_historic.py` A `pytest` based unit test file which exercises `historic.py`.
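As a rough illustration of the three methods a `historic.py` is expected to provide (`query_inventory`, `row_to_message_body` and `handler`, described under the configuration settings below), here is a hedged sketch. The table name, query, message shape and environment variable names are assumptions and will differ for your inventory schema and queue wiring.

```python
# A minimal, hypothetical historic.py sketch; table name, query, message shape
# and environment variables are placeholders.
import json
import os
import time

import boto3

athena = boto3.client("athena")
sqs = boto3.client("sqs")


def query_inventory() -> list[dict]:
    # Run an Athena query against the inventory table and return its rows.
    execution = athena.start_query_execution(
        QueryString="SELECT key FROM inventory LIMIT 100",
        QueryExecutionContext={"Database": os.environ["ATHENA_DATABASE"]},
        ResultConfiguration={"OutputLocation": os.environ["ATHENA_OUTPUT"]},
    )
    execution_id = execution["QueryExecutionId"]
    while True:
        state = athena.get_query_execution(QueryExecutionId=execution_id)[
            "QueryExecution"
        ]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)
    results = athena.get_query_results(QueryExecutionId=execution_id)
    return results["ResultSet"]["Rows"][1:]  # skip the header row


def row_to_message_body(row: dict) -> str:
    # Convert one inventory row into the message body app.py expects.
    key = row["Data"][0]["VarCharValue"]
    return json.dumps({"key": key})


def handler(event, context):
    # Send each inventory row to the processing queue consumed by app.py.
    for row in query_inventory():
        sqs.send_message(
            QueueUrl=os.environ["QUEUE_URL"],
            MessageBody=row_to_message_body(row),
        )
```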
- `id` **Required** The pipeline name. This should be the same as the pipeline's parent folder and should use `_`s for separators to support Python module discovery.
- `compute` **Required** Currently only the `awslambda` value is supported.
- `ingestor_url` **Required** The ingestor API's root path with the stage value included.
- `secret_arn` **Required** The Secrets Manager ARN for using a Cognito JWKS implementation with the ingestor API.
- `sns` **Optional** The SNS topic to listen to for new granule notifications.
- `inventory_location` **Optional** The location of an S3 inventory or file listing that can be used by the pipeline to process and ingest existing granules. Include a `historic.py` file (and a `test_historic.py`) in your pipeline which implements `query_inventory`, `row_to_message_body` and `handler` methods to query the inventory and send the results to the processing queue.
- `historic_frequency` **Optional** If an `inventory_location` is included, the `historic_frequency` (how often, in hours, `historic.py` is run) must also be included. A value of `0` indicates that the `historic.py` function will run a single time on deployment and process the entire inventory. If a value `> 0` is specified then an `initial_chunk` must also be specified. The pipeline will build a stack which uses these values to incrementally chunk through the inventory file with `cron` executions until the entire inventory has been processed.
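For concreteness, a hypothetical `config.yaml` for a pipeline named `my_dataset` might look like the following; every value below is a placeholder.

```yaml
# Hypothetical config.yaml; all values are placeholders.
id: my_dataset
compute: awslambda
ingestor_url: https://example.execute-api.us-west-2.amazonaws.com/prod
secret_arn: arn:aws:secretsmanager:us-west-2:123456789012:secret:ingestor-jwks-abc123
sns: arn:aws:sns:us-west-2:123456789012:new-granule-notifications
inventory_location: s3://my-dataset-inventory/inventory/
historic_frequency: 0
```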
Your pipeline's `app.py` function receives notification messages about granules to be processed and uses the information in these messages to create new STAC Items (and optionally, new cloud optimized versions of the data). You'll need to understand a bit about the structure of the files that make up a granule in your dataset and how the SNS notifications are configured for your bucket (if they are enabled).
As an example, note that the SNS topic for the Sentinel-1 bucket uses a custom message structure that includes an id and path property describing the "directory" key rather than individual file keys like most SNS configurations. Note that the Sentinel-1 app.py has logic to correctly parse this message and that the Sentinel-1 stactools package actually expects an href value that is a "directory" rather than a file.
At a minimum the handler method in your pipeline's app.py should create a STAC item and post that item to the ingestor endpoint.
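As a hedged sketch of that minimum, the following hypothetical `app.py` assumes an SQS record whose body wraps an SNS notification containing a `key` field, a `create_item` function from your dataset's stactools package, an `/ingestions` ingestor path, and `BUCKET`/`INGESTOR_URL` environment variables; none of these names come from the framework itself, so adapt them to your dataset and bucket notification format.

```python
# A minimal, hypothetical app.py sketch. The message shape, the create_item
# import, the /ingestions path and the environment variable names are all
# assumptions.
import json
import os

import requests

# Replace with the Item-creation function from your dataset's stactools package.
from stactools.mydataset.stac import create_item  # hypothetical import


def handler(event, context):
    # The Lambda is subscribed to an SQS queue; each record's body wraps the
    # SNS notification published for a new granule.
    for record in event["Records"]:
        notification = json.loads(record["body"])
        message = json.loads(notification["Message"])

        # Build the asset href, create the STAC Item, then post it to the
        # ingestor endpoint.
        href = f"s3://{os.environ['BUCKET']}/{message['key']}"
        item = create_item(href)

        response = requests.post(
            f"{os.environ['INGESTOR_URL']}/ingestions",
            data=json.dumps(item.to_dict()),
            headers={"Content-Type": "application/json"},
        )
        response.raise_for_status()
```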
To make unit testing easier and more consistent, stactools-pipelines includes a number of pytest fixtures and mocks which you can use to isolate your handler function from external services and verify that it is behaving as expected. These fixtures use some pytest parametrization magic to patch the appropriate imports in your pipeline's `app.py` when your tests are run.
Using the Sentinel-1 pipeline as an example, the `test_handler` [is decorated with](https://docs.pytest.org/en/7.1.x/how-to/parametrize.html) the `pipeline_id` and module. When the test is run, all the fixtures included in `conftest.py` patch your `app.py`'s use of the real module, allowing you to test in isolation. It is recommended to use these fixtures for unit testing and behavior verification.
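If you have not adopted the repository's fixtures yet, the following self-contained sketch shows the same isolation idea with plain `unittest.mock`, written against the hypothetical `app.py` sketch above. The module path, patched names and message fields are assumptions, and in a real pipeline the parametrized `conftest.py` fixtures described above are the recommended approach.

```python
# A generic, self-contained test sketch using unittest.mock; module paths,
# patched names and the message shape are assumptions mirroring the
# hypothetical app.py sketch above.
import json
from unittest.mock import MagicMock, patch

import pytest


@pytest.fixture
def sqs_event():
    # A fake SQSEvent whose record body wraps an SNS notification.
    message = json.dumps({"key": "granules/example/measurement.tiff"})
    return {"Records": [{"body": json.dumps({"Message": message})}]}


@patch("pipelines.my_dataset.app.requests.post")  # hypothetical module path
@patch("pipelines.my_dataset.app.create_item")  # hypothetical module path
def test_handler(mock_create_item, mock_post, sqs_event, monkeypatch):
    from pipelines.my_dataset import app  # hypothetical import

    monkeypatch.setenv("BUCKET", "example-bucket")
    monkeypatch.setenv("INGESTOR_URL", "https://ingestor.example.com")
    mock_create_item.return_value.to_dict.return_value = {"id": "example-item"}
    mock_post.return_value = MagicMock(status_code=201)

    app.handler(sqs_event, None)

    mock_post.assert_called_once()
```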
To run the unit tests for your pipeline
Create an environment setting using your pipeline name.
$ export PIPELINE=<Your pipeline name>
And call tox to run your unit tests
$ tox
Deploying a pipeline will use the pipeline's `config.yaml` to deploy all the necessary infrastructure. This includes STAC Collection and Item creation Lambdas and any queues or Athena tables that are required. A new, completely separate CloudFormation stack is created for each pipeline. The pipeline being deployed is controlled by the `PIPELINE` environment variable. The `collection.py` Lambda is executed automatically by the stack after deployment in order to create the STAC Collection in the target STAC API. If an `sns` topic was specified in the `config.yaml`, the `app.py` Lambda will be subscribed to it and will begin processing notifications as soon as the stack deployment is complete.
To deploy a pipeline
Create a development virtual environment with
$ tox -e dev
$ source devenv/bin/activate
Create environment settings for your pipeline deployment
$ export PROJECT=<The project name for resource cost tracking>
$ export PIPELINE=<Your pipeline name>
With an AWS profile enabled with sufficient permissions to create your infrastructure, you can deploy via
$ cdk deploy
This will create a CloudFormation stack for your pipeline.
To create a development virtual environment for core repository development use
$ tox -e dev
$ source devenv/bin/activate
