From 3f09485b108963f5ed0d1326e36751a51797a2a1 Mon Sep 17 00:00:00 2001 From: Mahmoud Fathi Date: Thu, 11 Jan 2024 10:43:18 +0100 Subject: [PATCH] Fix get_status_all() to return all the instances. WARNING: THIS PATCH IS JUST TO HINT ABOUT THE EXISTING ISSUE. IT NEEDS SOME ADDITIONAL WORK FROM THE MAINTAINERS TO BE MERGED. get_status_all() function is meant to return all the instances of the durable function. However, if there is more than 100 instances, the function will not return all of them. This is due to the fact that the if more than 100 instances exists, a continuation token is returned in the response header which is called x-ms-continuation-token. Please see: https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-http-api#response-2 If we set the continuation token in the next request header, we get the next 100 instances. This is essentially what we want to do in this patch: to supply x-ms-continuation-token if it is given from the API. Please note that in DurableOrchestrationClient, I have used "response.header" while this attribute does not exist in the "response" object. Adding it is beyond my knowledge and requires maintainer's attention. --- .../models/DurableOrchestrationClient.py | 38 ++++++++++++------- .../models/utils/http_utils.py | 8 ++-- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/azure/durable_functions/models/DurableOrchestrationClient.py b/azure/durable_functions/models/DurableOrchestrationClient.py index 6b473c2c..7825171d 100644 --- a/azure/durable_functions/models/DurableOrchestrationClient.py +++ b/azure/durable_functions/models/DurableOrchestrationClient.py @@ -311,20 +311,30 @@ async def get_status_all(self) -> List[DurableOrchestrationStatus]: """ options = RpcManagementOptions() request_url = options.to_url(self._orchestration_bindings.rpc_base_url) - response = await self._get_async_request(request_url) - switch_statement = { - 200: lambda: None, # instance completed - } - - has_error_message = switch_statement.get( - response[0], - lambda: f"The operation failed with an unexpected status code {response[0]}") - error_message = has_error_message() - if error_message: - raise Exception(error_message) - else: - statuses: List[Any] = response[1] - return [DurableOrchestrationStatus.from_json(o) for o in statuses] + all_statuses: List[DurableOrchestrationStatus] = [] + + continuation_token = None + while True: + headers = {} + if continuation_token: + headers['x-ms-continuation-token'] = continuation_token + response = await self._get_async_request(request_url, headers=headers) + switch_statement = { + 200: lambda: None, # instance completed + } + has_error_message = switch_statement.get( + response[0], + lambda: f"The operation failed with an unexpected status code {response[0]}") + error_message = has_error_message() + if error_message: + raise Exception(error_message) + else: + statuses: List[Any] = response[1] + all_statuses.extend([DurableOrchestrationStatus.from_json(o) for o in statuses]) + continuation_token = response.headers.get('x-ms-continuation-token') + if not continuation_token: + break + return all_statuses async def get_status_by(self, created_time_from: datetime = None, created_time_to: datetime = None, diff --git a/azure/durable_functions/models/utils/http_utils.py b/azure/durable_functions/models/utils/http_utils.py index e45cef68..d2bfc6fe 100644 --- a/azure/durable_functions/models/utils/http_utils.py +++ b/azure/durable_functions/models/utils/http_utils.py @@ -1,4 +1,4 @@ -from typing import Any, List, Union +from typing import Any, List, Union, Dict, Optional import aiohttp @@ -29,20 +29,22 @@ async def post_async_request(url: str, data: Any = None) -> List[Union[int, Any] return [response.status, data] -async def get_async_request(url: str) -> List[Any]: +async def get_async_request(url: str, headers: Dict[Any, Any] = None) -> List[Any]: """Get the data from the url provided. Parameters ---------- url: str url to get the data from + headers: Dict[str, str] + headers to send with the request Returns ------- [int, Any] Tuple with the Response status code and the data returned from the request """ - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(headers=headers) as session: async with session.get(url) as response: data = await response.json(content_type=None) if data is None: