Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
ignore = W503,E402,E731
exclude =
.git, __pycache__, build, dist, .eggs, .github, .local,
Samples, azure/functions/_thirdparty, docs/, .venv*/, .env*/, .vscode/
Samples, azure/functions/_thirdparty, docs/, .venv*/, .env*/, .vscode/, venv
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ celerybeat-schedule
.venv*
venv/
ENV/
py3env/

# Spyder project settings
.spyderproject
Expand All @@ -103,3 +104,10 @@ ENV/

.testconfig
.pytest_cache

# mac osx specific files
.DS_Store

# PyCharm related files
.idea/
.idea_modules/
2 changes: 1 addition & 1 deletion azure/functions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from ._abc import TimerRequest, InputStream, Context, Out # NoQA
from ._eventhub import EventHubEvent # NoQA
from ._eventgrid import EventGridEvent # NoQA
from ._eventgrid import EventGridEvent, EventGridOutputEvent # NoQA
from ._cosmosdb import Document, DocumentList # NoQA
from ._http import HttpRequest # NoQA
from ._http import HttpResponse # NoQA
Expand Down
31 changes: 31 additions & 0 deletions azure/functions/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,37 @@ def data_version(self) -> str:
pass


class EventGridOutputEvent(abc.ABC):
@property
@abc.abstractmethod
def id(self) -> str:
pass

@abc.abstractmethod
def get_json(self) -> typing.Any:
pass

@property
@abc.abstractmethod
def subject(self) -> str:
pass

@property
@abc.abstractmethod
def event_type(self) -> str:
pass

@property
@abc.abstractmethod
def event_time(self) -> typing.Optional[datetime.datetime]:
pass

@property
@abc.abstractmethod
def data_version(self) -> str:
pass


class Document(abc.ABC):

@classmethod
Expand Down
49 changes: 49 additions & 0 deletions azure/functions/_eventgrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,52 @@ def __repr__(self) -> str:
f'subject={self.subject} '
f'at 0x{id(self):0x}>'
)


class EventGridOutputEvent(azf_abc.EventGridOutputEvent):
"""An EventGrid event message."""

def __init__(self, *,
id: str,
data: typing.Dict[str, object],
subject: str,
event_type: str,
event_time: typing.Optional[datetime.datetime],
data_version: str) -> None:
self.__id = id
self.__data = data
self.__subject = subject
self.__event_type = event_type
self.__event_time = event_time
self.__data_version = data_version

@property
def id(self) -> str:
return self.__id

def get_json(self) -> typing.Any:
return self.__data

@property
def subject(self) -> str:
return self.__subject

@property
def event_type(self) -> str:
return self.__event_type

@property
def event_time(self) -> typing.Optional[datetime.datetime]:
return self.__event_time

@property
def data_version(self) -> str:
return self.__data_version

def __repr__(self) -> str:
return (
f'<azure.EventGridEvent id={self.id} '
f'event_type={self.event_type} '
f'subject={self.subject} '
f'at 0x{id(self):0x}>'
)
95 changes: 84 additions & 11 deletions azure/functions/eventgrid.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
import collections
import datetime
import json
import typing
from typing import Optional, List, Any, Dict, Union

from azure.functions import _eventgrid
from azure.functions import _eventgrid as azf_eventgrid

from . import meta
from .meta import Datum


class EventGridEventInConverter(meta.InConverter,
binding='eventGridTrigger', trigger=True):
class EventGridEventInConverter(meta.InConverter, binding='eventGridTrigger',
trigger=True):

@classmethod
def check_input_type_annotation(cls, pytype: type) -> bool:
return issubclass(pytype, _eventgrid.EventGridEvent)
"""
Event Grid always sends an array and may send more than one event in
the array. The runtime invokes function once for each array element,
thus no need to parse List[EventGridEvent]
"""
valid_types = azf_eventgrid.EventGridEvent
return isinstance(pytype, type) and issubclass(pytype, valid_types)

@classmethod
def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any:
def decode(cls, data: meta.Datum, *,
trigger_metadata) -> azf_eventgrid.EventGridEvent:
data_type = data.type

if data_type == 'json':
Expand All @@ -23,11 +33,7 @@ def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any:
raise NotImplementedError(
f'unsupported event grid payload type: {data_type}')

if trigger_metadata is None:
raise NotImplementedError(
f'missing trigger metadata for event grid input')

return _eventgrid.EventGridEvent(
return azf_eventgrid.EventGridEvent(
id=body.get('id'),
topic=body.get('topic'),
subject=body.get('subject'),
Expand All @@ -36,3 +42,70 @@ def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any:
data=body.get('data'),
data_version=body.get('dataVersion'),
)


class EventGridEventOutConverter(meta.OutConverter, binding="eventGrid"):
@classmethod
def check_output_type_annotation(cls, pytype: type) -> bool:
valid_types = (str, bytes, azf_eventgrid.EventGridOutputEvent,
List[azf_eventgrid.EventGridOutputEvent])
return (meta.is_iterable_type_annotation(pytype, str) or meta.
is_iterable_type_annotation(pytype,
azf_eventgrid.EventGridOutputEvent)
or (isinstance(pytype, type)
and issubclass(pytype, valid_types)))

@classmethod
def encode(cls, obj: Any, *, expected_type:
Optional[type]) -> Optional[Datum]:
if isinstance(obj, str):
return meta.Datum(type='string', value=obj)

elif isinstance(obj, bytes):
return meta.Datum(type='bytes', value=obj)

elif isinstance(obj, azf_eventgrid.EventGridOutputEvent):
return meta.Datum(
type='json',
value=json.dumps({
'id': obj.id,
'subject': obj.subject,
'dataVersion': obj.data_version,
'eventType': obj.event_type,
'data': obj.get_json(),
'eventTime': cls._format_datetime(obj.event_time)
})
)

elif isinstance(obj, collections.abc.Iterable):
msgs: List[Union[str, Dict[str, Any]]] = []
for item in obj:
if isinstance(item, str):
msgs.append(item)
elif isinstance(item, azf_eventgrid.EventGridOutputEvent):
msgs.append({'id': item.id,
'subject': item.subject,
'dataVersion': item.data_version,
'eventType': item.event_type,
'data': item.get_json(),
'eventTime': cls._format_datetime(
item.event_time)
})
else:
raise NotImplementedError(
'invalid data type in output '
'queue message list: {}'.format(type(item)))

return meta.Datum(
type='json',
value=json.dumps(msgs)
)

raise NotImplementedError

@classmethod
def _format_datetime(cls, dt: Optional[datetime.datetime]):
if dt is None:
return None
else:
return dt.isoformat()
Loading