Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions azure/functions/_servicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ def delivery_count(self) -> typing.Optional[int]:
"""Number of times delivery has been attempted."""
pass

@property
@abc.abstractmethod
def enqueued_time_utc(self) -> typing.Optional[datetime.datetime]:
"""The date and time in UTC at which the message is enqueued"""
pass

@property
@abc.abstractmethod
def expires_at_utc(self) -> typing.Optional[datetime.datetime]:
"""The date and time in UTC at which the message is set to expire."""
pass

@property
@abc.abstractmethod
def expiration_time(self) -> typing.Optional[datetime.datetime]:
Expand Down
2 changes: 1 addition & 1 deletion azure/functions/eventhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def decode_single_event(
body=body,
trigger_metadata=trigger_metadata,
enqueued_time=cls._parse_datetime_metadata(
trigger_metadata, 'EnqueuedTime'),
trigger_metadata, 'EnqueuedTimeUtc'),
partition_key=cls._decode_trigger_metadata_field(
trigger_metadata, 'PartitionKey', python_type=str),
sequence_number=cls._decode_trigger_metadata_field(
Expand Down
78 changes: 75 additions & 3 deletions tests/test_eventhub.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from typing import List
from typing import List, Mapping
import unittest
import json
from unittest.mock import patch
Expand Down Expand Up @@ -194,7 +194,7 @@ def test_single_eventhub_trigger_metadata_field(self):
self.assertIsNotNone(metadata_dict.get('SystemProperties'))

# EnqueuedTime should be in iso8601 string format
self.assertEqual(metadata_dict['EnqueuedTime'],
self.assertEqual(metadata_dict['EnqueuedTimeUtc'],
self.MOCKED_ENQUEUE_TIME.isoformat())
self.assertEqual(metadata_dict['SystemProperties'][
'iothub-connection-device-id'
Expand Down Expand Up @@ -224,6 +224,78 @@ def test_multiple_eventhub_triggers_metadata_field(self):
'iothub-connection-device-id'
], 'MyTestDevice1')

def test_eventhub_properties(self):
"""Test if properties from public interface _eventhub.py returns
the correct values from metadata"""

result = azf_eh.EventHubTriggerConverter.decode(
data=meta.Datum(b'body_bytes', 'bytes'),
trigger_metadata=self._generate_full_metadata()
)

self.assertEqual(result.get_body(), b'body_bytes')
self.assertIsNone(result.partition_key)
self.assertDictEqual(result.iothub_metadata,
{'connection-device-id': 'awesome-device-id'})
self.assertEqual(result.sequence_number, 47)
self.assertEqual(result.enqueued_time.isoformat(),
'2020-07-14T01:27:55.627000+00:00')
self.assertEqual(result.offset, '3696')

def _generate_full_metadata(self):
mocked_metadata: Mapping[str, meta.Datum] = {}
mocked_metadata['Offset'] = meta.Datum(type='string', value='3696')
mocked_metadata['EnqueuedTimeUtc'] = meta.Datum(
type='string', value='2020-07-14T01:27:55.627Z')
mocked_metadata['SequenceNumber'] = meta.Datum(type='int', value=47)
mocked_metadata['Properties'] = meta.Datum(type='json', value='{}')
mocked_metadata['sys'] = meta.Datum(type='json', value='''
{
"MethodName":"metadata_trigger",
"UtcNow":"2020-07-14T01:27:55.8940305Z",
"RandGuid":"db413fd6-8411-4e51-844c-c9b5345e537d"
}''')
mocked_metadata['SystemProperties'] = meta.Datum(type='json', value='''
{
"x-opt-sequence-number":47,
"x-opt-offset":"3696",
"x-opt-enqueued-time":"2020-07-14T01:27:55.627Z",
"SequenceNumber":47,
"Offset":"3696",
"PartitionKey":null,
"EnqueuedTimeUtc":"2020-07-14T01:27:55.627Z",
"iothub-connection-device-id":"awesome-device-id"
}''')
mocked_metadata['PartitionContext'] = meta.Datum(type='json', value='''
{
"CancellationToken":{
"IsCancellationRequested":false,
"CanBeCanceled":true,
"WaitHandle":{
"Handle":{
"value":2472
},
"SafeWaitHandle":{
"IsInvalid":false,
"IsClosed":false
}
}
},
"ConsumerGroupName":"$Default",
"EventHubPath":"python-worker-ci-eventhub-one-metadata",
"PartitionId":"0",
"Owner":"88cec2e2-94c9-4e08-acb6-4f2b97cd888e",
"RuntimeInformation":{
"PartitionId":"0",
"LastSequenceNumber":0,
"LastEnqueuedTimeUtc":"0001-01-01T00:00:00",
"LastEnqueuedOffset":null,
"RetrievalTime":"0001-01-01T00:00:00"
}
}''')

return mocked_metadata

def _generate_single_iothub_datum(self, datum_type='json'):
datum = '{"device-status": "good"}'
if datum_type == 'bytes':
Expand All @@ -248,7 +320,7 @@ def _generate_multiple_iothub_data(self, data_type='json'):

def _generate_single_trigger_metadatum(self):
return {
'EnqueuedTime': meta.Datum(
'EnqueuedTimeUtc': meta.Datum(
f'"{self.MOCKED_ENQUEUE_TIME.isoformat()}"', 'json'
),
'SystemProperties': meta.Datum(
Expand Down
85 changes: 68 additions & 17 deletions tests/test_servicebus.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,31 @@ def test_servicebus_data(self):

def test_servicebus_properties(self):
# SystemProperties in metadata should propagate to class properties
servicebus_msg = azf_sb.ServiceBusMessageInConverter.decode(
data=self._generate_servicebus_data(),
msg = azf_sb.ServiceBusMessageInConverter.decode(
data=meta.Datum(b'body_bytes', 'bytes'),
trigger_metadata=self._generate_servicebus_metadata())

self.assertEqual(servicebus_msg.content_type, 'application/json')
self.assertEqual(servicebus_msg.label, 'Microsoft.Azure.ServiceBus')
self.assertEqual(servicebus_msg.message_id,
'87c66eaf88e84119b66a26278a7b4149')
self.assertEqual(servicebus_msg.enqueued_time_utc,
self.MOCKED_ENQUEUE_TIME)
self.assertEqual(servicebus_msg.expires_at_utc,
self.assertEqual(msg.get_body(), b'body_bytes')
self.assertEqual(msg.content_type, 'application/json')
self.assertIsNone(msg.correlation_id)
self.assertEqual(msg.enqueued_time_utc, self.MOCKED_ENQUEUE_TIME)
self.assertEqual(msg.expires_at_utc,
datetime(2020, 7, 2, 5, 39, 12, 170000,
tzinfo=timezone.utc))
self.assertIsNone(msg.expiration_time)
self.assertEqual(msg.label, 'Microsoft.Azure.ServiceBus')
self.assertEqual(msg.message_id, '87c66eaf88e84119b66a26278a7b4149')
self.assertEqual(msg.partition_key, 'sample_part')
self.assertIsNone(msg.reply_to)
self.assertIsNone(msg.reply_to_session_id)
self.assertIsNone(msg.scheduled_enqueue_time)
self.assertIsNone(msg.session_id)
self.assertIsNone(msg.time_to_live)
self.assertIsNone(msg.to)
self.assertDictEqual(msg.user_properties, {
'$AzureWebJobsParentId': '6ceef68b-0794-45dd-bb2e-630748515552',
'x-opt-enqueue-sequence-number': 0
})

def test_servicebus_metadata(self):
# Trigger metadata should contains all the essential information
Expand All @@ -79,7 +91,7 @@ def test_servicebus_metadata(self):

# Datetime should be in iso8601 string instead of datetime object
metadata_dict = servicebus_msg.metadata
self.assertDictEqual(metadata_dict, {
self.assertGreaterEqual(metadata_dict.items(), {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this compare all the metadata items? If yes, then why the "Greater"?

Copy link
Contributor Author

@Hazhzeng Hazhzeng Aug 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is amended to ensure the metadata_dict has the following expected fields (DeliveryCount, LockToken, ExpiresAtUtc...).

Using self.assertDictEqual is too strict for this test scenario. We just want to ensure the expected fields are a subset of the metadata, no need to be identical. I tried using self.assertDictContainsSubset but it has been deprecated in Python 3.6. So I first convert the dictionary into an ItemView (an item set) using .items(), and use self.assertGreaterEqual for subset check.

'DeliveryCount': 1,
'LockToken': '87931fd2-39f4-415a-9fdc-adfdcbed3148',
'ExpiresAtUtc': '2020-07-02T05:39:12.17Z',
Expand All @@ -93,7 +105,7 @@ def test_servicebus_metadata(self):
'UtcNow': '2020-06-18T05:39:12.2860411Z',
'RandGuid': 'bb38deae-cc75-49f2-89f5-96ec6eb857db'
}
})
}.items())

def test_servicebus_should_not_override_metadata(self):
# SystemProperties in metadata should propagate to class properties
Expand Down Expand Up @@ -135,14 +147,53 @@ def _generate_servicebus_metadata(self):
'application/json', 'string'
)
mocked_metadata['SequenceNumber'] = meta.Datum(3, 'int')
mocked_metadata['PartitionKey'] = meta.Datum('sample_part', 'string')
mocked_metadata['Label'] = meta.Datum(
'Microsoft.Azure.ServiceBus', 'string'
)
mocked_metadata['sys'] = meta.Datum(type='json', value='''
{
"MethodName": "ServiceBusSMany",
"UtcNow": "2020-06-18T05:39:12.2860411Z",
"RandGuid": "bb38deae-cc75-49f2-89f5-96ec6eb857db"
mocked_metadata['MessageReceiver'] = meta.Datum(type='json', value='''
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the new section of metadata received?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the new section of metadata provided by the extension. We still support the old mocked_metadata['sys'] in line 192.

{
"RegisteredPlugins": [],
"ReceiveMode": 0,
"PrefetchCount": 0,
"LastPeekedSequenceNumber": 0,
"Path": "testqueue",
"OperationTimeout": "00:01:00",
"ServiceBusConnection": {
"Endpoint": "sb://python-worker-36-sbns.servicebus.win.net",
"OperationTimeout": "00:01:00",
"RetryPolicy": {
"MinimalBackoff": "00:00:00",
"MaximumBackoff": "00:00:30",
"DeltaBackoff": "00:00:03",
"MaxRetryCount": 5,
"IsServerBusy": false,
"ServerBusyExceptionMessage": null
},
"TransportType": 0,
"TokenProvider": {}
},
"IsClosedOrClosing": false,
"ClientId": "MessageReceiver1testqueue",
"RetryPolicy": {
"MinimalBackoff": "00:00:00",
"MaximumBackoff": "00:00:30",
"DeltaBackoff": "00:00:03",
"MaxRetryCount": 5,
"IsServerBusy": false,
"ServerBusyExceptionMessage": null
}
''')
}''')
mocked_metadata['UserProperties'] = meta.Datum(type='json', value='''
{
"$AzureWebJobsParentId": "6ceef68b-0794-45dd-bb2e-630748515552",
"x-opt-enqueue-sequence-number": 0
}''')
mocked_metadata['sys'] = meta.Datum(type='json', value='''
{
"MethodName": "ServiceBusSMany",
"UtcNow": "2020-06-18T05:39:12.2860411Z",
"RandGuid": "bb38deae-cc75-49f2-89f5-96ec6eb857db"
}
''')
return mocked_metadata