Skip to content
221 changes: 186 additions & 35 deletions aws_lambda_powertools/utilities/parser/models/appsync.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,108 @@
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel
from pydantic import BaseModel, Field


class AppSyncIamIdentity(BaseModel):
accountId: str
cognitoIdentityPoolId: Optional[str]
cognitoIdentityId: Optional[str]
sourceIp: List[str]
username: str
userArn: str
cognitoIdentityAuthType: Optional[str]
cognitoIdentityAuthProvider: Optional[str]
accountId: str = Field(description="The AWS account ID of the caller.", examples=["123456789012", "666666666666"])
cognitoIdentityPoolId: Optional[str] = Field(
default=None,
description="The Amazon Cognito identity pool ID associated with the caller.",
examples=["us-east-1:12345678-1234-1234-1234-123456789012", None],
)
cognitoIdentityId: Optional[str] = Field(
default=None,
description="The Amazon Cognito identity ID of the caller.",
examples=["us-east-1:12345678-1234-1234-1234-123456789012", None],
)
sourceIp: List[str] = Field(
description=(
"The source IP address of the caller that AWS AppSync receives. "
"If the request includes a x-forwarded-for header, this is a list of IP addresses."
),
)
username: str = Field(
description="The IAM user principal name.", examples=["AIDAAAAAAAAAAAAAAAAAA", "appsync-user"],
)
userArn: str = Field(
description="The Amazon Resource Name (ARN) of the IAM user.",
examples=["arn:aws:iam::123456789012:user/appsync", "arn:aws:iam::123456789012:user/service-user"],
)
cognitoIdentityAuthType: Optional[str] = Field(
default=None,
description="Either authenticated or unauthenticated based on the identity type.",
examples=["authenticated", "unauthenticated", None],
)
cognitoIdentityAuthProvider: Optional[str] = Field(
default=None,
description=(
"A comma-separated list of external identity provider information "
"used in obtaining the credentials used to sign the request."
),
examples=[
"cognito-idp.us-east-1.amazonaws.com/us-east-1_POOL_ID",
"graph.facebook.com,cognito-idp.us-east-1.amazonaws.com/us-east-1_POOL_ID",
None,
],
)


class AppSyncCognitoIdentity(BaseModel):
sub: str
issuer: str
username: str
claims: Dict[str, Any]
sourceIp: List[str]
defaultAuthStrategy: str
groups: Optional[List[str]]
sub: str = Field(
description="The UUID of the authenticated user from Cognito User Pool.",
examples=["user-uuid-1234-5678-9012-123456789012", "user-uuid-abcd-efgh-ijkl-mnopqrstuvwx"],
)
issuer: str = Field(
description="The token issuer URL from Cognito User Pool.",
examples=[
"https://cognito-idp.us-east-1.amazonaws.com/us-east-1_POOL_ID",
"https://cognito-idp.us-west-2.amazonaws.com/us-west-xxxxxxxxxxx",
],
)
username: str = Field(
description="The username of the authenticated user (cognito:username attribute).",
examples=["mike", "jdoe", "user123"],
)
claims: Dict[str, Any] = Field(description="The JWT claims that the user has from Cognito User Pool.")
sourceIp: List[str] = Field(
description=(
"The source IP address of the caller that AWS AppSync receives. "
"If the request includes a x-forwarded-for header, this is a list of IP addresses."
),
)
defaultAuthStrategy: str = Field(
description="The default authorization strategy for this caller (ALLOW or DENY).", examples=["ALLOW", "DENY"],
)
groups: Optional[List[str]] = Field(
default=None,
description="The Cognito User Pool groups that the user belongs to.",
examples=[["admin", "users"], ["developers"], None],
)


class AppSyncOidcIdentity(BaseModel):
claims: Dict[str, Any]
issuer: str
sub: str
claims: Dict[str, Any] = Field(description="The JWT claims from the OpenID Connect provider.")
issuer: str = Field(
description="The token issuer URL from the OpenID Connect provider.",
examples=["https://accounts.google.com", "https://login.microsoftonline.com/tenant-id/v2.0"],
)
sub: str = Field(
description="The subject identifier from the OpenID Connect provider.",
examples=["248289761001", "provider-subject-identifier"],
)


class AppSyncLambdaIdentity(BaseModel):
resolverContext: Dict[str, Any]
resolverContext: Dict[str, Any] = Field(
description=(
"The resolver context returned by the Lambda function authorizing the request. "
"Contains custom authorization data from AWS_LAMBDA authorization."
),
examples=[
{"userId": "user123", "role": "admin", "permissions": ["read", "write"]},
{"customClaim": "value", "authLevel": "premium"},
],
)


AppSyncIdentity = Union[
Expand All @@ -43,30 +114,110 @@ class AppSyncLambdaIdentity(BaseModel):


class AppSyncRequestModel(BaseModel):
domainName: Optional[str]
headers: Dict[str, str]
domainName: Optional[str] = Field(
default=None,
description=(
"The custom domain name used to access the GraphQL endpoint. "
"Returns null when using the default GraphQL endpoint domain name."
),
examples=["api.example.com", "graphql.mycompany.com", None],
)
headers: Dict[str, str] = Field(
description="HTTP headers from the GraphQL request, including custom headers.",
examples=[
{
"cloudfront-viewer-country": "US",
"host": "example.appsync-api.us-east-1.amazonaws.com",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"content-type": "application/json",
},
],
)


class AppSyncInfoModel(BaseModel):
selectionSetList: List[str]
selectionSetGraphQL: str
parentTypeName: str
fieldName: str
variables: Dict[str, Any]
selectionSetList: List[str] = Field(
description=(
"A list representation of the fields in the GraphQL selection set. "
"Fields that are aliased are referenced only by the alias name."
),
examples=[["id", "field1", "field2"], ["postId", "title", "content", "author", "author/id", "author/name"]],
)
selectionSetGraphQL: str = Field(
description=(
"A string representation of the selection set, formatted as GraphQL SDL. "
"Inline fragments are preserved but fragments are not merged."
),
examples=[
"{\n id\n field1\n field2\n}",
"{\n postId\n title\n content\n author {\n id\n name\n }\n}",
],
)
parentTypeName: str = Field(
description="The name of the parent type for the field that is currently being resolved.",
examples=["Query", "Mutation", "Subscription", "User", "Post"],
)
fieldName: str = Field(
description="The name of the field that is currently being resolved.",
examples=["getUser", "createPost", "locations", "updateProfile"],
)
variables: Dict[str, Any] = Field(
description="A map which holds all variables that are passed into the GraphQL request.",
examples=[{"userId": "123", "limit": 10}, {"input": {"name": "John", "email": "[email protected]"}}, {}],
)


class AppSyncPrevModel(BaseModel):
result: Dict[str, Any]
result: Dict[str, Any] = Field(
description=(
"The result of whatever previous operation was executed in a pipeline resolver. "
"Contains the output from the previous function or Before mapping template."
),
examples=[
{"userId": "123", "posts": [{"id": "1", "title": "Hello World"}]},
{"data": {"field1": "value1", "field2": "value2"}},
{},
],
)


class AppSyncResolverEventModel(BaseModel):
arguments: Dict[str, Any]
identity: Optional[AppSyncIdentity]
source: Optional[Dict[str, Any]]
request: AppSyncRequestModel
info: AppSyncInfoModel
prev: Optional[AppSyncPrevModel]
stash: Dict[str, Any]
arguments: Dict[str, Any] = Field(
description="The arguments passed to the GraphQL field.",
examples=[
{"id": "123", "limit": 10},
{"input": {"name": "John", "email": "[email protected]"}},
{"page": 2, "size": 1, "name": "value"},
{},
],
)
identity: Optional[AppSyncIdentity] = Field(
default=None, description="Information about the caller identity (authenticated user or API key).",
)
source: Optional[Dict[str, Any]] = Field(
default=None,
description="The parent object for the field. For top-level fields, this will be null.",
examples=[
None,
{"id": "user123", "name": "John Doe"},
{"name": "Value", "nested": {"name": "value", "list": []}},
{"postId": "post456", "title": "My Post"},
],
)
request: AppSyncRequestModel = Field(description="Information about the GraphQL request context.")
info: AppSyncInfoModel = Field(
description="Information about the GraphQL request including selection set and field details.",
)
prev: Optional[AppSyncPrevModel] = Field(
default=None, description="Results from the previous resolver in a pipeline resolver.",
)
stash: Dict[str, Any] = Field(
description=(
"The stash is a map that is made available inside each resolver and function mapping template. "
"The same stash instance lives through a single resolver execution."
),
examples=[{}, {"customData": "value", "userId": "123"}],
)


AppSyncBatchResolverEventModel = List[AppSyncResolverEventModel]
82 changes: 66 additions & 16 deletions aws_lambda_powertools/utilities/parser/models/appsync_events.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,85 @@
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel
from pydantic import BaseModel, Field

from aws_lambda_powertools.utilities.parser.models.appsync import AppSyncIdentity, AppSyncRequestModel


class AppSyncEventsInfoChannelModel(BaseModel):
path: str
segments: List[str]
path: str = Field(
description="The full path of the AppSync Events channel.",
examples=["/default/channel", "/notifications/user-updates", "/chat/room-123"],
)
segments: List[str] = Field(
description="The path segments of the channel, split by forward slashes.",
examples=[["default", "channel"], ["notifications", "user-updates"], ["chat", "room-123"]],
)


class AppSyncEventsInfoChannelNamespaceModel(BaseModel):
name: str
name: str = Field(
description="The namespace name for the AppSync Events channel.",
examples=["default", "notifications", "chat", "user-events"],
)


class AppSyncEventsInfoModel(BaseModel):
channel: AppSyncEventsInfoChannelModel
channelNamespace: AppSyncEventsInfoChannelNamespaceModel
operation: Literal["PUBLISH", "SUBSCRIBE"]
channel: AppSyncEventsInfoChannelModel = Field(description="Information about the AppSync Events channel.")
channelNamespace: AppSyncEventsInfoChannelNamespaceModel = Field(
description="The namespace information for the channel."
)
operation: Literal["PUBLISH", "SUBSCRIBE"] = Field(
description="The type of operation being performed on the channel.", examples=["PUBLISH", "SUBSCRIBE"]
)


class AppSyncEventsEventModel(BaseModel):
id: str
payload: Dict[str, Any]
id: str = Field(
description="The unique identifier for the event.", examples=["1", "2", "event-123", "notification-456"]
)
payload: Dict[str, Any] = Field(
description="The event data payload containing the actual event information.",
examples=[
{"event_1": "data_1"},
{"event_2": "data_2"},
{"userId": "123", "action": "login", "timestamp": "2023-01-01T00:00:00Z"},
{"message": "Hello World", "type": "notification"},
],
)


class AppSyncEventsModel(BaseModel):
identity: Optional[AppSyncIdentity] = None
request: AppSyncRequestModel
info: AppSyncEventsInfoModel
prev: Optional[str] = None
outErrors: Optional[List[str]] = None
stash: Optional[Dict[str, Any]] = None
events: Optional[List[AppSyncEventsEventModel]] = None
identity: Optional[AppSyncIdentity] = Field(
default=None, description="Information about the caller identity (authenticated user or API key)."
)
request: AppSyncRequestModel = Field(description="Information about the GraphQL request context.")
info: AppSyncEventsInfoModel = Field(
description="Information about the AppSync Events operation including channel details."
)
prev: Optional[str] = Field(
default=None,
description="Results from the previous operation in a pipeline resolver.",
examples=["None", "previous-result-data"],
)
outErrors: Optional[List[str]] = Field(
default=None,
description="List of output errors that occurred during event processing.",
examples=[[], ["Error message 1", "Error message 2"], None],
)
stash: Optional[Dict[str, Any]] = Field(
default=None,
description=(
"The stash is a map that is made available inside each resolver and function mapping template. "
"The same stash instance lives through a single resolver execution."
),
examples=[{}, {"customData": "value", "userId": "123"}, None],
)
events: Optional[List[AppSyncEventsEventModel]] = Field(
default=None,
description="List of events being published or subscribed to in the AppSync Events operation.",
examples=[
[{"id": "1", "payload": {"event_1": "data_1"}}, {"id": "2", "payload": {"event_2": "data_2"}}],
[],
None,
],
)