diff --git a/.flake8 b/.flake8 index 2627b81d..ac4a7662 100644 --- a/.flake8 +++ b/.flake8 @@ -2,4 +2,4 @@ ignore = W503,E402,E731 exclude = .git, __pycache__, build, dist, .eggs, .github, .local, - Samples, azure/functions/_thirdparty, docs/, .venv*/, .env*/, .vscode/ + Samples, azure/functions/_thirdparty, docs/, .venv*/, .env*/, .vscode/, venv diff --git a/.gitignore b/.gitignore index 792f2917..74050370 100644 --- a/.gitignore +++ b/.gitignore @@ -86,6 +86,7 @@ celerybeat-schedule .venv* venv/ ENV/ +py3env/ # Spyder project settings .spyderproject @@ -103,3 +104,10 @@ ENV/ .testconfig .pytest_cache + +# mac osx specific files +.DS_Store + +# PyCharm related files +.idea/ +.idea_modules/ \ No newline at end of file diff --git a/azure/functions/__init__.py b/azure/functions/__init__.py index 2cf64288..bf374bb0 100644 --- a/azure/functions/__init__.py +++ b/azure/functions/__init__.py @@ -1,6 +1,6 @@ from ._abc import TimerRequest, InputStream, Context, Out # NoQA from ._eventhub import EventHubEvent # NoQA -from ._eventgrid import EventGridEvent # NoQA +from ._eventgrid import EventGridEvent, EventGridOutputEvent # NoQA from ._cosmosdb import Document, DocumentList # NoQA from ._http import HttpRequest # NoQA from ._http import HttpResponse # NoQA diff --git a/azure/functions/_abc.py b/azure/functions/_abc.py index f955139e..43e9352e 100644 --- a/azure/functions/_abc.py +++ b/azure/functions/_abc.py @@ -241,6 +241,37 @@ def data_version(self) -> str: pass +class EventGridOutputEvent(abc.ABC): + @property + @abc.abstractmethod + def id(self) -> str: + pass + + @abc.abstractmethod + def get_json(self) -> typing.Any: + pass + + @property + @abc.abstractmethod + def subject(self) -> str: + pass + + @property + @abc.abstractmethod + def event_type(self) -> str: + pass + + @property + @abc.abstractmethod + def event_time(self) -> typing.Optional[datetime.datetime]: + pass + + @property + @abc.abstractmethod + def data_version(self) -> str: + pass + + class Document(abc.ABC): @classmethod diff --git a/azure/functions/_eventgrid.py b/azure/functions/_eventgrid.py index a3b8e3b8..ea210594 100644 --- a/azure/functions/_eventgrid.py +++ b/azure/functions/_eventgrid.py @@ -57,3 +57,52 @@ def __repr__(self) -> str: f'subject={self.subject} ' f'at 0x{id(self):0x}>' ) + + +class EventGridOutputEvent(azf_abc.EventGridOutputEvent): + """An EventGrid event message.""" + + def __init__(self, *, + id: str, + data: typing.Dict[str, object], + subject: str, + event_type: str, + event_time: typing.Optional[datetime.datetime], + data_version: str) -> None: + self.__id = id + self.__data = data + self.__subject = subject + self.__event_type = event_type + self.__event_time = event_time + self.__data_version = data_version + + @property + def id(self) -> str: + return self.__id + + def get_json(self) -> typing.Any: + return self.__data + + @property + def subject(self) -> str: + return self.__subject + + @property + def event_type(self) -> str: + return self.__event_type + + @property + def event_time(self) -> typing.Optional[datetime.datetime]: + return self.__event_time + + @property + def data_version(self) -> str: + return self.__data_version + + def __repr__(self) -> str: + return ( + f'' + ) diff --git a/azure/functions/eventgrid.py b/azure/functions/eventgrid.py index 7cfda2d3..94441e49 100644 --- a/azure/functions/eventgrid.py +++ b/azure/functions/eventgrid.py @@ -1,20 +1,30 @@ +import collections +import datetime import json -import typing +from typing import Optional, List, Any, Dict, Union -from azure.functions import _eventgrid +from azure.functions import _eventgrid as azf_eventgrid from . import meta +from .meta import Datum -class EventGridEventInConverter(meta.InConverter, - binding='eventGridTrigger', trigger=True): +class EventGridEventInConverter(meta.InConverter, binding='eventGridTrigger', + trigger=True): @classmethod def check_input_type_annotation(cls, pytype: type) -> bool: - return issubclass(pytype, _eventgrid.EventGridEvent) + """ + Event Grid always sends an array and may send more than one event in + the array. The runtime invokes function once for each array element, + thus no need to parse List[EventGridEvent] + """ + valid_types = azf_eventgrid.EventGridEvent + return isinstance(pytype, type) and issubclass(pytype, valid_types) @classmethod - def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any: + def decode(cls, data: meta.Datum, *, + trigger_metadata) -> azf_eventgrid.EventGridEvent: data_type = data.type if data_type == 'json': @@ -23,11 +33,7 @@ def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any: raise NotImplementedError( f'unsupported event grid payload type: {data_type}') - if trigger_metadata is None: - raise NotImplementedError( - f'missing trigger metadata for event grid input') - - return _eventgrid.EventGridEvent( + return azf_eventgrid.EventGridEvent( id=body.get('id'), topic=body.get('topic'), subject=body.get('subject'), @@ -36,3 +42,70 @@ def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any: data=body.get('data'), data_version=body.get('dataVersion'), ) + + +class EventGridEventOutConverter(meta.OutConverter, binding="eventGrid"): + @classmethod + def check_output_type_annotation(cls, pytype: type) -> bool: + valid_types = (str, bytes, azf_eventgrid.EventGridOutputEvent, + List[azf_eventgrid.EventGridOutputEvent]) + return (meta.is_iterable_type_annotation(pytype, str) or meta. + is_iterable_type_annotation(pytype, + azf_eventgrid.EventGridOutputEvent) + or (isinstance(pytype, type) + and issubclass(pytype, valid_types))) + + @classmethod + def encode(cls, obj: Any, *, expected_type: + Optional[type]) -> Optional[Datum]: + if isinstance(obj, str): + return meta.Datum(type='string', value=obj) + + elif isinstance(obj, bytes): + return meta.Datum(type='bytes', value=obj) + + elif isinstance(obj, azf_eventgrid.EventGridOutputEvent): + return meta.Datum( + type='json', + value=json.dumps({ + 'id': obj.id, + 'subject': obj.subject, + 'dataVersion': obj.data_version, + 'eventType': obj.event_type, + 'data': obj.get_json(), + 'eventTime': cls._format_datetime(obj.event_time) + }) + ) + + elif isinstance(obj, collections.abc.Iterable): + msgs: List[Union[str, Dict[str, Any]]] = [] + for item in obj: + if isinstance(item, str): + msgs.append(item) + elif isinstance(item, azf_eventgrid.EventGridOutputEvent): + msgs.append({'id': item.id, + 'subject': item.subject, + 'dataVersion': item.data_version, + 'eventType': item.event_type, + 'data': item.get_json(), + 'eventTime': cls._format_datetime( + item.event_time) + }) + else: + raise NotImplementedError( + 'invalid data type in output ' + 'queue message list: {}'.format(type(item))) + + return meta.Datum( + type='json', + value=json.dumps(msgs) + ) + + raise NotImplementedError + + @classmethod + def _format_datetime(cls, dt: Optional[datetime.datetime]): + if dt is None: + return None + else: + return dt.isoformat() diff --git a/tests/test_eventgrid.py b/tests/test_eventgrid.py new file mode 100644 index 00000000..9ffa3ad7 --- /dev/null +++ b/tests/test_eventgrid.py @@ -0,0 +1,163 @@ +from datetime import datetime +import unittest +from typing import List + +import azure.functions as func +import azure.functions.eventgrid as azf_event_grid + + +class MyTestCase(unittest.TestCase): + def test_eventgrid_input_type(self): + check_input_type = azf_event_grid.EventGridEventInConverter.\ + check_input_type_annotation + self.assertTrue(check_input_type(func.EventGridEvent)) + self.assertFalse(check_input_type(List[func.EventGridEvent])) + self.assertFalse(check_input_type(str)) + self.assertFalse(check_input_type(bytes)) + + def test_eventgrid_output_type(self): + check_output_type = azf_event_grid.EventGridEventOutConverter.\ + check_output_type_annotation + self.assertTrue(check_output_type(func.EventGridOutputEvent)) + self.assertTrue(check_output_type(List[func.EventGridOutputEvent])) + self.assertTrue(check_output_type(str)) + self.assertTrue(check_output_type(bytes)) + self.assertTrue(check_output_type(List[str])) + + def test_eventgrid_decode(self): + eventGridEvent = azf_event_grid.EventGridEventInConverter.decode( + data=self._generate_single_eventgrid_datum(), trigger_metadata=None + ) + self.assertEqual( + eventGridEvent.id, + "00010001-0001-0001-0001-000100010001") + self.assertEqual(eventGridEvent.subject, "eventhubs/test") + self.assertEqual(eventGridEvent.event_type, "captureFileCreated") + self.assertEqual(eventGridEvent.topic, "/TestTopic/namespaces/test") + self.assertIsNotNone(eventGridEvent.get_json()) + + def test_eventgrid_decode_with_null_data(self): + eventGridEvent = azf_event_grid.EventGridEventInConverter.decode( + data=self._generate_single_eventgrid_datum( + with_data=False), trigger_metadata=None) + self.assertEqual( + eventGridEvent.id, + "00010001-0001-0001-0001-000100010001") + self.assertEqual(eventGridEvent.subject, "eventhubs/test") + self.assertEqual(eventGridEvent.event_type, "captureFileCreated") + self.assertEqual(eventGridEvent.topic, "/TestTopic/namespaces/test") + self.assertIsNone(eventGridEvent.get_json()) + + def test_eventgrid_encode_with_str_data(self): + example_data = self._generate_single_eventgrid_str() + eventGridDatum = azf_event_grid.EventGridEventOutConverter.encode( + example_data, expected_type=type(example_data)) + self.assertEqual(eventGridDatum.type, "string") + + def test_eventgrid_encode_with_bytes_data(self): + example_data = self._generate_single_eventgrid_str(True) + eventGridDatum = azf_event_grid.EventGridEventOutConverter.encode( + example_data, expected_type=type(example_data)) + self.assertEqual(eventGridDatum.type, "bytes") + + def test_eventgrid_encode_with_EventGridData(self): + example_data = self._generate_single_eventgrid_event() + event_grid_datum = azf_event_grid.EventGridEventOutConverter.encode( + example_data, expected_type=type(example_data)) + + self.assertEqual(event_grid_datum.type, "json") + + def test_eventgrid_encode_with_multiple_EventGridData(self): + example_data = self._generate_multiple_eventgrid_event() + event_grid_datum = azf_event_grid.EventGridEventOutConverter.encode( + example_data, expected_type=type(example_data)) + + self.assertEqual(event_grid_datum.type, "json") + + @staticmethod + def _generate_single_eventgrid_datum(with_data=True, datum_type='json'): + datum_with_data = """ +{ + "topic": "/TestTopic/namespaces/test", + "subject": "eventhubs/test", + "eventType": "captureFileCreated", + "eventTime": "2017-07-14T23:10:27.7689666Z", + "id": "00010001-0001-0001-0001-000100010001", + "data": { + "fileUrl": "https://test.blob.core.windows.net/debugging/testblob.txt", + "fileType": "AzureBlockBlob", + "partitionId": "1", + "sizeInBytes": 0, + "eventCount": 0, + "firstSequenceNumber": -1, + "lastSequenceNumber": -1, + "firstEnqueueTime": "0001-01-01T00:00:00", + "lastEnqueueTime": "0001-01-01T00:00:00" + }, + "dataVersion": "", + "metadataVersion": "1" +} +""" + datum_without_data = """ +{ + "topic": "/TestTopic/namespaces/test", + "subject": "eventhubs/test", + "eventType": "captureFileCreated", + "eventTime": "2017-07-14T23:10:27.7689666Z", + "id": "00010001-0001-0001-0001-000100010001", + "dataVersion": "", + "metadataVersion": "1" +}""" + + datum = datum_with_data if with_data else datum_without_data + + if datum_type == 'bytes': + datum = datum.encode('utf-8') + + return func.meta.Datum(datum, datum_type) + + @staticmethod + def _generate_single_eventgrid_event(with_date=True): + return azf_event_grid.azf_eventgrid.EventGridOutputEvent( + id="id", + subject='subject', + event_type='eventType', + event_time=datetime.utcnow(), + data={"tag1": "value1", "tag2": "value2"} if with_date else {}, + data_version='dataVersion', + ) + + @staticmethod + def _generate_multiple_eventgrid_event(with_date=True): + return [azf_event_grid.azf_eventgrid.EventGridOutputEvent( + id="id1", + subject='subject1', + event_type='eventType1', + event_time=datetime.utcnow(), + data={"tag1": "value1", "tag2": "value2"} if with_date else {}, + data_version='dataVersion', + ), azf_event_grid.azf_eventgrid.EventGridOutputEvent( + id="id2", + subject='subject2', + event_type='eventType2', + event_time=datetime.utcnow(), + data={"tag1": "value1", "tag2": "value2"} if with_date else {}, + data_version='dataVersion', + )] + + @staticmethod + def _generate_single_eventgrid_str(in_bytes=False): + string_representation = '{"id": "id", ' \ + '"subject": "subject", ' \ + '"dataVersion": "dataVersion", ' \ + '"eventType": "eventType", ' \ + '"data": {"tag1": "value1", ' \ + '"tag2": "value2"}, ' \ + '"eventTime": "2020-04-22T18:19:19Z"}' + return string_representation.encode('utf-8') \ + if in_bytes \ + else string_representation + + +if __name__ == '__main__': + unittest.main()