Skip to content

Commit 7156bf2

Browse files
authored
Fix EnqueueTimeUtc in EventHub and expose ServiceBus new properties (#65)
* Fix eventhub enqueue time * Add more robust test in properties * Fix syntax
1 parent 7b87b35 commit 7156bf2

File tree

4 files changed

+156
-21
lines changed

4 files changed

+156
-21
lines changed

azure/functions/_servicebus.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,18 @@ def delivery_count(self) -> typing.Optional[int]:
3131
"""Number of times delivery has been attempted."""
3232
pass
3333

34+
@property
35+
@abc.abstractmethod
36+
def enqueued_time_utc(self) -> typing.Optional[datetime.datetime]:
37+
"""The date and time in UTC at which the message is enqueued"""
38+
pass
39+
40+
@property
41+
@abc.abstractmethod
42+
def expires_at_utc(self) -> typing.Optional[datetime.datetime]:
43+
"""The date and time in UTC at which the message is set to expire."""
44+
pass
45+
3446
@property
3547
@abc.abstractmethod
3648
def expiration_time(self) -> typing.Optional[datetime.datetime]:

azure/functions/eventhub.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def decode_single_event(
125125
body=body,
126126
trigger_metadata=trigger_metadata,
127127
enqueued_time=cls._parse_datetime_metadata(
128-
trigger_metadata, 'EnqueuedTime'),
128+
trigger_metadata, 'EnqueuedTimeUtc'),
129129
partition_key=cls._decode_trigger_metadata_field(
130130
trigger_metadata, 'PartitionKey', python_type=str),
131131
sequence_number=cls._decode_trigger_metadata_field(

tests/test_eventhub.py

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
33

4-
from typing import List
4+
from typing import List, Mapping
55
import unittest
66
import json
77
from unittest.mock import patch
@@ -194,7 +194,7 @@ def test_single_eventhub_trigger_metadata_field(self):
194194
self.assertIsNotNone(metadata_dict.get('SystemProperties'))
195195

196196
# EnqueuedTime should be in iso8601 string format
197-
self.assertEqual(metadata_dict['EnqueuedTime'],
197+
self.assertEqual(metadata_dict['EnqueuedTimeUtc'],
198198
self.MOCKED_ENQUEUE_TIME.isoformat())
199199
self.assertEqual(metadata_dict['SystemProperties'][
200200
'iothub-connection-device-id'
@@ -224,6 +224,78 @@ def test_multiple_eventhub_triggers_metadata_field(self):
224224
'iothub-connection-device-id'
225225
], 'MyTestDevice1')
226226

227+
def test_eventhub_properties(self):
228+
"""Test if properties from public interface _eventhub.py returns
229+
the correct values from metadata"""
230+
231+
result = azf_eh.EventHubTriggerConverter.decode(
232+
data=meta.Datum(b'body_bytes', 'bytes'),
233+
trigger_metadata=self._generate_full_metadata()
234+
)
235+
236+
self.assertEqual(result.get_body(), b'body_bytes')
237+
self.assertIsNone(result.partition_key)
238+
self.assertDictEqual(result.iothub_metadata,
239+
{'connection-device-id': 'awesome-device-id'})
240+
self.assertEqual(result.sequence_number, 47)
241+
self.assertEqual(result.enqueued_time.isoformat(),
242+
'2020-07-14T01:27:55.627000+00:00')
243+
self.assertEqual(result.offset, '3696')
244+
245+
def _generate_full_metadata(self):
246+
mocked_metadata: Mapping[str, meta.Datum] = {}
247+
mocked_metadata['Offset'] = meta.Datum(type='string', value='3696')
248+
mocked_metadata['EnqueuedTimeUtc'] = meta.Datum(
249+
type='string', value='2020-07-14T01:27:55.627Z')
250+
mocked_metadata['SequenceNumber'] = meta.Datum(type='int', value=47)
251+
mocked_metadata['Properties'] = meta.Datum(type='json', value='{}')
252+
mocked_metadata['sys'] = meta.Datum(type='json', value='''
253+
{
254+
"MethodName":"metadata_trigger",
255+
"UtcNow":"2020-07-14T01:27:55.8940305Z",
256+
"RandGuid":"db413fd6-8411-4e51-844c-c9b5345e537d"
257+
}''')
258+
mocked_metadata['SystemProperties'] = meta.Datum(type='json', value='''
259+
{
260+
"x-opt-sequence-number":47,
261+
"x-opt-offset":"3696",
262+
"x-opt-enqueued-time":"2020-07-14T01:27:55.627Z",
263+
"SequenceNumber":47,
264+
"Offset":"3696",
265+
"PartitionKey":null,
266+
"EnqueuedTimeUtc":"2020-07-14T01:27:55.627Z",
267+
"iothub-connection-device-id":"awesome-device-id"
268+
}''')
269+
mocked_metadata['PartitionContext'] = meta.Datum(type='json', value='''
270+
{
271+
"CancellationToken":{
272+
"IsCancellationRequested":false,
273+
"CanBeCanceled":true,
274+
"WaitHandle":{
275+
"Handle":{
276+
"value":2472
277+
},
278+
"SafeWaitHandle":{
279+
"IsInvalid":false,
280+
"IsClosed":false
281+
}
282+
}
283+
},
284+
"ConsumerGroupName":"$Default",
285+
"EventHubPath":"python-worker-ci-eventhub-one-metadata",
286+
"PartitionId":"0",
287+
"Owner":"88cec2e2-94c9-4e08-acb6-4f2b97cd888e",
288+
"RuntimeInformation":{
289+
"PartitionId":"0",
290+
"LastSequenceNumber":0,
291+
"LastEnqueuedTimeUtc":"0001-01-01T00:00:00",
292+
"LastEnqueuedOffset":null,
293+
"RetrievalTime":"0001-01-01T00:00:00"
294+
}
295+
}''')
296+
297+
return mocked_metadata
298+
227299
def _generate_single_iothub_datum(self, datum_type='json'):
228300
datum = '{"device-status": "good"}'
229301
if datum_type == 'bytes':
@@ -248,7 +320,7 @@ def _generate_multiple_iothub_data(self, data_type='json'):
248320

249321
def _generate_single_trigger_metadatum(self):
250322
return {
251-
'EnqueuedTime': meta.Datum(
323+
'EnqueuedTimeUtc': meta.Datum(
252324
f'"{self.MOCKED_ENQUEUE_TIME.isoformat()}"', 'json'
253325
),
254326
'SystemProperties': meta.Datum(

tests/test_servicebus.py

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,31 @@ def test_servicebus_data(self):
5656

5757
def test_servicebus_properties(self):
5858
# SystemProperties in metadata should propagate to class properties
59-
servicebus_msg = azf_sb.ServiceBusMessageInConverter.decode(
60-
data=self._generate_servicebus_data(),
59+
msg = azf_sb.ServiceBusMessageInConverter.decode(
60+
data=meta.Datum(b'body_bytes', 'bytes'),
6161
trigger_metadata=self._generate_servicebus_metadata())
6262

63-
self.assertEqual(servicebus_msg.content_type, 'application/json')
64-
self.assertEqual(servicebus_msg.label, 'Microsoft.Azure.ServiceBus')
65-
self.assertEqual(servicebus_msg.message_id,
66-
'87c66eaf88e84119b66a26278a7b4149')
67-
self.assertEqual(servicebus_msg.enqueued_time_utc,
68-
self.MOCKED_ENQUEUE_TIME)
69-
self.assertEqual(servicebus_msg.expires_at_utc,
63+
self.assertEqual(msg.get_body(), b'body_bytes')
64+
self.assertEqual(msg.content_type, 'application/json')
65+
self.assertIsNone(msg.correlation_id)
66+
self.assertEqual(msg.enqueued_time_utc, self.MOCKED_ENQUEUE_TIME)
67+
self.assertEqual(msg.expires_at_utc,
7068
datetime(2020, 7, 2, 5, 39, 12, 170000,
7169
tzinfo=timezone.utc))
70+
self.assertIsNone(msg.expiration_time)
71+
self.assertEqual(msg.label, 'Microsoft.Azure.ServiceBus')
72+
self.assertEqual(msg.message_id, '87c66eaf88e84119b66a26278a7b4149')
73+
self.assertEqual(msg.partition_key, 'sample_part')
74+
self.assertIsNone(msg.reply_to)
75+
self.assertIsNone(msg.reply_to_session_id)
76+
self.assertIsNone(msg.scheduled_enqueue_time)
77+
self.assertIsNone(msg.session_id)
78+
self.assertIsNone(msg.time_to_live)
79+
self.assertIsNone(msg.to)
80+
self.assertDictEqual(msg.user_properties, {
81+
'$AzureWebJobsParentId': '6ceef68b-0794-45dd-bb2e-630748515552',
82+
'x-opt-enqueue-sequence-number': 0
83+
})
7284

7385
def test_servicebus_metadata(self):
7486
# Trigger metadata should contains all the essential information
@@ -79,7 +91,7 @@ def test_servicebus_metadata(self):
7991

8092
# Datetime should be in iso8601 string instead of datetime object
8193
metadata_dict = servicebus_msg.metadata
82-
self.assertDictEqual(metadata_dict, {
94+
self.assertGreaterEqual(metadata_dict.items(), {
8395
'DeliveryCount': 1,
8496
'LockToken': '87931fd2-39f4-415a-9fdc-adfdcbed3148',
8597
'ExpiresAtUtc': '2020-07-02T05:39:12.17Z',
@@ -93,7 +105,7 @@ def test_servicebus_metadata(self):
93105
'UtcNow': '2020-06-18T05:39:12.2860411Z',
94106
'RandGuid': 'bb38deae-cc75-49f2-89f5-96ec6eb857db'
95107
}
96-
})
108+
}.items())
97109

98110
def test_servicebus_should_not_override_metadata(self):
99111
# SystemProperties in metadata should propagate to class properties
@@ -135,14 +147,53 @@ def _generate_servicebus_metadata(self):
135147
'application/json', 'string'
136148
)
137149
mocked_metadata['SequenceNumber'] = meta.Datum(3, 'int')
150+
mocked_metadata['PartitionKey'] = meta.Datum('sample_part', 'string')
138151
mocked_metadata['Label'] = meta.Datum(
139152
'Microsoft.Azure.ServiceBus', 'string'
140153
)
141-
mocked_metadata['sys'] = meta.Datum(type='json', value='''
142-
{
143-
"MethodName": "ServiceBusSMany",
144-
"UtcNow": "2020-06-18T05:39:12.2860411Z",
145-
"RandGuid": "bb38deae-cc75-49f2-89f5-96ec6eb857db"
154+
mocked_metadata['MessageReceiver'] = meta.Datum(type='json', value='''
155+
{
156+
"RegisteredPlugins": [],
157+
"ReceiveMode": 0,
158+
"PrefetchCount": 0,
159+
"LastPeekedSequenceNumber": 0,
160+
"Path": "testqueue",
161+
"OperationTimeout": "00:01:00",
162+
"ServiceBusConnection": {
163+
"Endpoint": "sb://python-worker-36-sbns.servicebus.win.net",
164+
"OperationTimeout": "00:01:00",
165+
"RetryPolicy": {
166+
"MinimalBackoff": "00:00:00",
167+
"MaximumBackoff": "00:00:30",
168+
"DeltaBackoff": "00:00:03",
169+
"MaxRetryCount": 5,
170+
"IsServerBusy": false,
171+
"ServerBusyExceptionMessage": null
172+
},
173+
"TransportType": 0,
174+
"TokenProvider": {}
175+
},
176+
"IsClosedOrClosing": false,
177+
"ClientId": "MessageReceiver1testqueue",
178+
"RetryPolicy": {
179+
"MinimalBackoff": "00:00:00",
180+
"MaximumBackoff": "00:00:30",
181+
"DeltaBackoff": "00:00:03",
182+
"MaxRetryCount": 5,
183+
"IsServerBusy": false,
184+
"ServerBusyExceptionMessage": null
146185
}
147-
''')
186+
}''')
187+
mocked_metadata['UserProperties'] = meta.Datum(type='json', value='''
188+
{
189+
"$AzureWebJobsParentId": "6ceef68b-0794-45dd-bb2e-630748515552",
190+
"x-opt-enqueue-sequence-number": 0
191+
}''')
192+
mocked_metadata['sys'] = meta.Datum(type='json', value='''
193+
{
194+
"MethodName": "ServiceBusSMany",
195+
"UtcNow": "2020-06-18T05:39:12.2860411Z",
196+
"RandGuid": "bb38deae-cc75-49f2-89f5-96ec6eb857db"
197+
}
198+
''')
148199
return mocked_metadata

0 commit comments

Comments
 (0)