Skip to content

Commit 267dabc

Browse files
committed
feat(typescript): add missing CDK stack file for airflow-lambda-dynamodb-approval
- Add lib/aws-mwaa-cdk-stack.ts with complete MWAA infrastructure - Update .gitignore to track TypeScript source files in lib/ - Stack includes VPC, S3, Lambda, DynamoDB, and MWAA environment - Addresses PR #1207 review feedback
1 parent 9788379 commit 267dabc

File tree

2 files changed

+366
-0
lines changed

2 files changed

+366
-0
lines changed

typescript/airflow-lambda-dynamodb-approval/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ downloads/
2020
eggs/
2121
.eggs/
2222
lib/
23+
!lib/*.ts
2324
lib64/
2425
parts/
2526
sdist/
Lines changed: 365 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,365 @@
1+
import { Stack, StackProps, RemovalPolicy, CfnOutput, Duration } from 'aws-cdk-lib';
2+
import * as s3 from 'aws-cdk-lib/aws-s3';
3+
import * as s3deploy from 'aws-cdk-lib/aws-s3-deployment';
4+
import * as iam from 'aws-cdk-lib/aws-iam';
5+
import * as mwaa from 'aws-cdk-lib/aws-mwaa';
6+
import * as ec2 from 'aws-cdk-lib/aws-ec2';
7+
import * as lambda from 'aws-cdk-lib/aws-lambda';
8+
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
9+
import { Construct } from 'constructs';
10+
11+
/**
 * Demo stack that provisions an Amazon MWAA (Managed Workflows for Apache
 * Airflow) environment together with the resources its DAGs interact with:
 *
 * - a two-AZ VPC with public and private (NAT egress) subnets,
 * - a versioned S3 bucket populated from the local `./dags` folder,
 * - a demo Lambda function that echoes the Airflow context it receives,
 * - a DynamoDB table (plus a status/timestamp GSI) for human-approval workflows,
 * - the MWAA execution role, security group and environment.
 *
 * Resources use `RemovalPolicy.DESTROY` and fixed names because this is a
 * demo; review those settings before using the stack for anything durable.
 */
export class AwsMwaaCdkStack extends Stack {
  /**
   * @param scope - Parent construct (normally the CDK `App`).
   * @param id - Logical id of this stack.
   * @param props - Optional standard stack properties (env, tags, ...).
   */
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Single source of truth for the environment name: it is used by the
    // environment itself, by the execution-role policy ARN and by the outputs.
    const environmentName = 'MyMWAAEnvironment';

    // 1. Create VPC for MWAA (required)
    // MWAA runs in the private subnets; the single NAT gateway gives them
    // outbound internet access at the lowest cost.
    const vpc = new ec2.Vpc(this, 'MwaaVpc', {
      maxAzs: 2,
      natGateways: 1,
      subnetConfiguration: [
        {
          cidrMask: 24,
          name: 'public',
          subnetType: ec2.SubnetType.PUBLIC,
        },
        {
          cidrMask: 24,
          name: 'private',
          subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS,
        },
      ],
    });

    // 2. Create S3 bucket for MWAA DAGs
    // MWAA requires its source bucket to be versioned and to block all
    // public access, so both are set explicitly rather than left to defaults.
    const dagsBucket = new s3.Bucket(this, 'MwaaDagsBucket', {
      bucketName: `mwaa-dags-${this.account}-${this.region}`,
      versioned: true,
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      removalPolicy: RemovalPolicy.DESTROY, // For demo purposes
      autoDeleteObjects: true, // For demo purposes
    });

    // 3. Upload local dags folder to the S3 bucket
    new s3deploy.BucketDeployment(this, 'DeployDags', {
      sources: [s3deploy.Source.asset('./dags')],
      destinationBucket: dagsBucket,
      destinationKeyPrefix: 'dags',
    });

    // 4. Create Demo Lambda Function for MWAA testing (original simple demo)
    // The Python source is kept at column 0 inside the template literal so
    // that the inline module has valid top-level indentation.
    const demoLambdaFunction = new lambda.Function(this, 'MwaaTestLambda', {
      functionName: 'mwaa-demo-function',
      runtime: lambda.Runtime.PYTHON_3_9,
      handler: 'index.lambda_handler',
      code: lambda.Code.fromInline(`
import json
import datetime

def lambda_handler(event, context):
    """
    Demo Lambda function for MWAA integration testing
    Processes data sent from Airflow and returns a response
    """

    # Log the incoming event
    print(f"Received event: {json.dumps(event)}")

    # Extract Airflow context if present
    airflow_context = {
        'dag_id': event.get('dag_id', 'unknown'),
        'task_id': event.get('task_id', 'unknown'),
        'timestamp': event.get('timestamp', 'unknown'),
        'message': event.get('message', 'Hello from Lambda!')
    }

    # Process the data (simple example)
    processed_data = {
        'status': 'success',
        'processed_at': datetime.datetime.now().isoformat(),
        'original_message': airflow_context['message'],
        'airflow_context': airflow_context,
        'lambda_function_name': context.function_name,
        'lambda_request_id': context.aws_request_id,
        'processed_by': 'mwaa-demo-lambda'
    }

    # Return response
    response = {
        'statusCode': 200,
        'body': json.dumps(processed_data)
    }

    print(f"Returning response: {json.dumps(response)}")
    return response
`),
      timeout: Duration.seconds(30),
      memorySize: 128,
      description: 'Demo Lambda function for MWAA Airflow integration testing',
    });

    // 5. Create DynamoDB Approval Table for Human Approval Workflows
    const approvalTable = new dynamodb.Table(this, 'ApprovalTable', {
      tableName: `mwaa-approval-table-${this.region}`,
      partitionKey: {
        name: 'id',
        type: dynamodb.AttributeType.STRING,
      },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      removalPolicy: RemovalPolicy.DESTROY, // For demo purposes
      pointInTimeRecovery: false, // For demo purposes
      encryption: dynamodb.TableEncryption.AWS_MANAGED,
    });

    // Add GSI for querying by status and timestamp
    approvalTable.addGlobalSecondaryIndex({
      indexName: 'approval_status-index',
      partitionKey: {
        name: 'approval_status',
        type: dynamodb.AttributeType.STRING,
      },
      sortKey: {
        name: 'created_at',
        type: dynamodb.AttributeType.STRING,
      },
    });

    // 6. Create MWAA Execution Role
    // MWAA assumes the execution role through two service principals; the
    // trust policy must list both `airflow.amazonaws.com` and
    // `airflow-env.amazonaws.com`.
    const mwaaExecutionRole = new iam.Role(this, 'MwaaExecutionRole', {
      assumedBy: new iam.CompositePrincipal(
        new iam.ServicePrincipal('airflow.amazonaws.com'),
        new iam.ServicePrincipal('airflow-env.amazonaws.com'),
      ),
      inlinePolicies: {
        MwaaExecutionRolePolicy: new iam.PolicyDocument({
          statements: [
            // Lets the environment publish its own health metrics
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: ['airflow:PublishMetrics'],
              resources: [
                `arn:aws:airflow:${this.region}:${this.account}:environment/${environmentName}`,
              ],
            }),
            // S3 permissions for DAG bucket
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                's3:GetObject*',
                's3:GetBucket*',
                's3:List*',
              ],
              resources: [
                dagsBucket.bucketArn,
                `${dagsBucket.bucketArn}/*`,
              ],
            }),
            // MWAA checks the account-level public access block settings;
            // this action does not support resource-level scoping.
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: ['s3:GetAccountPublicAccessBlock'],
              resources: ['*'],
            }),
            // CloudWatch Logs permissions - Comprehensive MWAA logging access
            // NOTE(review): the last resource ARN covers every Logs resource
            // in this account/region and therefore subsumes the three
            // narrower patterns above it. Kept as-is to avoid breaking DAGs
            // that may read other log groups; narrow it if that is not needed.
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'logs:CreateLogStream',
                'logs:CreateLogGroup',
                'logs:PutLogEvents',
                'logs:GetLogEvents',
                'logs:GetLogRecord',
                'logs:GetLogGroupFields',
                'logs:GetQueryResults',
                'logs:DescribeLogGroups',
                'logs:DescribeLogStreams',
                'logs:DescribeDestinations',
                'logs:DescribeExportTasks',
                'logs:DescribeMetricFilters',
                'logs:DescribeQueries',
                'logs:DescribeResourcePolicies',
                'logs:DescribeSubscriptionFilters',
                'logs:FilterLogEvents',
                'logs:StartQuery',
                'logs:StopQuery',
              ],
              resources: [
                `arn:aws:logs:${this.region}:${this.account}:log-group:airflow-*`,
                `arn:aws:logs:${this.region}:${this.account}:log-group:/aws/mwaa/*`,
                `arn:aws:logs:${this.region}:${this.account}:log-group:*airflow*`,
                `arn:aws:logs:${this.region}:${this.account}:*`,
              ],
            }),
            // CloudWatch Metrics permissions - Comprehensive access
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'cloudwatch:PutMetricData',
                'cloudwatch:GetMetricStatistics',
                'cloudwatch:GetMetricData',
                'cloudwatch:ListMetrics',
                'cloudwatch:DescribeAlarms',
                'cloudwatch:DescribeAlarmsForMetric',
                'cloudwatch:GetDashboard',
                'cloudwatch:ListDashboards',
              ],
              resources: ['*'],
            }),
            // SQS permissions for Airflow
            // The account segment is a wildcard because the Celery queues
            // are not created in this account.
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'sqs:ChangeMessageVisibility',
                'sqs:DeleteMessage',
                'sqs:GetQueueAttributes',
                'sqs:GetQueueUrl',
                'sqs:ReceiveMessage',
                'sqs:SendMessage',
              ],
              resources: [
                `arn:aws:sqs:${this.region}:*:airflow-celery-*`,
              ],
            }),
            // KMS permissions for encryption
            // Usable only when the request arrives via SQS or S3 in this region.
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'kms:Decrypt',
                'kms:DescribeKey',
                'kms:GenerateDataKey*',
                'kms:Encrypt',
              ],
              resources: ['*'],
              conditions: {
                StringEquals: {
                  'kms:ViaService': [
                    `sqs.${this.region}.amazonaws.com`,
                    `s3.${this.region}.amazonaws.com`,
                  ],
                },
              },
            }),
          ],
        }),
        LambdaExecutionPolicy: new iam.PolicyDocument({
          statements: [
            // NOTE(review): deliberately account-wide so DAGs can discover and
            // invoke functions by name; scope to specific function ARNs if
            // only the demo function is needed.
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'lambda:InvokeFunction',
                'lambda:GetFunction',
                'lambda:ListFunctions',
              ],
              resources: ['*'], // Allows access to all Lambda functions in the account
              sid: 'LambdaInvokePermissions',
            }),
          ],
        }),
        DynamoDBAccessPolicy: new iam.PolicyDocument({
          statements: [
            // Table plus its indexes (the GSI is queried by status/timestamp)
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'dynamodb:GetItem',
                'dynamodb:PutItem',
                'dynamodb:UpdateItem',
                'dynamodb:DeleteItem',
                'dynamodb:Query',
                'dynamodb:Scan',
                'dynamodb:BatchGetItem',
                'dynamodb:BatchWriteItem',
                'dynamodb:DescribeTable',
              ],
              resources: [
                approvalTable.tableArn,
                `${approvalTable.tableArn}/index/*`,
              ],
              sid: 'DynamoDBAccessPermissions',
            }),
          ],
        }),
      },
    });

    // Grant S3 permissions to the role
    // Adds write access on top of the read-only statement in the inline policy.
    dagsBucket.grantReadWrite(mwaaExecutionRole);

    // 7. Create Security Group for MWAA
    const mwaaSecurityGroup = new ec2.SecurityGroup(this, 'MwaaSecurityGroup', {
      vpc,
      description: 'Security group for MWAA environment',
      allowAllOutbound: true,
    });

    // Allow inbound traffic within the security group
    // (self-referencing rule: members of the group may talk to each other).
    mwaaSecurityGroup.addIngressRule(
      mwaaSecurityGroup,
      ec2.Port.allTraffic(),
      'Allow all traffic within security group',
    );

    // 8. Create MWAA Environment
    const mwaaEnvironment = new mwaa.CfnEnvironment(this, 'MwaaEnvironment', {
      name: environmentName,
      dagS3Path: 'dags',
      executionRoleArn: mwaaExecutionRole.roleArn,
      sourceBucketArn: dagsBucket.bucketArn,

      // Network configuration
      networkConfiguration: {
        subnetIds: vpc.privateSubnets.map(subnet => subnet.subnetId),
        securityGroupIds: [mwaaSecurityGroup.securityGroupId],
      },

      // Environment configuration
      environmentClass: 'mw1.small', // Smallest size for cost efficiency
      maxWorkers: 2,
      minWorkers: 1,

      // Airflow configuration
      airflowVersion: '2.7.2',
      webserverAccessMode: 'PUBLIC_ONLY',

      // Environment variables for DAGs
      airflowConfigurationOptions: {
        'core.default_timezone': 'UTC',
        'webserver.expose_config': 'True',
      },

      // Resource names are automatically discovered by DAGs
      // DAGs use the same naming patterns as CDK to find resources automatically
      // No manual configuration of Airflow Variables required

      // Logging configuration
      loggingConfiguration: {
        dagProcessingLogs: {
          enabled: true,
          logLevel: 'INFO',
        },
        schedulerLogs: {
          enabled: true,
          logLevel: 'INFO',
        },
        taskLogs: {
          enabled: true,
          logLevel: 'INFO',
        },
        webserverLogs: {
          enabled: true,
          logLevel: 'INFO',
        },
        workerLogs: {
          enabled: true,
          logLevel: 'INFO',
        },
      },
    });

    // 9. Output important information
    new CfnOutput(this, 'S3BucketName', {
      value: dagsBucket.bucketName,
      description: 'Name of the S3 bucket containing DAG files',
    });

    // The name is a known literal, so no non-null assertion is needed.
    new CfnOutput(this, 'MwaaEnvironmentName', {
      value: environmentName,
      description: 'Name of the MWAA environment',
    });

    new CfnOutput(this, 'MwaaWebServerUrl', {
      value: `https://${mwaaEnvironment.attrWebserverUrl}`,
      description: 'MWAA Airflow Web Server URL',
    });

    new CfnOutput(this, 'DemoLambdaFunctionName', {
      value: demoLambdaFunction.functionName,
      description: 'Name of the demo Lambda function for MWAA testing',
    });

    new CfnOutput(this, 'ApprovalTableName', {
      value: approvalTable.tableName,
      description: 'Name of the DynamoDB approval table for human approval workflows',
    });
  }
}

0 commit comments

Comments
 (0)