1
+ import collections
2
+ import datetime
1
3
import json
2
- import typing
4
+ from typing import Optional , List , Any , Dict , Union
3
5
4
- from azure .functions import _eventgrid
6
+ from azure .functions import _eventgrid as azf_eventgrid
5
7
6
8
from . import meta
9
+ from .meta import Datum
7
10
8
11
9
- class EventGridEventInConverter (meta .InConverter ,
10
- binding = 'eventGridTrigger' , trigger = True ):
12
+ class EventGridEventInConverter (meta .InConverter , binding = 'eventGridTrigger' ,
13
+ trigger = True ):
11
14
12
15
@classmethod
13
16
def check_input_type_annotation (cls , pytype : type ) -> bool :
14
- return issubclass (pytype , _eventgrid .EventGridEvent )
17
+ """
18
+ Event Grid always sends an array and may send more than one event in
19
+ the array. The runtime invokes function once for each array element,
20
+ thus no need to parse List[EventGridEvent]
21
+ """
22
+ valid_types = azf_eventgrid .EventGridEvent
23
+ return isinstance (pytype , type ) and issubclass (pytype , valid_types )
15
24
16
25
@classmethod
17
- def decode (cls , data : meta .Datum , * , trigger_metadata ) -> typing .Any :
26
+ def decode (cls , data : meta .Datum , * ,
27
+ trigger_metadata ) -> azf_eventgrid .EventGridEvent :
18
28
data_type = data .type
19
29
20
30
if data_type == 'json' :
@@ -23,11 +33,7 @@ def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any:
23
33
raise NotImplementedError (
24
34
f'unsupported event grid payload type: { data_type } ' )
25
35
26
- if trigger_metadata is None :
27
- raise NotImplementedError (
28
- f'missing trigger metadata for event grid input' )
29
-
30
- return _eventgrid .EventGridEvent (
36
+ return azf_eventgrid .EventGridEvent (
31
37
id = body .get ('id' ),
32
38
topic = body .get ('topic' ),
33
39
subject = body .get ('subject' ),
@@ -36,3 +42,70 @@ def decode(cls, data: meta.Datum, *, trigger_metadata) -> typing.Any:
36
42
data = body .get ('data' ),
37
43
data_version = body .get ('dataVersion' ),
38
44
)
45
+
46
+
47
+ class EventGridEventOutConverter (meta .OutConverter , binding = "eventGrid" ):
48
+ @classmethod
49
+ def check_output_type_annotation (cls , pytype : type ) -> bool :
50
+ valid_types = (str , bytes , azf_eventgrid .EventGridOutputEvent ,
51
+ List [azf_eventgrid .EventGridOutputEvent ])
52
+ return (meta .is_iterable_type_annotation (pytype , str ) or meta .
53
+ is_iterable_type_annotation (pytype ,
54
+ azf_eventgrid .EventGridOutputEvent )
55
+ or (isinstance (pytype , type )
56
+ and issubclass (pytype , valid_types )))
57
+
58
+ @classmethod
59
+ def encode (cls , obj : Any , * , expected_type :
60
+ Optional [type ]) -> Optional [Datum ]:
61
+ if isinstance (obj , str ):
62
+ return meta .Datum (type = 'string' , value = obj )
63
+
64
+ elif isinstance (obj , bytes ):
65
+ return meta .Datum (type = 'bytes' , value = obj )
66
+
67
+ elif isinstance (obj , azf_eventgrid .EventGridOutputEvent ):
68
+ return meta .Datum (
69
+ type = 'json' ,
70
+ value = json .dumps ({
71
+ 'id' : obj .id ,
72
+ 'subject' : obj .subject ,
73
+ 'dataVersion' : obj .data_version ,
74
+ 'eventType' : obj .event_type ,
75
+ 'data' : obj .get_json (),
76
+ 'eventTime' : cls ._format_datetime (obj .event_time )
77
+ })
78
+ )
79
+
80
+ elif isinstance (obj , collections .abc .Iterable ):
81
+ msgs : List [Union [str , Dict [str , Any ]]] = []
82
+ for item in obj :
83
+ if isinstance (item , str ):
84
+ msgs .append (item )
85
+ elif isinstance (item , azf_eventgrid .EventGridOutputEvent ):
86
+ msgs .append ({'id' : item .id ,
87
+ 'subject' : item .subject ,
88
+ 'dataVersion' : item .data_version ,
89
+ 'eventType' : item .event_type ,
90
+ 'data' : item .get_json (),
91
+ 'eventTime' : cls ._format_datetime (
92
+ item .event_time )
93
+ })
94
+ else :
95
+ raise NotImplementedError (
96
+ 'invalid data type in output '
97
+ 'queue message list: {}' .format (type (item )))
98
+
99
+ return meta .Datum (
100
+ type = 'json' ,
101
+ value = json .dumps (msgs )
102
+ )
103
+
104
+ raise NotImplementedError
105
+
106
+ @classmethod
107
+ def _format_datetime (cls , dt : Optional [datetime .datetime ]):
108
+ if dt is None :
109
+ return None
110
+ else :
111
+ return dt .isoformat ()
0 commit comments