
Conversation

@treadyaparna commented Sep 14, 2025

Issue:

#405

Description of changes:

This PR introduces a comprehensive Distributed Data Stream Aggregator workflow that demonstrates large-scale data aggregation from multiple third-party locations using AWS Step Functions' distributed processing capabilities.

A key highlight of this solution is its no-Lambda approach: a low-code (almost no-code) architecture, with coding required only in the AWS Glue job for final consolidation.

Key Features:

  • 3-Tier Architecture: Main orchestrator + Standard execution child + Express execution child
  • Distributed Processing: Parallel processing of multiple data sources with configurable concurrency
  • Scalable Pagination: Handles large datasets with limit/offset pagination strategy
  • Data Consolidation: AWS Glue integration for combining partial files into final output
  • Robust Error Handling: Comprehensive retry logic and graceful failure management
  • Low-Code Implementation: Minimal reliance on custom code; orchestration is entirely achieved with native Step Functions integrations

Technical Implementation:

  • JSONata Query Language: Modern state machine definitions with advanced data manipulation
  • HTTP Integration: Secure third-party API connections via EventBridge
  • Storage Strategy: Organized S3 temporary storage with task-based directory structure
  • Status Tracking: DynamoDB integration for task lifecycle management
  • No-Lambda Pattern: Achieves distributed data aggregation without requiring custom Lambda functions
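To make the no-Lambda HTTP pattern concrete, here is a rough sketch of a JSONata-based state calling a third-party API through an EventBridge connection. State names, the endpoint URL, and the connection ARN are illustrative only, not taken from this PR:

```json
{
  "Get Location Data": {
    "Type": "Task",
    "QueryLanguage": "JSONata",
    "Resource": "arn:aws:states:::http:invoke",
    "Arguments": {
      "ApiEndpoint": "https://api.example.com/locations",
      "Method": "GET",
      "Authentication": {
        "ConnectionArn": "arn:aws:events:us-east-1:123456789012:connection/third-party/abc"
      },
      "QueryParameters": {
        "limit": "{% $states.input.limit %}",
        "offset": "{% $states.input.offset %}"
      }
    },
    "Retry": [
      {
        "ErrorEquals": ["States.TaskFailed"],
        "IntervalSeconds": 5,
        "MaxAttempts": 3,
        "BackoffRate": 2
      }
    ],
    "Next": "Store Partial Results"
  }
}
```

The `http:invoke` integration handles the API call natively, so no Lambda function sits between the state machine and the third-party endpoint.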

Use Cases:

  • Multi-source data aggregation
  • Third-party API data consolidation
  • Large-scale ETL operations
  • Distributed data processing workflows

Why this matters:

  • Reduced Operational Overhead: Eliminates the need to manage, patch, and scale Lambda functions.
  • Lower Costs: Step Functions native integrations avoid Lambda invocation charges.
  • Simplified Maintenance: A low-code architecture that is easier to evolve and extend.
  • Future-Ready: Leverages AWS-managed services for scalability, reliability, and modernization without custom infrastructure.

@treadyaparna marked this pull request as draft on September 14, 2025 16:37
@treadyaparna force-pushed the distributed-data-stream-aggregator branch from 4c402da to 91fcbdb on September 14, 2025 17:23
@treadyaparna marked this pull request as ready for review on September 14, 2025 17:24
@treadyaparna changed the title from "feat: Distributed Data Stream Aggregator workflow with 3-tier architecture" to "New Workflow Submit: Distributed Data Stream Aggregator workflow with 3-tier architecture" on Sep 15, 2025
@bfreiberg (Contributor) left a comment:

Hi, thanks for your submission. The workflow seems targeted to specific circumstances, e.g. a summary API endpoint and a Glue script. I'm not sure what the added benefit is over using something like Distributed Map directly?

```
--command Name=glueetl,ScriptLocation=s3://your-bucket/glue-script.py
```

6. Create HTTP connections for third-party API access:
@bfreiberg (Contributor):
I think it would make more sense to link to the documentation here instead, as you don't target a specific API. It would also be worth highlighting that there are other auth methods as well.

@treadyaparna (Author):
Thanks, I have included these details.

```
aws glue create-job \
  --name data-aggregation-job \
  --role arn:aws:iam::YOUR_ACCOUNT:role/GlueServiceRole \
  --command Name=glueetl,ScriptLocation=s3://your-bucket/glue-script.py
```
@bfreiberg (Contributor):
What script are you referring to here?

@treadyaparna (Author):
I have uploaded the Glue job file.

### Data Processing Workflow (Express Execution)
The express workflow handles the actual API calls to third-party endpoints. It receives location details, data type, and pagination parameters, makes HTTP calls with query parameters, formats the retrieved data into standardized JSON format, and returns results with count and pagination metadata.
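As a hedged sketch of the pagination hand-off described above (state and field names are hypothetical, not taken from this PR), a JSONata Choice state could keep requesting pages while full pages keep coming back:

```json
{
  "More Pages?": {
    "Type": "Choice",
    "QueryLanguage": "JSONata",
    "Choices": [
      {
        "Condition": "{% $states.input.count = $states.input.limit %}",
        "Next": "Get Next Page"
      }
    ],
    "Default": "Done"
  },
  "Get Next Page": {
    "Type": "Pass",
    "QueryLanguage": "JSONata",
    "Output": {
      "limit": "{% $states.input.limit %}",
      "offset": "{% $states.input.offset + $states.input.limit %}",
      "count": 0
    },
    "Next": "Fetch Page"
  }
}
```

A page shorter than `limit` signals the last page, so the loop exits without an extra empty request in most cases.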

### Data Consolidation
@bfreiberg (Contributor):
Wouldn't Step Functions Distributed Map be an alternative to this?

@treadyaparna (Author):
The aggregation phase is a reduce step across all items, which the Distributed Map isn’t designed to handle directly. I write per-item results as part files and use a Glue job to merge them. This also helps avoid Step Functions’ 256 KB state payload limit by keeping large intermediate data out of the state.
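A sketch of that reduce step, reusing the job name and `--output_file_name` argument that appear elsewhere in this PR; the S3 part-file prefix layout and `taskId` field are my assumptions:

```json
{
  "Combine Part Files": {
    "Type": "Task",
    "QueryLanguage": "JSONata",
    "Resource": "arn:aws:states:::glue:startJobRun",
    "Arguments": {
      "JobName": "data-aggregation-job",
      "Arguments": {
        "--source_prefix": "{% 's3://s3-bucket-name/tmp/' & $states.input.taskId & '/' %}",
        "--output_file_name": "{% $states.input.outputFileName %}"
      }
    },
    "Next": "Wait for the status"
  }
}
```

Only the small S3 prefix and file name cross the state boundary, which is what keeps the execution well under the 256 KB payload limit.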

```
"Type": "Map",
"ItemProcessor": {
  "ProcessorConfig": {
    "Mode": "DISTRIBUTED",
```
@bfreiberg (Contributor):
Why use a Distributed Map here? How many locations do you expect to query in parallel?

@treadyaparna (Author):
I chose a Distributed Map to keep the parent execution lightweight and suited for large workloads, with room to increase concurrency later through configuration rather than a redesign. In load tests with ~1,000 locations (MaxConcurrency=1000), it performed reliably, and this approach gives me headroom to scale further if needed.

```
"Label": "IterateContainers",
"MaxConcurrency": 5,
"ItemBatcher": {
  "MaxItemsPerBatch": 1,
```
@bfreiberg (Contributor):
Why enable batching but set it to 1 item per batch then?

@treadyaparna (Author) Sep 18, 2025:
The goal here is to handle data at large scale. I am using a Distributed Map to keep the parent execution's history small: an Inline Map records every per-item step in the parent, which can push a Standard workflow toward the 25,000-event history limit at scale. With a Distributed Map, each item runs as its own child execution, so the parent stays lightweight even with large fan-out.

```
},
"Next": "Combine Part Files",
"Label": "IterateContainers",
"MaxConcurrency": 5,
```
@bfreiberg (Contributor):
If you only need a concurrency of 5, why not use inline map mode?

@treadyaparna (Author):

I set MaxConcurrency to 5 here to respect downstream rate limits. However, I chose a Distributed Map to keep the parent execution light, to get per-item isolation for our multi-step logic, and to leave room to increase concurrency later without redesign. I’ve load-tested ~1,000 items in parallel successfully; this PR uses 5 simply as a starting point.

@bfreiberg (Contributor):

I think it would be really helpful to include this in the README.

7. Deploy the state machines by updating the placeholder values in each ASL file:
- Replace `'s3-bucket-name'` with your source bucket name
- Replace `'destination_bucket'` with your destination bucket name
- Replace `'api_endpoint'` and `'summary_api_endpoint'` with your API URLs
@bfreiberg (Contributor):

What is the purpose of this summary_api_endpoint?

@treadyaparna (Author):

I see your point. summary_api_endpoint is the URL the Get Location Summary state uses to retrieve each location summary. I previously switched ARNs to static names and missed updating this reference. I’ve corrected the endpoint now.

```
"--output_file_name": "{% $outputFileName %}"
  }
},
"Next": "Wait for the status",
```
@bfreiberg (Contributor):

Why do a wait loop instead of invoking Glue synchronously?

@treadyaparna (Author):

Glue can be invoked synchronously. I chose a small wait/poll loop to allow a tunable polling interval, a dedicated timeout/circuit breaker, and explicit handling of intermediate states with retries/backoff.

If you prefer the .sync pattern for simplicity and fewer states, I'm happy to switch. Both approaches work; this one just gave me finer control. 😄
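For comparison, the .sync variant mentioned in the review would collapse the poll loop into a single state. A minimal sketch, reusing the job name from the setup instructions (the success-state name is illustrative):

```json
{
  "Combine Part Files": {
    "Type": "Task",
    "Resource": "arn:aws:states:::glue:startJobRun.sync",
    "Parameters": {
      "JobName": "data-aggregation-job"
    },
    "Next": "Aggregation Succeeded"
  }
}
```

With `.sync`, Step Functions itself waits for the job run to reach a terminal state, at the cost of the tunable polling interval and custom intermediate-state handling described above.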

@treadyaparna (Author):

Hi @bfreiberg Thank you for your review. I have addressed all the comments and look forward to your response.

@bfreiberg (Contributor):

Thanks for your quick updates. The PR looks good now. Thank you for your contribution. Your workflow will be added to Serverlessland soon.


@treadyaparna commented Sep 24, 2025:

@bfreiberg Thank you for the approval. I’ll update the README in the following PR and look forward to the workflow being added to Serverlessland.
