diff --git a/.pylintrc b/.pylintrc index a134068ff..d0ec3b74a 100644 --- a/.pylintrc +++ b/.pylintrc @@ -158,7 +158,9 @@ disable=print-statement, too-many-return-statements, import-error, no-name-in-module, - too-many-branches + too-many-branches, + too-many-ancestors, + too-many-nested-blocks # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/libraries/botbuilder-core/botbuilder/core/bot_framework_adapter.py b/libraries/botbuilder-core/botbuilder/core/bot_framework_adapter.py index e18297b53..08ffc659e 100644 --- a/libraries/botbuilder-core/botbuilder/core/bot_framework_adapter.py +++ b/libraries/botbuilder-core/botbuilder/core/bot_framework_adapter.py @@ -1282,6 +1282,16 @@ async def exchange_token_from_credentials( ) raise TypeError(f"exchange token returned improper result: {type(result)}") + def can_process_outgoing_activity( + self, activity: Activity # pylint: disable=unused-argument + ) -> bool: + return False + + async def process_outgoing_activity( + self, turn_context: TurnContext, activity: Activity + ) -> ResourceResponse: + raise Exception("NotImplemented") + @staticmethod def key_for_connector_client(service_url: str, app_id: str, scope: str): return f"{service_url if service_url else ''}:{app_id if app_id else ''}:{scope if scope else ''}" diff --git a/libraries/botbuilder-core/botbuilder/core/integration/aiohttp_channel_service_exception_middleware.py b/libraries/botbuilder-core/botbuilder/core/integration/aiohttp_channel_service_exception_middleware.py index 7c5091121..d58073d06 100644 --- a/libraries/botbuilder-core/botbuilder/core/integration/aiohttp_channel_service_exception_middleware.py +++ b/libraries/botbuilder-core/botbuilder/core/integration/aiohttp_channel_service_exception_middleware.py @@ -1,6 +1,8 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import traceback + from aiohttp.web import ( middleware, HTTPNotImplemented, @@ -26,4 +28,5 @@ async def aiohttp_error_middleware(request, handler): except KeyError: raise HTTPNotFound() except Exception: + traceback.print_exc() raise HTTPInternalServerError() diff --git a/libraries/botbuilder-core/botbuilder/core/streaming/__init__.py b/libraries/botbuilder-core/botbuilder/core/streaming/__init__.py new file mode 100644 index 000000000..b92ba04be --- /dev/null +++ b/libraries/botbuilder-core/botbuilder/core/streaming/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .bot_framework_http_adapter_base import BotFrameworkHttpAdapterBase +from .streaming_activity_processor import StreamingActivityProcessor +from .streaming_request_handler import StreamingRequestHandler +from .version_info import VersionInfo + +__all__ = [ + "BotFrameworkHttpAdapterBase", + "StreamingActivityProcessor", + "StreamingRequestHandler", + "VersionInfo", +] diff --git a/libraries/botbuilder-core/botbuilder/core/streaming/bot_framework_http_adapter_base.py b/libraries/botbuilder-core/botbuilder/core/streaming/bot_framework_http_adapter_base.py new file mode 100644 index 000000000..86bd9246a --- /dev/null +++ b/libraries/botbuilder-core/botbuilder/core/streaming/bot_framework_http_adapter_base.py @@ -0,0 +1,125 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from http import HTTPStatus +from typing import Awaitable, Callable, List + +from botbuilder.core import ( + Bot, + BotFrameworkAdapter, + BotFrameworkAdapterSettings, + InvokeResponse, + TurnContext, +) +from botbuilder.schema import Activity, ActivityTypes, ResourceResponse +from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration +from botframework.connector.aio import ConnectorClient +from botframework.connector.auth import ( + ClaimsIdentity, + MicrosoftAppCredentials, + MicrosoftGovernmentAppCredentials, +) + +from .streaming_activity_processor import StreamingActivityProcessor +from .streaming_request_handler import StreamingRequestHandler +from .streaming_http_client import StreamingHttpDriver + + +class BotFrameworkHttpAdapterBase(BotFrameworkAdapter, StreamingActivityProcessor): + # pylint: disable=pointless-string-statement + def __init__(self, settings: BotFrameworkAdapterSettings): + super().__init__(settings) + + self.connected_bot: Bot = None + self.claims_identity: ClaimsIdentity = None + self.request_handlers: List[StreamingRequestHandler] = None + + async def process_streaming_activity( + self, + activity: Activity, + bot_callback_handler: Callable[[TurnContext], Awaitable], + ) -> InvokeResponse: + if not activity: + raise TypeError( + f"'activity: {activity.__class__.__name__}' argument can't be None" + ) + + """ + If a conversation has moved from one connection to another for the same Channel or Skill and + hasn't been forgotten by the previous StreamingRequestHandler. The last requestHandler + the conversation has been associated with should always be the active connection. + """ + request_handler = [ + handler + for handler in self.request_handlers + if handler.service_url == activity.service_url + and handler.has_conversation(activity.conversation.id) + ] + request_handler = request_handler[-1] if request_handler else None + context = TurnContext(self, activity) + + if self.claims_identity: + context.turn_state[self.BOT_IDENTITY_KEY] = self.claims_identity + + connector_client = self._create_streaming_connector_client( + activity, request_handler + ) + context.turn_state[self.BOT_CONNECTOR_CLIENT_KEY] = connector_client + + await self.run_pipeline(context, bot_callback_handler) + + if activity.type == ActivityTypes.invoke: + activity_invoke_response = context.turn_state.get(self._INVOKE_RESPONSE_KEY) + + if not activity_invoke_response: + return InvokeResponse(status=HTTPStatus.NOT_IMPLEMENTED) + return activity_invoke_response.value + + return None + + async def send_streaming_activity(self, activity: Activity) -> ResourceResponse: + raise NotImplementedError() + + def can_process_outgoing_activity(self, activity: Activity) -> bool: + if not activity: + raise TypeError( + f"'activity: {activity.__class__.__name__}' argument can't be None" + ) + + return not activity.service_url.startswith("https") + + async def process_outgoing_activity( + self, turn_context: TurnContext, activity: Activity + ) -> ResourceResponse: + if not activity: + raise TypeError( + f"'activity: {activity.__class__.__name__}' argument can't be None" + ) + + # TODO: Check if we have token responses from OAuth cards. + + # The ServiceUrl for streaming channels begins with the string "urn" and contains + # information unique to streaming connections. Now that we know that this is a streaming + # activity, process it in the streaming pipeline. + # Process streaming activity. + return await self.send_streaming_activity(activity) + + def _create_streaming_connector_client( + self, activity: Activity, request_handler: StreamingRequestHandler + ) -> ConnectorClient: + empty_credentials = ( + MicrosoftAppCredentials.empty() + if self._channel_provider and self._channel_provider.is_government() + else MicrosoftGovernmentAppCredentials.empty() + ) + streaming_driver = StreamingHttpDriver(request_handler) + config = BotFrameworkConnectorConfiguration( + empty_credentials, + activity.service_url, + pipeline_type=AsyncBfPipeline, + driver=streaming_driver, + ) + streaming_driver.config = config + connector_client = ConnectorClient(None, custom_configuration=config) + + return connector_client diff --git a/libraries/botbuilder-core/botbuilder/core/streaming/streaming_activity_processor.py b/libraries/botbuilder-core/botbuilder/core/streaming/streaming_activity_processor.py new file mode 100644 index 000000000..6b6f16893 --- /dev/null +++ b/libraries/botbuilder-core/botbuilder/core/streaming/streaming_activity_processor.py @@ -0,0 +1,21 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from abc import ABC +from typing import Awaitable, Callable + +from botbuilder.core import TurnContext, InvokeResponse +from botbuilder.schema import Activity + + +class StreamingActivityProcessor(ABC): + """ + Process streaming activities. + """ + + async def process_streaming_activity( + self, + activity: Activity, + bot_callback_handler: Callable[[TurnContext], Awaitable], + ) -> InvokeResponse: + raise NotImplementedError() diff --git a/libraries/botbuilder-core/botbuilder/core/streaming/streaming_http_client.py b/libraries/botbuilder-core/botbuilder/core/streaming/streaming_http_client.py new file mode 100644 index 000000000..9d80e63e6 --- /dev/null +++ b/libraries/botbuilder-core/botbuilder/core/streaming/streaming_http_client.py @@ -0,0 +1,94 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from http import HTTPStatus +from logging import Logger +from typing import Any + +from msrest.universal_http import ClientRequest +from msrest.universal_http.async_abc import AsyncClientResponse +from msrest.universal_http.async_requests import ( + AsyncRequestsHTTPSender as AsyncRequestsHTTPDriver, +) +from botframework.streaming import StreamingRequest, ReceiveResponse + +from .streaming_request_handler import StreamingRequestHandler + + +class StreamingProtocolClientResponse(AsyncClientResponse): + def __init__( + self, request: StreamingRequest, streaming_response: ReceiveResponse + ) -> None: + super(StreamingProtocolClientResponse, self).__init__( + request, streaming_response + ) + # https://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse + self.status_code = streaming_response.status_code + # self.headers = streaming_response.headers + # self.reason = streaming_response.reason + self._body = None + + def body(self) -> bytes: + """Return the whole body as bytes in memory. + """ + if not self._body: + return bytes([]) + return self._body + + async def load_body(self) -> None: + """Load in memory the body, so it could be accessible from sync methods.""" + self._body: ReceiveResponse + self._body = self.internal_response.read_body() + + def raise_for_status(self): + if 400 <= self.internal_response.status_code <= 599: + raise Exception(f"Http error: {self.internal_response.status_code}") + + +class StreamingHttpDriver(AsyncRequestsHTTPDriver): + def __init__( + self, + request_handler: StreamingRequestHandler, + *, + config=None, + logger: Logger = None, + ): + super().__init__(config) + if not request_handler: + raise TypeError( + f"'request_handler: {request_handler.__class__.__name__}' argument can't be None" + ) + self._request_handler = request_handler + self._logger = logger + + async def send( + self, request: ClientRequest, **config: Any # pylint: disable=unused-argument + ) -> AsyncClientResponse: + # TODO: validate form of request to perform operations + streaming_request = StreamingRequest( + path=request.url[request.url.index("v3/") :], verb=request.method + ) + streaming_request.set_body(request.data) + + return await self._send_request(streaming_request) + + async def _send_request( + self, request: StreamingRequest + ) -> StreamingProtocolClientResponse: + try: + server_response = await self._request_handler.send_streaming_request( + request + ) + + if not server_response: + raise Exception("Server response from streaming request is None") + + if server_response.status_code == HTTPStatus.OK: + # TODO: this should be an object read from json + + return StreamingProtocolClientResponse(request, server_response) + except Exception as error: + # TODO: log error + raise error + + return None diff --git a/libraries/botbuilder-core/botbuilder/core/streaming/streaming_request_handler.py b/libraries/botbuilder-core/botbuilder/core/streaming/streaming_request_handler.py new file mode 100644 index 000000000..9b9ac4e8d --- /dev/null +++ b/libraries/botbuilder-core/botbuilder/core/streaming/streaming_request_handler.py @@ -0,0 +1,275 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import platform +import traceback +from http import HTTPStatus +from datetime import datetime +from logging import Logger +from json import loads +from typing import Dict, List + +from botbuilder.core import Bot +from botbuilder.schema import Activity, Attachment, ResourceResponse +from botframework.streaming import ( + RequestHandler, + ReceiveRequest, + ReceiveResponse, + StreamingRequest, + StreamingResponse, + __title__, + __version__, +) +from botframework.streaming.transport import DisconnectedEventArgs +from botframework.streaming.transport.web_socket import WebSocket, WebSocketServer + +from .streaming_activity_processor import StreamingActivityProcessor +from .version_info import VersionInfo + + +class StreamContent: + def __init__(self, stream: List[int], *, headers: Dict[str, str] = None): + self.stream = stream + self.headers: Dict[str, str] = headers if headers is not None else {} + + +class StreamingRequestHandler(RequestHandler): + def __init__( + self, + bot: Bot, + activity_processor: StreamingActivityProcessor, + web_socket: WebSocket, + logger: Logger = None, + ): + if not bot: + raise TypeError(f"'bot: {bot.__class__.__name__}' argument can't be None") + if not activity_processor: + raise TypeError( + f"'activity_processor: {activity_processor.__class__.__name__}' argument can't be None" + ) + + self._bot = bot + self._activity_processor = activity_processor + self._logger = logger + self._conversations: Dict[str, datetime] = {} + self._user_agent = StreamingRequestHandler._get_user_agent() + self._server = WebSocketServer(web_socket, self) + self._server_is_connected = True + self._server.disconnected_event_handler = self._server_disconnected + self._service_url: str = None + + @property + def service_url(self) -> str: + return self._service_url + + async def listen(self): + await self._server.start() + + # TODO: log it + + def has_conversation(self, conversation_id: str) -> bool: + return conversation_id in self._conversations + + def conversation_added_time(self, conversation_id: str) -> datetime: + added_time = self._conversations.get(conversation_id) + + if not added_time: + added_time = datetime.min + + return added_time + + def forget_conversation(self, conversation_id: str): + del self._conversations[conversation_id] + + async def process_request( + self, request: ReceiveRequest, logger: Logger, context: object + ) -> StreamingResponse: + # pylint: disable=pointless-string-statement + response = StreamingResponse() + + # We accept all POSTs regardless of path, but anything else requires special treatment. + if not request.verb == StreamingRequest.POST: + return self._handle_custom_paths(request, response) + + # Convert the StreamingRequest into an activity the adapter can understand. + try: + body_str = await request.read_body_as_str() + except Exception as error: + traceback.print_exc() + response.status_code = int(HTTPStatus.BAD_REQUEST) + # TODO: log error + + return response + + try: + # TODO: validate if should use deserialize or from_dict + body_dict = loads(body_str) + activity: Activity = Activity.deserialize(body_dict) + + # All activities received by this StreamingRequestHandler will originate from the same channel, but we won't + # know what that channel is until we've received the first request. + if not self.service_url: + self._service_url = activity.service_url + + # If this is the first time the handler has seen this conversation it needs to be added to the dictionary so + # the adapter is able to route requests to the correct handler. + if not self.has_conversation(activity.conversation.id): + self._conversations[activity.conversation.id] = datetime.now() + + """ + Any content sent as part of a StreamingRequest, including the request body + and inline attachments, appear as streams added to the same collection. The first + stream of any request will be the body, which is parsed and passed into this method + as the first argument, 'body'. Any additional streams are inline attachments that need + to be iterated over and added to the Activity as attachments to be sent to the Bot. + """ + + if len(request.streams) > 1: + stream_attachments = [ + Attachment(content_type=stream.content_type, content=stream.stream) + for stream in request.streams + ] + + if activity.attachments: + activity.attachments += stream_attachments + else: + activity.attachments = stream_attachments + + # Now that the request has been converted into an activity we can send it to the adapter. + adapter_response = await self._activity_processor.process_streaming_activity( + activity, self._bot.on_turn + ) + + # Now we convert the invokeResponse returned by the adapter into a StreamingResponse we can send back + # to the channel. + if not adapter_response: + response.status_code = int(HTTPStatus.OK) + else: + response.status_code = adapter_response.status + if adapter_response.body: + response.set_body(adapter_response.body) + + except Exception as error: + traceback.print_exc() + response.status_code = int(HTTPStatus.INTERNAL_SERVER_ERROR) + response.set_body(str(error)) + # TODO: log error + + return response + + async def send_activity(self, activity: Activity) -> ResourceResponse: + if activity.reply_to_id: + request_path = ( + f"/v3/conversations/{activity.conversation.id if activity.conversation else ''}/" + f"activities/{activity. reply_to_id}" + ) + else: + request_path = f"/v3/conversations/{activity.conversation.id if activity.conversation else ''}/activities" + + stream_attachments = self._update_attachment_streams(activity) + request = StreamingRequest.create_post(request_path) + request.set_body(activity) + if stream_attachments: + for attachment in stream_attachments: + # TODO: might be necessary to serialize this before adding + request.add_stream(attachment) + + try: + if not self._server_is_connected: + raise Exception( + "Error while attempting to send: Streaming transport is disconnected." + ) + + server_response = await self._server.send(request) + + if server_response.status_code == HTTPStatus.OK: + return server_response.read_body_as_json(ResourceResponse) + except Exception: + # TODO: log error + traceback.print_exc() + + return None + + async def send_streaming_request( + self, request: StreamingRequest + ) -> ReceiveResponse: + try: + if not self._server_is_connected: + raise Exception( + "Error while attempting to send: Streaming transport is disconnected." + ) + + return await self._server.send(request) + except Exception: + # TODO: remove printing and log it + traceback.print_exc() + + return None + + @staticmethod + def _get_user_agent() -> str: + package_user_agent = f"{__title__}/{__version__}" + uname = platform.uname() + os_version = f"{uname.machine}-{uname.system}-{uname.version}" + py_version = f"Python,Version={platform.python_version()}" + platform_user_agent = f"({os_version}; {py_version})" + user_agent = f"{package_user_agent} {platform_user_agent}" + return user_agent + + def _update_attachment_streams(self, activity: Activity) -> List[object]: + if not activity or not activity.attachments: + return None + + def validate_int_list(obj: object) -> bool: + if not isinstance(obj, list): + return False + + return all(isinstance(element, int) for element in obj) + + stream_attachments = [ + attachment + for attachment in activity.attachments + if validate_int_list(attachment.content) + ] + + if stream_attachments: + activity.attachments = [ + attachment + for attachment in activity.attachments + if not validate_int_list(attachment.content) + ] + + # TODO: validate StreamContent parallel + return [ + StreamContent( + attachment.content, + headers={"Content-Type": attachment.content_type}, + ) + for attachment in stream_attachments + ] + + return None + + def _server_disconnected( + self, + sender: object, # pylint: disable=unused-argument + event: DisconnectedEventArgs, # pylint: disable=unused-argument + ): + self._server_is_connected = False + + def _handle_custom_paths( + self, request: ReceiveRequest, response: StreamingResponse + ) -> StreamingResponse: + if not request or not request.verb or not request.path: + response.status_code = int(HTTPStatus.BAD_REQUEST) + # TODO: log error + + return response + + if request.verb == StreamingRequest.GET and request.path == "/api/version": + response.status_code = int(HTTPStatus.OK) + response.set_body(VersionInfo(user_agent=self._user_agent)) + + return response + + return None diff --git a/libraries/botbuilder-core/botbuilder/core/streaming/version_info.py b/libraries/botbuilder-core/botbuilder/core/streaming/version_info.py new file mode 100644 index 000000000..b11250375 --- /dev/null +++ b/libraries/botbuilder-core/botbuilder/core/streaming/version_info.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import json + +from botframework.streaming.payloads.models import Serializable + + +class VersionInfo(Serializable): + def __init__(self, *, user_agent: str = None): + self.user_agent = user_agent + + def to_json(self) -> str: + obj = {"userAgent": self.user_agent} + + return json.dumps(obj) + + def from_json(self, json_str: str) -> "ResponsePayload": + obj = json.loads(json_str) + + self.user_agent = obj.get("userAgent") + return self diff --git a/libraries/botbuilder-core/setup.py b/libraries/botbuilder-core/setup.py index 964c129e8..3d11adef8 100644 --- a/libraries/botbuilder-core/setup.py +++ b/libraries/botbuilder-core/setup.py @@ -37,6 +37,7 @@ "botbuilder.core.inspection", "botbuilder.core.integration", "botbuilder.core.skills", + "botbuilder.core.streaming", "botbuilder.core.teams", "botbuilder.core.oauth", ], diff --git a/libraries/botbuilder-core/tests/streaming/test_streaming_request_handler.py b/libraries/botbuilder-core/tests/streaming/test_streaming_request_handler.py new file mode 100644 index 000000000..39e6802a6 --- /dev/null +++ b/libraries/botbuilder-core/tests/streaming/test_streaming_request_handler.py @@ -0,0 +1,55 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from unittest.mock import Mock +from typing import Any + +import aiounittest + +from botbuilder.core.streaming import StreamingRequestHandler +from botframework.streaming.transport.web_socket import ( + WebSocket, + WebSocketState, + WebSocketCloseStatus, + WebSocketMessage, + WebSocketMessageType, +) + + +class MockWebSocket(WebSocket): + def __init__(self): + super(MockWebSocket, self).__init__() + + self.receive_called = False + + def dispose(self): + return + + async def close(self, close_status: WebSocketCloseStatus, status_description: str): + return + + async def receive(self) -> WebSocketMessage: + self.receive_called = True + + async def send( + self, buffer: Any, message_type: WebSocketMessageType, end_of_message: bool + ): + raise Exception + + @property + def status(self) -> WebSocketState: + return WebSocketState.OPEN + + +class TestStramingRequestHandler(aiounittest.AsyncTestCase): + async def test_listen(self): + mock_bot = Mock() + mock_activity_processor = Mock() + mock_web_socket = MockWebSocket() + + sut = StreamingRequestHandler( + mock_bot, mock_activity_processor, mock_web_socket + ) + await sut.listen() + + assert mock_web_socket.receive_called diff --git a/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/__init__.py b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/__init__.py index 1bb31e665..d43aa50fa 100644 --- a/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/__init__.py +++ b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/__init__.py @@ -8,9 +8,11 @@ from .aiohttp_channel_service import aiohttp_channel_service_routes from .aiohttp_channel_service_exception_middleware import aiohttp_error_middleware from .bot_framework_http_client import BotFrameworkHttpClient +from .bot_framework_http_adapter import BotFrameworkHttpAdapter __all__ = [ "aiohttp_channel_service_routes", "aiohttp_error_middleware", "BotFrameworkHttpClient", + "BotFrameworkHttpAdapter", ] diff --git a/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/aiohttp_channel_service_exception_middleware.py b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/aiohttp_channel_service_exception_middleware.py index 7c5091121..40b0d105d 100644 --- a/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/aiohttp_channel_service_exception_middleware.py +++ b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/aiohttp_channel_service_exception_middleware.py @@ -3,6 +3,7 @@ from aiohttp.web import ( middleware, + HTTPError, HTTPNotImplemented, HTTPUnauthorized, HTTPNotFound, @@ -25,5 +26,8 @@ async def aiohttp_error_middleware(request, handler): raise HTTPUnauthorized() except KeyError: raise HTTPNotFound() + except HTTPError as error: + # In the case the integration adapter raises a specific HTTPError + raise error except Exception: raise HTTPInternalServerError() diff --git a/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/bot_framework_http_adapter.py b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/bot_framework_http_adapter.py new file mode 100644 index 000000000..da1b7c3c3 --- /dev/null +++ b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/bot_framework_http_adapter.py @@ -0,0 +1,205 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional + +from aiohttp import ClientSession +from aiohttp.web import ( + Request, + Response, + json_response, + WebSocketResponse, + HTTPBadRequest, + HTTPUnauthorized, + HTTPUnsupportedMediaType, +) +from botbuilder.core import Bot, BotFrameworkAdapterSettings +from botbuilder.core.streaming import ( + BotFrameworkHttpAdapterBase, + StreamingRequestHandler, +) +from botbuilder.schema import Activity, ResourceResponse +from botbuilder.integration.aiohttp.streaming import AiohttpWebSocket +from botframework.connector.auth import AuthenticationConstants, JwtTokenValidation + + +class BotFrameworkHttpAdapter(BotFrameworkHttpAdapterBase): + def __init__(self, settings: BotFrameworkAdapterSettings): + # pylint: disable=invalid-name + super().__init__(settings) + + self._AUTH_HEADER_NAME = "authorization" + self._CHANNEL_ID_HEADER_NAME = "channelid" + + async def process( + self, request: Request, ws_response: WebSocketResponse, bot: Bot + ) -> Optional[Response]: + # TODO: maybe it's not necessary to expose the ws_response + if not request: + raise TypeError("request can't be None") + # if ws_response is None: + # raise TypeError("ws_response can't be None") + if not bot: + raise TypeError("bot can't be None") + + if request.method == "GET": + await self._connect_web_socket(bot, request, ws_response) + else: + # Deserialize the incoming Activity + if "application/json" in request.headers["Content-Type"]: + body = await request.json() + else: + raise HTTPUnsupportedMediaType() + + activity = Activity().deserialize(body) + auth_header = ( + request.headers["Authorization"] + if "Authorization" in request.headers + else "" + ) + + # Process the inbound activity with the bot + invoke_response = await self.process_activity( + activity, auth_header, bot.on_turn + ) + if invoke_response: + return json_response( + data=invoke_response.body, status=invoke_response.status + ) + return Response(status=201) + + async def send_streaming_activity(self, activity: Activity) -> ResourceResponse: + # Check to see if any of this adapter's StreamingRequestHandlers is associated with this conversation. + possible_handlers = [ + handler + for handler in self.request_handlers + if handler.service_url == activity.service_url + and handler.has_conversation(activity.conversation.id) + ] + + if possible_handlers: + if len(possible_handlers) > 1: + # The conversation has moved to a new connection and the former + # StreamingRequestHandler needs to be told to forget about it. + possible_handlers.sort( + key=lambda handler: handler.conversation_added_time( + activity.conversation.id + ) + ) + correct_handler = possible_handlers[-1] + for handler in possible_handlers: + if handler is not correct_handler: + handler.forget_conversation(activity.conversation.id) + + return await correct_handler.send_activity(activity) + + return await possible_handlers[0].send_activity(activity) + + if self.connected_bot: + # This is a proactive message that will need a new streaming connection opened. + # The ServiceUrl of a streaming connection follows the pattern "url:[ChannelName]:[Protocol]:[Host]". + + uri = activity.service_url.split(":") + protocol = uri[len(uri) - 2] + host = uri[len(uri) - 1] + # TODO: discuss if should abstract this from current package + # TODO: manage life cycle of sessions (when should we close them) + session = ClientSession() + aiohttp_ws = await session.ws_connect(protocol + host + "/api/messages") + web_socket = AiohttpWebSocket(aiohttp_ws, session) + handler = StreamingRequestHandler(self.connected_bot, self, web_socket) + + if self.request_handlers is None: + self.request_handlers = [] + + self.request_handlers.append(handler) + + return await handler.send_activity(activity) + + return None + + async def _connect_web_socket( + self, bot: Bot, request: Request, ws_response: WebSocketResponse + ): + if not request: + raise TypeError("request can't be None") + if ws_response is None: + raise TypeError("ws_response can't be None") + + if not bot: + raise TypeError(f"'bot: {bot.__class__.__name__}' argument can't be None") + + if not ws_response.can_prepare(request): + raise HTTPBadRequest(text="Upgrade to WebSocket is required.") + + if not await self._http_authenticate_request(request): + raise HTTPUnauthorized(text="Request authentication failed.") + + try: + await ws_response.prepare(request) + + bf_web_socket = AiohttpWebSocket(ws_response) + + request_handler = StreamingRequestHandler(bot, self, bf_web_socket) + + if self.request_handlers is None: + self.request_handlers = [] + + self.request_handlers.append(request_handler) + + await request_handler.listen() + except Exception as error: + import traceback # pylint: disable=import-outside-toplevel + + traceback.print_exc() + raise Exception(f"Unable to create transport server. Error: {str(error)}") + + async def _http_authenticate_request(self, request: Request) -> bool: + # pylint: disable=no-member + try: + if not await self._credential_provider.is_authentication_disabled(): + auth_header = request.headers.get(self._AUTH_HEADER_NAME) + channel_id = request.headers.get(self._CHANNEL_ID_HEADER_NAME) + + if not auth_header: + await self._write_unauthorized_response(self._AUTH_HEADER_NAME) + return False + if not channel_id: + await self._write_unauthorized_response( + self._CHANNEL_ID_HEADER_NAME + ) + return False + + claims_identity = await JwtTokenValidation.validate_auth_header( + auth_header, + self._credential_provider, + self._channel_provider, + channel_id, + ) + + if not claims_identity.is_authenticated: + raise HTTPUnauthorized() + + self._credentials = ( + self._credentials + or await self._BotFrameworkAdapter__get_app_credentials( + self.settings.app_id, + AuthenticationConstants.TO_CHANNEL_FROM_BOT_OAUTH_SCOPE, + ) + ) + + # Add ServiceURL to the cache of trusted sites in order to allow token refreshing. + self._credentials.trust_service_url( + claims_identity.claims.get( + AuthenticationConstants.SERVICE_URL_CLAIM + ) + ) + self.claims_identity = claims_identity + return True + except Exception as error: + raise error + + async def _write_unauthorized_response(self, header_name: str): + raise HTTPUnauthorized( + text=f"Unable to authenticate. Missing header: {header_name}" + ) diff --git a/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/streaming/__init__.py b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/streaming/__init__.py new file mode 100644 index 000000000..4d380bf47 --- /dev/null +++ b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/streaming/__init__.py @@ -0,0 +1,8 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .aiohttp_web_socket import AiohttpWebSocket + +__all__ = [ + "AiohttpWebSocket", +] diff --git a/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/streaming/aiohttp_web_socket.py b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/streaming/aiohttp_web_socket.py new file mode 100644 index 000000000..334c637fb --- /dev/null +++ b/libraries/botbuilder-integration-aiohttp/botbuilder/integration/aiohttp/streaming/aiohttp_web_socket.py @@ -0,0 +1,79 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import traceback + +from typing import Any, Optional, Union + +from aiohttp import ClientWebSocketResponse, WSMsgType, ClientSession +from aiohttp.web import WebSocketResponse + +from botframework.streaming.transport.web_socket import ( + WebSocket, + WebSocketMessage, + WebSocketCloseStatus, + WebSocketMessageType, + WebSocketState, +) + + +class AiohttpWebSocket(WebSocket): + def __init__( + self, + aiohttp_ws: Union[WebSocketResponse, ClientWebSocketResponse], + session: Optional[ClientSession] = None, + ): + self._aiohttp_ws = aiohttp_ws + self._session = session + + def dispose(self): + if self._session: + asyncio.create_task(self._session.close()) + + async def close(self, close_status: WebSocketCloseStatus, status_description: str): + await self._aiohttp_ws.close( + code=int(close_status), message=status_description.encode("utf8") + ) + + async def receive(self) -> WebSocketMessage: + try: + message = await self._aiohttp_ws.receive() + + if message.type == WSMsgType.TEXT: + message_data = list(str(message.data).encode("ascii")) + elif message.type == WSMsgType.BINARY: + message_data = list(message.data) + elif isinstance(message.data, int): + message_data = [] + + # async for message in self._aiohttp_ws: + return WebSocketMessage( + message_type=WebSocketMessageType(int(message.type)), data=message_data + ) + except Exception as error: + traceback.print_exc() + raise error + + async def send( + self, buffer: Any, message_type: WebSocketMessageType, end_of_message: bool + ): + is_closing = self._aiohttp_ws.closed + try: + if message_type == WebSocketMessageType.BINARY: + # TODO: The clening buffer line should be removed, just for bypassing bug in POC + clean_buffer = bytes([byte for byte in buffer if byte is not None]) + await self._aiohttp_ws.send_bytes(clean_buffer) + elif message_type == WebSocketMessageType.TEXT: + await self._aiohttp_ws.send_str(buffer) + else: + raise RuntimeError( + f"AiohttpWebSocket - message_type: {message_type} currently not supported" + ) + except Exception as error: + traceback.print_exc() + raise error + + @property + def status(self) -> WebSocketState: + return WebSocketState.CLOSED if self._aiohttp_ws.closed else WebSocketState.OPEN diff --git a/libraries/botbuilder-integration-aiohttp/setup.py b/libraries/botbuilder-integration-aiohttp/setup.py index 69605aadd..feb556f4a 100644 --- a/libraries/botbuilder-integration-aiohttp/setup.py +++ b/libraries/botbuilder-integration-aiohttp/setup.py @@ -42,6 +42,7 @@ packages=[ "botbuilder.integration.aiohttp", "botbuilder.integration.aiohttp.skills", + "botbuilder.integration.aiohttp.streaming", ], install_requires=REQUIRES, classifiers=[ diff --git a/libraries/botframework-connector/botframework/connector/__init__.py b/libraries/botframework-connector/botframework/connector/__init__.py index 519f0ab2e..cea241543 100644 --- a/libraries/botframework-connector/botframework/connector/__init__.py +++ b/libraries/botframework-connector/botframework/connector/__init__.py @@ -10,6 +10,16 @@ from .emulator_api_client import EmulatorApiClient from .version import VERSION -__all__ = ["Channels", "ConnectorClient", "EmulatorApiClient"] +# TODO: Experimental +from .aiohttp_bf_pipeline import AsyncBfPipeline +from .bot_framework_sdk_client_async import BotFrameworkConnectorConfiguration + +__all__ = [ + "AsyncBfPipeline", + "Channels", + "ConnectorClient", + "EmulatorApiClient", + "BotFrameworkConnectorConfiguration", +] __version__ = VERSION diff --git a/libraries/botframework-connector/botframework/connector/aio/_connector_client_async.py b/libraries/botframework-connector/botframework/connector/aio/_connector_client_async.py index 73cebfb07..86a303d51 100644 --- a/libraries/botframework-connector/botframework/connector/aio/_connector_client_async.py +++ b/libraries/botframework-connector/botframework/connector/aio/_connector_client_async.py @@ -5,16 +5,26 @@ # license information. # -------------------------------------------------------------------------- -from msrest.async_client import SDKClientAsync +from typing import Optional, Type + +from msrest.universal_http.async_abc import AsyncHTTPSender as AsyncHttpDriver +from msrest.pipeline.aiohttp import AsyncHTTPSender +from msrest.async_client import AsyncPipeline from msrest import Serializer, Deserializer -from .._configuration import ConnectorClientConfiguration from .operations_async import AttachmentsOperations from .operations_async import ConversationsOperations from .. import models -class ConnectorClient(SDKClientAsync): +# TODO: experimental +from ..bot_framework_sdk_client_async import ( + BotFrameworkSDKClientAsync, + BotFrameworkConnectorConfiguration, +) + + +class ConnectorClient(BotFrameworkSDKClientAsync): """The Bot Connector REST API allows your bot to send and receive messages to channels configured in the [Bot Framework Developer Portal](https://dev.botframework.com). The Connector service uses industry-standard REST and JSON over HTTPS. @@ -45,9 +55,26 @@ class ConnectorClient(SDKClientAsync): :param str base_url: Service URL """ - def __init__(self, credentials, base_url=None): - - self.config = ConnectorClientConfiguration(credentials, base_url) + def __init__( + self, + credentials, + base_url=None, + *, + pipeline_type: Optional[Type[AsyncPipeline]] = None, + sender: Optional[AsyncHTTPSender] = None, + driver: Optional[AsyncHttpDriver] = None, + custom_configuration: [BotFrameworkConnectorConfiguration] = None, + ): + if custom_configuration: + self.config = custom_configuration + else: + self.config = BotFrameworkConnectorConfiguration( + credentials, + base_url, + pipeline_type=pipeline_type, + sender=sender, + driver=driver, + ) super(ConnectorClient, self).__init__(self.config) client_models = { diff --git a/libraries/botframework-connector/botframework/connector/aiohttp_bf_pipeline.py b/libraries/botframework-connector/botframework/connector/aiohttp_bf_pipeline.py new file mode 100644 index 000000000..b46a40857 --- /dev/null +++ b/libraries/botframework-connector/botframework/connector/aiohttp_bf_pipeline.py @@ -0,0 +1,34 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from msrest.pipeline import AsyncPipeline, AsyncHTTPPolicy, SansIOHTTPPolicy +from msrest.universal_http.async_requests import AsyncRequestsHTTPSender as Driver +from msrest.pipeline.async_requests import ( + AsyncRequestsCredentialsPolicy, + AsyncPipelineRequestsHTTPSender, +) +from msrest.pipeline.universal import RawDeserializer + +from .bot_framework_sdk_client_async import BotFrameworkConnectorConfiguration + + +class AsyncBfPipeline(AsyncPipeline): + def __init__(self, config: BotFrameworkConnectorConfiguration): + creds = config.credentials + + policies = [ + config.user_agent_policy, # UserAgent policy + RawDeserializer(), # Deserialize the raw bytes + config.http_logger_policy, # HTTP request/response log + ] # type: List[Union[AsyncHTTPPolicy, SansIOHTTPPolicy]] + if creds: + if isinstance(creds, (AsyncHTTPPolicy, SansIOHTTPPolicy)): + policies.insert(1, creds) + else: + # Assume this is the old credentials class, and then requests. Wrap it. + policies.insert(1, AsyncRequestsCredentialsPolicy(creds)) + + sender = config.sender or AsyncPipelineRequestsHTTPSender( + config.driver or Driver(config) + ) + super().__init__(policies, sender) diff --git a/libraries/botframework-connector/botframework/connector/auth/emulator_validation.py b/libraries/botframework-connector/botframework/connector/auth/emulator_validation.py index b00b8e1cc..a50a5eaea 100644 --- a/libraries/botframework-connector/botframework/connector/auth/emulator_validation.py +++ b/libraries/botframework-connector/botframework/connector/auth/emulator_validation.py @@ -1,7 +1,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. -import asyncio from typing import Union import jwt @@ -181,9 +180,8 @@ async def authenticate_emulator_token( "Unauthorized. Unknown Emulator Token version ", version_claim, "." ) - is_valid_app_id = await asyncio.ensure_future( - credentials.is_valid_appid(app_id) - ) + is_valid_app_id = await credentials.is_valid_appid(app_id) + if not is_valid_app_id: raise PermissionError( "Unauthorized. Invalid AppId passed on token: ", app_id diff --git a/libraries/botframework-connector/botframework/connector/bot_framework_sdk_client_async.py b/libraries/botframework-connector/botframework/connector/bot_framework_sdk_client_async.py new file mode 100644 index 000000000..9efb15b7d --- /dev/null +++ b/libraries/botframework-connector/botframework/connector/bot_framework_sdk_client_async.py @@ -0,0 +1,40 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Optional, Type + +from msrest.async_client import SDKClientAsync +from msrest.universal_http.async_abc import AsyncHTTPSender as AsyncHttpDriver +from msrest.pipeline import AsyncPipeline +from msrest.pipeline.aiohttp import AsyncHTTPSender + + +from ._configuration import ConnectorClientConfiguration + + +class BotFrameworkConnectorConfiguration(ConnectorClientConfiguration): + def __init__( + self, + credentials, + base_url: str, + *, + pipeline_type: Optional[Type[AsyncPipeline]] = None, + sender: Optional[AsyncHTTPSender] = None, + driver: Optional[AsyncHttpDriver] = None + ): + super().__init__(credentials, base_url) + + # The overwrite hierarchy should be well documented + self.sender = sender + self.driver = driver + + self.custom_pipeline = pipeline_type(self) if pipeline_type else None + + +class BotFrameworkSDKClientAsync(SDKClientAsync): + def __init__(self, config: BotFrameworkConnectorConfiguration) -> None: + super().__init__(config) + + self._client.config.pipeline = ( + config.custom_pipeline or self._client.config.pipeline + ) diff --git a/libraries/botframework-streaming/README.rst b/libraries/botframework-streaming/README.rst new file mode 100644 index 000000000..49595961f --- /dev/null +++ b/libraries/botframework-streaming/README.rst @@ -0,0 +1,83 @@ + +=============================== +BotFramework-Streaming for Python +=============================== + +.. image:: https://fuselabs.visualstudio.com/SDK_v4/_apis/build/status/Python/SDK_v4-Python-CI?branchName=master + :target: https://fuselabs.visualstudio.com/SDK_v4/_apis/build/status/Python/SDK_v4-Python-CI + :align: right + :alt: Azure DevOps status for master branch +.. image:: https://badge.fury.io/py/botframework-streaming.svg + :target: https://badge.fury.io/py/botframework-streaming + :alt: Latest PyPI package version + +Streaming Extensions libraries for BotFramework. + +How to Install +============== + +.. code-block:: python + + pip install botframework-streaming + + +Documentation/Wiki +================== + +You can find more information on the botframework-python project by visiting our `Wiki`_. + +Requirements +============ + +* `Python >= 3.7.0`_ + + +Source Code +=========== +The latest developer version is available in a github repository: +https://github.com/Microsoft/botframework-python/ + + +Contributing +============ + +This project welcomes contributions and suggestions. Most contributions require you to agree to a +Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us +the rights to use your contribution. For details, visit https://cla.microsoft.com. + +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide +a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions +provided by the bot. You will only need to do this once across all repos using our CLA. + +This project has adopted the `Microsoft Open Source Code of Conduct`_. +For more information see the `Code of Conduct FAQ`_ or +contact `opencode@microsoft.com`_ with any additional questions or comments. + +Reporting Security Issues +========================= + +Security issues and bugs should be reported privately, via email, to the Microsoft Security +Response Center (MSRC) at `secure@microsoft.com`_. You should +receive a response within 24 hours. If for some reason you do not, please follow up via +email to ensure we received your original message. Further information, including the +`MSRC PGP`_ key, can be found in +the `Security TechCenter`_. + +License +======= + +Copyright (c) Microsoft Corporation. All rights reserved. + +Licensed under the MIT_ License. + +.. _Wiki: https://github.com/Microsoft/botframework-python/wiki +.. _Python >= 3.7.0: https://www.python.org/downloads/ +.. _MIT: https://github.com/Microsoft/vscode/blob/master/LICENSE.txt +.. _Microsoft Open Source Code of Conduct: https://opensource.microsoft.com/codeofconduct/ +.. _Code of Conduct FAQ: https://opensource.microsoft.com/codeofconduct/faq/ +.. _opencode@microsoft.com: mailto:opencode@microsoft.com +.. _secure@microsoft.com: mailto:secure@microsoft.com +.. _MSRC PGP: https://technet.microsoft.com/en-us/security/dn606155 +.. _Security TechCenter: https://github.com/Microsoft/vscode/blob/master/LICENSE.txt + +.. `_ \ No newline at end of file diff --git a/libraries/botframework-streaming/botframework/streaming/__init__.py b/libraries/botframework-streaming/botframework/streaming/__init__.py new file mode 100644 index 000000000..fac150fb5 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/__init__.py @@ -0,0 +1,27 @@ +# coding=utf-8 +# -------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +from .about import __version__, __title__ +from .receive_request import ReceiveRequest +from .payload_stream import PayloadStream +from .protocol_adapter import ProtocolAdapter +from .receive_response import ReceiveResponse +from .request_handler import RequestHandler +from .streaming_request import StreamingRequest +from .streaming_response import StreamingResponse + +__all__ = [ + "ReceiveRequest", + "ProtocolAdapter", + "ReceiveResponse", + "PayloadStream", + "RequestHandler", + "StreamingRequest", + "StreamingResponse", + "__title__", + "__version__", +] diff --git a/libraries/botframework-streaming/botframework/streaming/about.py b/libraries/botframework-streaming/botframework/streaming/about.py new file mode 100644 index 000000000..81e170270 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/about.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os + +__title__ = "botframework-streaming" +__version__ = ( + os.environ["packageVersion"] if "packageVersion" in os.environ else "4.13.0" +) +__uri__ = "https://www.github.com/Microsoft/botbuilder-python" +__author__ = "Microsoft" +__description__ = "Microsoft Bot Framework Bot Builder" +__summary__ = "Microsoft Bot Framework Bot Builder SDK for Python." +__license__ = "MIT" diff --git a/libraries/botframework-streaming/botframework/streaming/payload_stream.py b/libraries/botframework-streaming/botframework/streaming/payload_stream.py new file mode 100644 index 000000000..4a9ec1463 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payload_stream.py @@ -0,0 +1,77 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from asyncio import Lock, Semaphore +from typing import List + +from botframework.streaming.payloads.assemblers import PayloadStreamAssembler + + +class PayloadStream: + def __init__(self, assembler: PayloadStreamAssembler): + self._assembler = assembler + self._buffer_queue: List[List[int]] = [] + self._lock = Lock() + self._data_available = Semaphore(0) + self._producer_length = 0 # total length + self._consumer_position = 0 # read position + self._active: List[int] = [] + self._active_offset = 0 + self._end = False + + def __len__(self): + return self._producer_length + + def give_buffer(self, buffer: List[int]): + self._buffer_queue.append(buffer) + self._producer_length += len(buffer) + + self._data_available.release() + + def done_producing(self): + self.give_buffer([]) + + def write(self, buffer: List[int], offset: int, count: int): + buffer_copy = buffer[offset : offset + count] + self.give_buffer(buffer_copy) + + async def read(self, buffer: List[int], offset: int, count: int): + if self._end: + return 0 + + if not self._active: + await self._data_available.acquire() + async with self._lock: + self._active = self._buffer_queue.pop(0) + + available_count = min(len(self._active) - self._active_offset, count) + + for index in range(available_count): + buffer[offset + index] = self._active[self._active_offset] + self._active_offset += 1 + + self._consumer_position += available_count + + if self._active_offset >= len(self._active): + self._active = [] + self._active_offset = 0 + + if ( + self._assembler + and self._consumer_position >= self._assembler.content_length + ): + self._end = True + + return available_count + + async def read_until_end(self): + result = [None] * self._assembler.content_length + current_size = 0 + + while not self._end: + count = await self.read( + result, current_size, self._assembler.content_length + ) + current_size += count + + return result diff --git a/libraries/botframework-streaming/botframework/streaming/payload_transport/__init__.py b/libraries/botframework-streaming/botframework/streaming/payload_transport/__init__.py new file mode 100644 index 000000000..6270c96f3 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payload_transport/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + + +from .payload_receiver import PayloadReceiver +from .payload_sender import PayloadSender +from .send_packet import SendPacket + + +__all__ = ["PayloadReceiver", "PayloadSender", "SendPacket"] diff --git a/libraries/botframework-streaming/botframework/streaming/payload_transport/payload_receiver.py b/libraries/botframework-streaming/botframework/streaming/payload_transport/payload_receiver.py new file mode 100644 index 000000000..b20df2050 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payload_transport/payload_receiver.py @@ -0,0 +1,165 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import traceback + +from asyncio import iscoroutinefunction, isfuture +from typing import Callable, List + +import botframework.streaming as streaming +from botframework.streaming.payloads import HeaderSerializer +from botframework.streaming.payloads.models import Header, PayloadTypes +from botframework.streaming.transport import ( + DisconnectedEventArgs, + TransportConstants, + TransportReceiverBase, +) + + +class PayloadReceiver: + def __init__(self): + self._get_stream: Callable[[Header], List[int]] = None + self._receive_action: Callable[[Header, List[int], int], None] = None + self._receiver: TransportReceiverBase = None + self._is_disconnecting = False + + self._receive_header_buffer: List[int] = [ + None + ] * TransportConstants.MAX_HEADER_LENGTH + self._receive_content_buffer: List[int] = [ + None + ] * TransportConstants.MAX_PAYLOAD_LENGTH + + self.disconnected: Callable[[object, DisconnectedEventArgs], None] = None + + @property + def is_connected(self) -> bool: + return self._receiver is not None + + async def connect(self, receiver: TransportReceiverBase): + if self._receiver: + raise RuntimeError(f"{self.__class__.__name__} instance already connected.") + + self._receiver = receiver + await self._run_receive() + + async def _run_receive(self): + await self._receive_packets() + + def subscribe( + self, + get_stream: Callable[[Header], List[int]], + receive_action: Callable[[Header, List[int]], int], + ): + self._get_stream = get_stream + self._receive_action = receive_action + + async def disconnect(self, event_args: DisconnectedEventArgs = None): + did_disconnect = False + + if not self._is_disconnecting: + self._is_disconnecting = True + try: + try: + if self._receiver: + await self._receiver.close() + # TODO: investigate if 'dispose' is necessary + did_disconnect = True + except Exception: + traceback.print_exc() + + self._receiver = None + + if did_disconnect: + if callable(self.disconnected): + # pylint: disable=not-callable + if iscoroutinefunction(self.disconnected) or isfuture( + self.disconnected + ): + await self.disconnected( + self, event_args or DisconnectedEventArgs.empty + ) + else: + self.disconnected( + self, event_args or DisconnectedEventArgs.empty + ) + finally: + self._is_disconnecting = False + + async def _receive_packets(self): + is_closed = False + disconnect_args = None + + while self._receiver and self._receiver.is_connected and not is_closed: + # receive a single packet + try: + # read the header + header_offset = 0 + # TODO: this while is probalby not necessary + while header_offset < TransportConstants.MAX_HEADER_LENGTH: + length = await self._receiver.receive( + self._receive_header_buffer, + header_offset, + TransportConstants.MAX_HEADER_LENGTH - header_offset, + ) + + if length == 0: + # TODO: make custom exception + raise Exception( + "TransportDisconnectedException: Stream closed while reading header bytes" + ) + + header_offset += length + + # deserialize the bytes into a header + header = HeaderSerializer.deserialize( + self._receive_header_buffer, 0, TransportConstants.MAX_HEADER_LENGTH + ) + + # read the payload + content_stream = self._get_stream(header) + + buffer = ( + [None] * header.payload_length + if PayloadTypes.is_stream(header) + else self._receive_content_buffer + ) + offset = 0 + + if header.payload_length: + while offset < header.payload_length: + count = min( + header.payload_length - offset, + TransportConstants.MAX_PAYLOAD_LENGTH, + ) + + # Send: Packet content + length = await self._receiver.receive(buffer, offset, count) + if length == 0: + # TODO: make custom exception + raise Exception( + "TransportDisconnectedException: Stream closed while reading header bytes" + ) + + if content_stream is not None: + # write chunks to the content_stream if it's not a stream type + # TODO: this has to be improved in custom buffer class (validate buffer ended) + if not PayloadTypes.is_stream(header): + for index in range(offset, offset + length): + content_stream[index] = buffer[index] + + offset += length + + # give the full payload buffer to the contentStream if it's a stream + if PayloadTypes.is_stream(header) and isinstance( + content_stream, streaming.PayloadStream + ): + content_stream.give_buffer(buffer) + + self._receive_action(header, content_stream, offset) + except Exception as exception: + traceback.print_exc() + is_closed = True + disconnect_args = DisconnectedEventArgs(reason=str(exception)) + + await self.disconnect(disconnect_args) diff --git a/libraries/botframework-streaming/botframework/streaming/payload_transport/payload_sender.py b/libraries/botframework-streaming/botframework/streaming/payload_transport/payload_sender.py new file mode 100644 index 000000000..817181846 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payload_transport/payload_sender.py @@ -0,0 +1,157 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from asyncio import Event, ensure_future, iscoroutinefunction, isfuture +from typing import Awaitable, Callable, List + +from botframework.streaming.transport import ( + DisconnectedEventArgs, + TransportSenderBase, + TransportConstants, +) +from botframework.streaming.payloads import HeaderSerializer +from botframework.streaming.payloads.models import Header + +from .send_queue import SendQueue +from .send_packet import SendPacket + + +# TODO: consider interface this class +class PayloadSender: + def __init__(self): + self._connected_event = Event() + self._sender: TransportSenderBase = None + self._is_disconnecting: bool = False + self._send_header_buffer: List[int] = [ + None + ] * TransportConstants.MAX_HEADER_LENGTH + self._send_content_buffer: List[int] = [ + None + ] * TransportConstants.MAX_PAYLOAD_LENGTH + + self._send_queue = SendQueue(action=self._write_packet) + + self.disconnected: Callable[[object, DisconnectedEventArgs], None] = None + + @property + def is_connected(self) -> bool: + return self._sender is not None + + def connect(self, sender: TransportSenderBase): + if self._sender: + raise RuntimeError(f"{self.__class__.__name__} instance already connected.") + + self._sender = sender + self._connected_event.set() + + # TODO: check 'stream' for payload + def send_payload( + self, + header: Header, + payload: object, + is_length_known: bool, + sent_callback: Callable[[Header], Awaitable], + ): + packet = SendPacket( + header=header, + payload=payload, + is_length_known=is_length_known, + sent_callback=sent_callback, + ) + + self._send_queue.post(packet) + + async def disconnect(self, event_args: DisconnectedEventArgs = None): + did_disconnect = False + + if not self._is_disconnecting: + self._is_disconnecting = True + try: + try: + if self._sender: + self._sender.close() + # TODO: investigate if 'dispose' is necessary + did_disconnect = True + except Exception: + pass + + self._sender = None + + if did_disconnect: + self._connected_event.clear() + if callable(self.disconnected): + # pylint: disable=not-callable + if iscoroutinefunction(self.disconnected) or isfuture( + self.disconnected + ): + await self.disconnected( + self, event_args or DisconnectedEventArgs.empty + ) + else: + self.disconnected( + self, event_args or DisconnectedEventArgs.empty + ) + finally: + self._is_disconnecting = False + + async def _write_packet(self, packet: SendPacket): + await self._connected_event.wait() + + try: + # determine if we know the payload length and end + if not packet.is_length_known: + count = packet.header.payload_length + packet.header.end = count == 0 + + header_length = HeaderSerializer.serialize( + packet.header, self._send_header_buffer, 0 + ) + + # Send: Packet Header + length = await self._sender.send(self._send_header_buffer, 0, header_length) + if not length: + # TODO: make custom exception + raise Exception("TransportDisconnectedException") + + offset = 0 + + # Send content in chunks + if packet.header.payload_length and packet.payload: + # If we already read the buffer, send that + # If we did not, read from the stream until we've sent that amount + if not packet.is_length_known: + # Send: Packet content + length = await self._sender.send( + self._send_content_buffer, 0, packet.header.payload_length + ) + if length == 0: + # TODO: make custom exception + raise Exception("TransportDisconnectedException") + else: + while offset < packet.header.payload_length: + count = min( + packet.header.payload_length - offset, + TransportConstants.MAX_PAYLOAD_LENGTH, + ) + + # copy the stream to the buffer + # TODO: this has to be improved in custom buffer class (validate buffer ended) + for index in range(count): + self._send_content_buffer[index] = packet.payload[index] + + # Send: Packet content + length = await self._sender.send( + self._send_content_buffer, 0, count + ) + if length == 0: + # TODO: make custom exception + raise Exception("TransportDisconnectedException") + + offset += count + + if packet.sent_callback: + # TODO: should this really run in the background? + ensure_future(packet.sent_callback(packet.header)) + except Exception as exception: + disconnected_args = DisconnectedEventArgs(reason=str(exception)) + await self.disconnect(disconnected_args) diff --git a/libraries/botframework-streaming/botframework/streaming/payload_transport/send_packet.py b/libraries/botframework-streaming/botframework/streaming/payload_transport/send_packet.py new file mode 100644 index 000000000..bf7164708 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payload_transport/send_packet.py @@ -0,0 +1,21 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import Awaitable, Callable + +from botframework.streaming.payloads.models import Header + + +class SendPacket: + def __init__( + self, + *, + header: Header, + payload: object, + is_length_known: bool, + sent_callback: Callable[[Header], Awaitable] + ): + self.header = header + self.payload = payload + self.is_length_known = is_length_known + self.sent_callback = sent_callback diff --git a/libraries/botframework-streaming/botframework/streaming/payload_transport/send_queue.py b/libraries/botframework-streaming/botframework/streaming/payload_transport/send_queue.py new file mode 100644 index 000000000..d337d911a --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payload_transport/send_queue.py @@ -0,0 +1,39 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import traceback + +from asyncio import Queue, ensure_future +from typing import Awaitable, Callable + + +class SendQueue: + def __init__(self, action: Callable[[object], Awaitable], timeout: int = 30): + self._action = action + + self._queue = Queue() + self._timeout_seconds = timeout + + # TODO: this have to be abstracted so can remove asyncio dependency + ensure_future(self._process()) + + def post(self, item: object): + self._post_internal(item) + + def _post_internal(self, item: object): + self._queue.put_nowait(item) + + async def _process(self): + while True: + try: + while True: + item = await self._queue.get() + try: + await self._action(item) + except Exception: + traceback.print_exc() + finally: + self._queue.task_done() + except Exception: + # AppInsights.TrackException(e) + traceback.print_exc() diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/__init__.py b/libraries/botframework-streaming/botframework/streaming/payloads/__init__.py new file mode 100644 index 000000000..06fd3ad21 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .content_stream import ContentStream +from .header_serializer import HeaderSerializer +from .payload_assembler_manager import PayloadAssemblerManager +from .request_manager import RequestManager +from .response_message_stream import ResponseMessageStream +from .send_operations import SendOperations +from .stream_manager import StreamManager + +__all__ = [ + "ContentStream", + "PayloadAssemblerManager", + "RequestManager", + "ResponseMessageStream", + "HeaderSerializer", + "SendOperations", + "StreamManager", +] diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/__init__.py b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/__init__.py new file mode 100644 index 000000000..0373292c4 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/__init__.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .assembler import Assembler +from .payload_stream_assembler import PayloadStreamAssembler +from .receive_request_assembler import ReceiveRequestAssembler +from .receive_response_assembler import ReceiveResponseAssembler + +__all__ = [ + "Assembler", + "PayloadStreamAssembler", + "ReceiveRequestAssembler", + "ReceiveResponseAssembler", +] diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/assembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/assembler.py new file mode 100644 index 000000000..5fdcfe49d --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/assembler.py @@ -0,0 +1,30 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + + +from abc import ABC +from uuid import UUID + +from typing import List + +from botframework.streaming.payloads.models import Header + + +class Assembler(ABC): + def __init__(self, end: bool, identifier: UUID): + self.end = end + self.identifier = identifier + + def close(self): + raise NotImplementedError() + + def create_stream_from_payload(self) -> List[int]: + raise NotImplementedError() + + def get_payload_as_stream(self) -> List[int]: + raise NotImplementedError() + + def on_receive( + self, header: Header, stream: List[int], content_length: int + ) -> List[int]: + raise NotImplementedError() diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/payload_stream_assembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/payload_stream_assembler.py new file mode 100644 index 000000000..c7aba13b6 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/payload_stream_assembler.py @@ -0,0 +1,46 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID +from typing import List + +import botframework.streaming as streaming +import botframework.streaming.payloads as payloads +from botframework.streaming.payloads.models import Header + +from .assembler import Assembler + + +class PayloadStreamAssembler(Assembler): + # pylint: disable=super-init-not-called + def __init__( + self, + stream_manager: "payloads.StreamManager", + identifier: UUID, + type: str = None, + length: int = None, + ): + self._stream_manager = stream_manager or payloads.StreamManager() + self._stream: "streaming.PayloadStream" = None + # self._lock = Lock() + self.identifier = identifier + self.content_type = type + self.content_length = length + self.end: bool = None + + def create_stream_from_payload(self) -> "streaming.PayloadStream": + return streaming.PayloadStream(self) + + def get_payload_as_stream(self) -> "streaming.PayloadStream": + if self._stream is None: + self._stream = self.create_stream_from_payload() + + return self._stream + + def on_receive(self, header: Header, stream: List[int], content_length: int): + if header.end: + self.end = True + self._stream.done_producing() + + def close(self): + self._stream_manager.close_stream(self.identifier) diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/receive_request_assembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/receive_request_assembler.py new file mode 100644 index 000000000..f26f67c6a --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/receive_request_assembler.py @@ -0,0 +1,84 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +from uuid import UUID +from typing import Awaitable, Callable, List + +import botframework.streaming as streaming +import botframework.streaming.payloads as payloads +from botframework.streaming.payloads.models import Header, RequestPayload + +from .assembler import Assembler + + +class ReceiveRequestAssembler(Assembler): + # pylint: disable=super-init-not-called + def __init__( + self, + header: Header, + stream_manager: "payloads.StreamManager", + on_completed: Callable[[UUID, "streaming.ReceiveRequest"], Awaitable], + ): + if not header: + raise TypeError( + f"'header: {header.__class__.__name__}' argument can't be None" + ) + if not on_completed: + raise TypeError(f"'on_completed' argument can't be None") + + self._stream_manager = stream_manager + self._on_completed = on_completed + self.identifier = header.id + self._length = header.payload_length if header.end else None + self._stream: List[int] = None + + def create_stream_from_payload(self) -> List[int]: + return [None] * (self._length or 0) + + def get_payload_as_stream(self) -> List[int]: + if self._stream is None: + self._stream = self.create_stream_from_payload() + + return self._stream + + def on_receive(self, header: Header, stream: List[int], content_length: int): + if header.end: + self.end = True + + # Execute the request in the background + asyncio.ensure_future(self.process_request(stream)) + + def close(self): + self._stream_manager.close_stream(self.identifier) + + async def process_request(self, stream: List[int]): + request_payload = RequestPayload().from_json(bytes(stream).decode("utf-8-sig")) + + request = streaming.ReceiveRequest( + verb=request_payload.verb, path=request_payload.path, streams=[] + ) + + if request_payload.streams: + for stream_description in request_payload.streams: + try: + identifier = UUID(stream_description.id) + except Exception: + raise ValueError( + f"Stream description id '{stream_description.id}' is not a Guid" + ) + + stream_assembler = self._stream_manager.get_payload_assembler( + identifier + ) + stream_assembler.content_type = stream_description.content_type + stream_assembler.content_length = stream_description.length + + content_stream = payloads.ContentStream( + identifier=identifier, assembler=stream_assembler + ) + content_stream.length = stream_description.length + content_stream.content_type = stream_description.content_type + request.streams.append(content_stream) + + await self._on_completed(self.identifier, request) diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/receive_response_assembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/receive_response_assembler.py new file mode 100644 index 000000000..9b6003021 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/assemblers/receive_response_assembler.py @@ -0,0 +1,86 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +from uuid import UUID +from typing import Awaitable, Callable, List + +import botframework.streaming as streaming +import botframework.streaming.payloads as payloads +from botframework.streaming.payloads.models import Header, ResponsePayload + +from .assembler import Assembler + + +class ReceiveResponseAssembler(Assembler): + # pylint: disable=super-init-not-called + def __init__( + self, + header: Header, + stream_manager: "payloads.StreamManager", + on_completed: Callable[[UUID, "streaming.ReceiveResponse"], Awaitable], + ): + if not header: + raise TypeError( + f"'header: {header.__class__.__name__}' argument can't be None" + ) + if not on_completed: + raise TypeError(f"'on_completed' argument can't be None") + + self._stream_manager = stream_manager + self._on_completed = on_completed + self.identifier = header.id + self._length = header.payload_length if header.end else None + self._stream: List[int] = None + + def create_stream_from_payload(self) -> List[int]: + return [None] * (self._length or 0) + + def get_payload_as_stream(self) -> List[int]: + if self._stream is None: + self._stream = self.create_stream_from_payload() + + return self._stream + + def on_receive(self, header: Header, stream: List[int], content_length: int): + if header.end: + self.end = header.end + + # Execute the response on a separate Task + # Execute the request on a separate Thread in the background + # Execute the request on a separate in the background + asyncio.ensure_future(self.process_response(stream)) + + def close(self): + self._stream_manager.close_stream(self.identifier) + + async def process_response(self, stream: List[int]): + response_payload = ResponsePayload().from_json(bytes(stream).decode("utf8")) + + response = streaming.ReceiveResponse( + status_code=response_payload.status_code, streams=[] + ) + + if response_payload.streams: + for stream_description in response_payload.streams: + try: + identifier = UUID(int=int(stream_description.id)) + except Exception: + raise ValueError( + f"Stream description id '{stream_description.id}' is not a Guid" + ) + + stream_assembler = self._stream_manager.get_payload_assembler( + identifier + ) + stream_assembler.content_type = stream_description.content_type + stream_assembler.content_length = stream_description.length + + content_stream = payloads.ContentStream( + identifier=identifier, assembler=stream_assembler + ) + content_stream.length = stream_description.length + content_stream.content_type = stream_description.content_type + response.streams.append(content_stream) + + await self._on_completed(self.identifier, response) diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/content_stream.py b/libraries/botframework-streaming/botframework/streaming/payloads/content_stream.py new file mode 100644 index 000000000..c0c1ef67c --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/content_stream.py @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID + +from botframework.streaming.payloads.assemblers import PayloadStreamAssembler + + +class ContentStream: + def __init__(self, identifier: UUID, assembler: PayloadStreamAssembler): + if not assembler: + raise TypeError( + f"'assembler: {assembler.__class__.__name__}' argument can't be None" + ) + + self.identifier = identifier + self._assembler = assembler + self.stream = self._assembler.get_payload_as_stream() + self.content_type: str = None + self.length: int = None + + def cancel(self): + self._assembler.close() diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/__init__.py b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/__init__.py new file mode 100644 index 000000000..bc4270be5 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .cancel_disassembler import CancelDisassembler +from .payload_disassembler import PayloadDisassembler +from .request_disassembler import RequestDisassembler +from .response_disassembler import ResponseDisassembler +from .response_message_stream_disassembler import ResponseMessageStreamDisassembler + +__all__ = [ + "CancelDisassembler", + "PayloadDisassembler", + "RequestDisassembler", + "ResponseDisassembler", + "ResponseMessageStreamDisassembler", +] diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/cancel_disassembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/cancel_disassembler.py new file mode 100644 index 000000000..c531cfe5d --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/cancel_disassembler.py @@ -0,0 +1,22 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID + +from botframework.streaming.payload_transport import PayloadSender +from botframework.streaming.payloads.models import Header + + +class CancelDisassembler: + def __init__(self, *, sender: PayloadSender, identifier: UUID, type: str): + self._sender = sender + self._identifier = identifier + self._type = type + + async def disassemble(self): + header = Header(type=self._type, id=self._identifier, end=True) + + header.payload_length = 0 + + self._sender.send_payload(header, None, True, None) + return diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/payload_disassembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/payload_disassembler.py new file mode 100644 index 000000000..d60955d1f --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/payload_disassembler.py @@ -0,0 +1,110 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import json +from asyncio import Future +from abc import ABC, abstractmethod +from uuid import UUID +from typing import List + +from botframework.streaming.transport import TransportConstants +from botframework.streaming.payload_transport import PayloadSender +from botframework.streaming.payloads import ResponseMessageStream +from botframework.streaming.payloads.models import ( + Header, + Serializable, + StreamDescription, +) + + +class PayloadDisassembler(ABC): + def __init__(self, sender: PayloadSender, identifier: UUID): + self.sender = sender + self.identifier = identifier + self._task_completion_source = Future() + + self._stream: List[int] = None + self._stream_length: int = None + self._send_offset: int = None + self._is_end: bool = False + self._type: str = None + + @property + @abstractmethod + def type(self) -> str: + return self._type + + async def get_stream(self) -> List[int]: + raise NotImplementedError() + + async def disassemble(self): + self._stream = await self.get_stream() + self._stream_length = len(self._stream) + self._send_offset = 0 + + await self._send() + + @staticmethod + def get_stream_description(stream: ResponseMessageStream) -> StreamDescription: + description = StreamDescription(id=str(stream.id)) + + # TODO: This content type is hardcoded for POC, investigate how to proceed + content = bytes(stream.content).decode("utf8") + + try: + json.loads(content) + content_type = "application/json" + except ValueError: + content_type = "text/plain" + + description.content_type = content_type + description.length = len(content) + + # TODO: validate statement below, also make the string a constant + # content_length: int = stream.content.headers.get("Content-Length") + # if content_length: + # description.length = int(content_length) + # else: + # # TODO: check statement validity + # description.length = stream.content.headers.content_length + + return description + + @staticmethod + def serialize(item: Serializable, stream: List[int], length: List[int]): + encoded_json = item.to_json().encode() + stream.clear() + stream.extend(list(encoded_json)) + + length.clear() + length.append(len(stream)) + + async def _send(self): + # determine if we know the length we can send and whether we can tell if this is the end + is_length_known = self._is_end + + header = Header(type=self.type, id=self.identifier, end=self._is_end) + + header.payload_length = 0 + + if self._stream_length is not None: + # determine how many bytes we can send and if we are at the end + header.payload_length = min( + self._stream_length - self._send_offset, + TransportConstants.MAX_PAYLOAD_LENGTH, + ) + header.end = ( + self._send_offset + header.payload_length >= self._stream_length + ) + is_length_known = True + + self.sender.send_payload(header, self._stream, is_length_known, self._on_send) + + async def _on_send(self, header: Header): + self._send_offset += header.payload_length + self._is_end = header.end + + if self._is_end: + self._task_completion_source.set_result(True) + else: + await self._send() diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/request_disassembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/request_disassembler.py new file mode 100644 index 000000000..281dec376 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/request_disassembler.py @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID +from typing import List + +from botframework.streaming.payload_transport import PayloadSender +from botframework.streaming.payloads.models import PayloadTypes, RequestPayload + +from .payload_disassembler import PayloadDisassembler + + +class RequestDisassembler(PayloadDisassembler): + def __init__( + self, + sender: PayloadSender, + identifier: UUID, + request: "streaming.StreamingRequest", + ): + super().__init__(sender, identifier) + + self.request = request + + @property + def type(self) -> str: + return PayloadTypes.REQUEST + + async def get_stream(self) -> List[int]: + payload = RequestPayload(verb=self.request.verb, path=self.request.path) + + if self.request.streams: + payload.streams = [ + self.get_stream_description(content_stream) + for content_stream in self.request.streams + ] + + memory_stream: List[int] = [] + stream_length: List[int] = [] + # TODO: high probability stream length is not necessary + self.serialize(payload, memory_stream, stream_length) + + return memory_stream diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/response_disassembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/response_disassembler.py new file mode 100644 index 000000000..7e480cac4 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/response_disassembler.py @@ -0,0 +1,42 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID +from typing import List + +from botframework.streaming.payload_transport import PayloadSender +from botframework.streaming.payloads.models import PayloadTypes, ResponsePayload + +from .payload_disassembler import PayloadDisassembler + + +class ResponseDisassembler(PayloadDisassembler): + def __init__( + self, + sender: PayloadSender, + identifier: UUID, + response: "streaming.StreamingResponse", + ): + super().__init__(sender, identifier) + + self.response = response + + @property + def type(self) -> str: + return PayloadTypes.RESPONSE + + async def get_stream(self) -> List[int]: + payload = ResponsePayload(status_code=self.response.status_code) + + if self.response.streams: + payload.streams = [ + self.get_stream_description(content_stream) + for content_stream in self.response.streams + ] + + memory_stream: List[int] = [] + stream_length: List[int] = [] + # TODO: high probability stream length is not necessary + self.serialize(payload, memory_stream, stream_length) + + return memory_stream diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/response_message_stream_disassembler.py b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/response_message_stream_disassembler.py new file mode 100644 index 000000000..3f0f5d71c --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/disassemblers/response_message_stream_disassembler.py @@ -0,0 +1,27 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import List + +from botframework.streaming.payload_transport import PayloadSender +from botframework.streaming.payloads import ResponseMessageStream +from botframework.streaming.payloads.models import PayloadTypes + +from .payload_disassembler import PayloadDisassembler + + +class ResponseMessageStreamDisassembler(PayloadDisassembler): + def __init__(self, sender: PayloadSender, content_stream: ResponseMessageStream): + super().__init__(sender, content_stream.id) + + self.content_stream = content_stream + + @property + def type(self) -> str: + return PayloadTypes.STREAM + + async def get_stream(self) -> List[int]: + # TODO: check if bypass is correct here or if serialization should take place. + # this is redundant -->stream: List[int] = list(str(self.content_stream.content).encode()) + + return self.content_stream.content diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/header_serializer.py b/libraries/botframework-streaming/botframework/streaming/payloads/header_serializer.py new file mode 100644 index 000000000..b0b507ab2 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/header_serializer.py @@ -0,0 +1,170 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID +from typing import List + +from botframework.streaming.transport import TransportConstants + +from .models import Header + +_CHAR_TO_BINARY_INT = {val.decode(): list(val)[0] for val in [b".", b"\n", b"1", b"0"]} + + +# TODO: consider abstracting the binary int list logic into a class for easier handling +class HeaderSerializer: + DELIMITER = _CHAR_TO_BINARY_INT["."] + TERMINATOR = _CHAR_TO_BINARY_INT["\n"] + END = _CHAR_TO_BINARY_INT["1"] + NOT_END = _CHAR_TO_BINARY_INT["0"] + TYPE_OFFSET = 0 + TYPE_DELIMITER_OFFSET = 1 + LENGTH_OFFSET = 2 + LENGTH_LENGTH = 6 + LENGTH_DELIMETER_OFFSET = 8 + ID_OFFSET = 9 + ID_LENGTH = 36 + ID_DELIMETER_OFFSET = 45 + END_OFFSET = 46 + TERMINATOR_OFFSET = 47 + + @staticmethod + def serialize( + header: Header, + buffer: List[int], + offset: int, # pylint: disable=unused-argument + ) -> int: + + # write type + buffer[HeaderSerializer.TYPE_OFFSET] = HeaderSerializer._char_to_binary_int( + header.type + ) + buffer[HeaderSerializer.TYPE_DELIMITER_OFFSET] = HeaderSerializer.DELIMITER + + # write length + length_binary_array: List[int] = list( + HeaderSerializer._int_to_formatted_encoded_str( + header.payload_length, "{:06d}" + ) + ) + HeaderSerializer._write_in_buffer( + length_binary_array, buffer, HeaderSerializer.LENGTH_OFFSET + ) + buffer[HeaderSerializer.LENGTH_DELIMETER_OFFSET] = HeaderSerializer.DELIMITER + + # write id + id_binary_array: List[int] = list( + HeaderSerializer._uuid_to_numeric_encoded_str(header.id) + ) + HeaderSerializer._write_in_buffer( + id_binary_array, buffer, HeaderSerializer.ID_OFFSET + ) + buffer[HeaderSerializer.ID_DELIMETER_OFFSET] = HeaderSerializer.DELIMITER + + # write terminator + buffer[HeaderSerializer.END_OFFSET] = ( + HeaderSerializer.END if header.end else HeaderSerializer.NOT_END + ) + buffer[HeaderSerializer.TERMINATOR_OFFSET] = HeaderSerializer.TERMINATOR + + return TransportConstants.MAX_HEADER_LENGTH + + @staticmethod + def deserialize( + buffer: List[int], offset: int, count: int # pylint: disable=unused-argument + ) -> Header: + if count != TransportConstants.MAX_HEADER_LENGTH: + raise ValueError("Cannot deserialize header, incorrect length") + + header = Header( + type=HeaderSerializer._binary_int_to_char( + buffer[HeaderSerializer.TYPE_OFFSET] + ) + ) + + if buffer[HeaderSerializer.TYPE_DELIMITER_OFFSET] != HeaderSerializer.DELIMITER: + raise ValueError("Header type delimeter is malformed") + + length_str = HeaderSerializer._binary_array_to_str( + buffer[ + HeaderSerializer.LENGTH_OFFSET : HeaderSerializer.LENGTH_OFFSET + + HeaderSerializer.LENGTH_LENGTH + ] + ) + + try: + length = int(length_str) + except Exception: + raise ValueError("Header length is malformed") + + header.payload_length = length + + if ( + buffer[HeaderSerializer.LENGTH_DELIMETER_OFFSET] + != HeaderSerializer.DELIMITER + ): + raise ValueError("Header length delimeter is malformed") + + identifier_str = HeaderSerializer._binary_array_to_str( + buffer[ + HeaderSerializer.ID_OFFSET : HeaderSerializer.ID_OFFSET + + HeaderSerializer.ID_LENGTH + ] + ) + + try: + identifier = UUID(identifier_str) + except Exception: + raise ValueError("Header id is malformed") + + header.id = identifier + + if buffer[HeaderSerializer.ID_DELIMETER_OFFSET] != HeaderSerializer.DELIMITER: + raise ValueError("Header id delimeter is malformed") + + if buffer[HeaderSerializer.END_OFFSET] not in [ + HeaderSerializer.END, + HeaderSerializer.NOT_END, + ]: + raise ValueError("Header end is malformed") + + header.end = buffer[HeaderSerializer.END_OFFSET] == HeaderSerializer.END + + if buffer[HeaderSerializer.TERMINATOR_OFFSET] != HeaderSerializer.TERMINATOR: + raise ValueError("Header terminator is malformed") + + return header + + @staticmethod + def _char_to_binary_int(char: str) -> int: + if len(char) != 1: + raise ValueError("Char to cast should be a str of exactly length 1") + + unicode_list = list(char.encode()) + + if len(unicode_list) != 1: + raise ValueError("Char to cast should be in the ASCII domain") + + return unicode_list[0] + + @staticmethod + def _int_to_formatted_encoded_str(value: int, str_format: str) -> bytes: + return str_format.format(value).encode("ascii") + + @staticmethod + def _uuid_to_numeric_encoded_str(value: UUID) -> bytes: + return str(value).encode("ascii") + + @staticmethod + def _binary_int_to_char(binary_int: int) -> str: + return bytes([binary_int]).decode("ascii") + + @staticmethod + def _binary_array_to_str(binary_array: List[int]) -> str: + return bytes(binary_array).decode("ascii") + + @staticmethod + def _write_in_buffer(data: List[int], buffer: List[int], insert_index: int): + for byte_int in data: + buffer[insert_index] = byte_int + insert_index += 1 diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/models/__init__.py b/libraries/botframework-streaming/botframework/streaming/payloads/models/__init__.py new file mode 100644 index 000000000..f0d3e2024 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/models/__init__.py @@ -0,0 +1,19 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + + +from .header import Header +from .payload_types import PayloadTypes +from .request_payload import RequestPayload +from .response_payload import ResponsePayload +from .serializable import Serializable +from .stream_description import StreamDescription + +__all__ = [ + "Header", + "PayloadTypes", + "RequestPayload", + "ResponsePayload", + "Serializable", + "StreamDescription", +] diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/models/header.py b/libraries/botframework-streaming/botframework/streaming/payloads/models/header.py new file mode 100644 index 000000000..5eab7564e --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/models/header.py @@ -0,0 +1,33 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID + +from botframework.streaming.transport import TransportConstants + + +class Header: + # pylint: disable=invalid-name + def __init__(self, *, type: str = None, id: UUID = None, end: bool = None): + self._internal_payload_length = None + self.type: str = type + self.id: UUID = id + self.end: bool = end + + @property + def payload_length(self) -> int: + return self._internal_payload_length + + @payload_length.setter + def payload_length(self, value: int): + self._validate_length( + value, TransportConstants.MAX_LENGTH, TransportConstants.MIN_LENGTH + ) + self._internal_payload_length = value + + def _validate_length(self, value: int, max_val: int, min_val: int): + if value > max_val: + raise ValueError(f"Length must be less or equal than {max_val}") + + if value < min_val: + raise ValueError(f"Length must be greater or equal than {min_val}") diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/models/payload_types.py b/libraries/botframework-streaming/botframework/streaming/payloads/models/payload_types.py new file mode 100644 index 000000000..ec9c01090 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/models/payload_types.py @@ -0,0 +1,16 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .header import Header + + +class PayloadTypes: + REQUEST = "A" + RESPONSE = "B" + STREAM = "S" + CANCEL_ALL = "X" + CANCEL_STREAM = "C" + + @staticmethod + def is_stream(header: Header) -> bool: + return header.type == PayloadTypes.STREAM diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/models/request_payload.py b/libraries/botframework-streaming/botframework/streaming/payloads/models/request_payload.py new file mode 100644 index 000000000..1003c292f --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/models/request_payload.py @@ -0,0 +1,44 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import json +from typing import List + +from .serializable import Serializable +from .stream_description import StreamDescription + + +class RequestPayload(Serializable): + def __init__( + self, + *, + verb: str = None, + path: str = None, + streams: List[StreamDescription] = None + ): + self.verb = verb + self.path = path + self.streams = streams + + def to_json(self) -> str: + obj = {"verb": self.verb, "path": self.path} + + if self.streams: + obj["streams"] = [stream.to_dict() for stream in self.streams] + + return json.dumps(obj) + + def from_json(self, json_str: str) -> "RequestPayload": + obj = json.loads(json_str) + + self.verb = obj.get("verb") + self.path = obj.get("path") + stream_list = obj.get("streams") + + if stream_list: + self.streams = [ + StreamDescription().from_dict(stream_dict) + for stream_dict in stream_list + ] + + return self diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/models/response_payload.py b/libraries/botframework-streaming/botframework/streaming/payloads/models/response_payload.py new file mode 100644 index 000000000..f1f41142c --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/models/response_payload.py @@ -0,0 +1,38 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import json +from typing import List + +from .serializable import Serializable +from .stream_description import StreamDescription + + +class ResponsePayload(Serializable): + def __init__( + self, *, status_code: int = None, streams: List[StreamDescription] = None + ): + self.status_code = status_code + self.streams = streams + + def to_json(self) -> str: + obj = {"statusCode": self.status_code} + + if self.streams: + obj["streams"] = [stream.to_dict() for stream in self.streams] + + return json.dumps(obj) + + def from_json(self, json_str: str) -> "ResponsePayload": + obj = json.loads(json_str) + + self.status_code = obj.get("statusCode") + stream_list = obj.get("streams") + + if stream_list: + self.streams = [ + StreamDescription().from_dict(stream_dict) + for stream_dict in stream_list + ] + + return self diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/models/serializable.py b/libraries/botframework-streaming/botframework/streaming/payloads/models/serializable.py new file mode 100644 index 000000000..8c01830be --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/models/serializable.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + + +from abc import ABC + + +# TODO: debate if this class is pertinent or should use msrest infrastructure +class Serializable(ABC): + def to_json(self) -> str: + raise NotImplementedError() + + def from_json(self, json_str: str): + raise NotImplementedError() diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/models/stream_description.py b/libraries/botframework-streaming/botframework/streaming/payloads/models/stream_description.py new file mode 100644 index 000000000..c426f5de1 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/models/stream_description.py @@ -0,0 +1,36 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import json + +from .serializable import Serializable + + +class StreamDescription(Serializable): + # pylint: disable=invalid-name + def __init__(self, *, id: str = None, content_type: str = None, length: int = None): + self.id = id + self.content_type = content_type + self.length = length + + def to_dict(self) -> dict: + obj = {"id": self.id, "type": self.content_type} + + if self.length is not None: + obj["length"] = self.length + + return obj + + def from_dict(self, json_dict: dict) -> "StreamDescription": + self.id = json_dict.get("id") + self.content_type = json_dict.get("type") + self.length = json_dict.get("length") + + return self + + def to_json(self) -> str: + return json.dumps(self.to_dict) + + def from_json(self, json_str: str) -> "StreamDescription": + obj = json.loads(json_str) + return self.from_dict(obj) diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/payload_assembler_manager.py b/libraries/botframework-streaming/botframework/streaming/payloads/payload_assembler_manager.py new file mode 100644 index 000000000..276654b0b --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/payload_assembler_manager.py @@ -0,0 +1,73 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID +from typing import Awaitable, Callable, Dict, List, Union + +from botframework.streaming.payloads.assemblers import ( + Assembler, + ReceiveRequestAssembler, + ReceiveResponseAssembler, +) +from botframework.streaming.payloads.models import Header, PayloadTypes + +from .stream_manager import StreamManager + + +class PayloadAssemblerManager: + def __init__( + self, + stream_manager: StreamManager, + on_receive_request: Callable[[UUID, "streaming.ReceiveRequest"], Awaitable], + on_receive_response: Callable[[UUID, "streaming.ReceiveResponse"], Awaitable], + ): + self._on_receive_request = on_receive_request + self._on_receive_response = on_receive_response + self._stream_manager = stream_manager + self._active_assemblers: Dict[UUID, Assembler] = {} + + def get_payload_stream( + self, header: Header + ) -> Union[List[int], "streaming.PayloadStream"]: + # TODO: The return value SHOULDN'T be a union, we should interface List[int] into a BFStream class + if self._is_stream_payload(header): + return self._stream_manager.get_payload_stream(header) + if not self._active_assemblers.get(header.id): + # a new requestId has come in, start a new task to process it as it is received + assembler = self._create_payload_assembler(header) + if assembler: + self._active_assemblers[header.id] = assembler + return assembler.get_payload_as_stream() + + return None + + def on_receive( + self, header: Header, content_stream: List[int], content_length: int + ): + if self._is_stream_payload(header): + self._stream_manager.on_receive(header, content_stream, content_length) + else: + assembler = self._active_assemblers.get(header.id) + if assembler: + assembler.on_receive(header, content_stream, content_length) + + # remove them when we are done + if header.end: + del self._active_assemblers[header.id] + + # ignore unknown header ids + + def _create_payload_assembler(self, header: Header) -> Assembler: + if header.type == PayloadTypes.REQUEST: + return ReceiveRequestAssembler( + header, self._stream_manager, self._on_receive_request + ) + if header.type == PayloadTypes.RESPONSE: + return ReceiveResponseAssembler( + header, self._stream_manager, self._on_receive_response + ) + + return None + + def _is_stream_payload(self, header: Header) -> bool: + return PayloadTypes.is_stream(header) diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/request_manager.py b/libraries/botframework-streaming/botframework/streaming/payloads/request_manager.py new file mode 100644 index 000000000..0ffdbeaad --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/request_manager.py @@ -0,0 +1,45 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from asyncio import Future, shield +from uuid import UUID +from typing import Dict + +import botframework.streaming as streaming + + +class RequestManager: + def __init__( + self, + *, + pending_requests: Dict[UUID, "Future[streaming.ReceiveResponse]"] = None + ): + self._pending_requests = pending_requests or {} + + async def signal_response( + self, request_id: UUID, response: "streaming.ReceiveResponse" + ) -> bool: + # TODO: dive more into this logic + signal: Future = self._pending_requests.get(request_id) + if signal: + signal.set_result(response) + # TODO: double check this + # del self._pending_requests[request_id] + + return True + + return False + + async def get_response(self, request_id: UUID) -> "streaming.ReceiveResponse": + if request_id in self._pending_requests: + return None + + pending_request = Future() + self._pending_requests[request_id] = pending_request + + try: + response: streaming.ReceiveResponse = await shield(pending_request) + return response + + finally: + del self._pending_requests[request_id] diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/response_message_stream.py b/libraries/botframework-streaming/botframework/streaming/payloads/response_message_stream.py new file mode 100644 index 000000000..04ae1dd77 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/response_message_stream.py @@ -0,0 +1,11 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import uuid4, UUID + + +class ResponseMessageStream: + # pylint: disable=invalid-name + def __init__(self, *, id: UUID = None, content: object = None): + self.id = id or uuid4() + self.content = content diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/send_operations.py b/libraries/botframework-streaming/botframework/streaming/payloads/send_operations.py new file mode 100644 index 000000000..82a7ecadc --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/send_operations.py @@ -0,0 +1,71 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +from uuid import UUID + +from botframework.streaming.payload_transport import PayloadSender +from botframework.streaming.payloads.disassemblers import ( + CancelDisassembler, + RequestDisassembler, + ResponseDisassembler, + ResponseMessageStreamDisassembler, +) +from botframework.streaming.payloads.models import PayloadTypes + + +class SendOperations: + def __init__(self, payload_sender: PayloadSender): + self._payload_sender = payload_sender + + async def send_request( + self, identifier: UUID, request: "streaming.StreamingRequest" + ): + disassembler = RequestDisassembler(self._payload_sender, identifier, request) + + await disassembler.disassemble() + + if request.streams: + tasks = [ + ResponseMessageStreamDisassembler( + self._payload_sender, content_stream + ).disassemble() + for content_stream in request.streams + ] + + await asyncio.gather(*tasks) + + async def send_response( + self, identifier: UUID, response: "streaming.StreamingResponse" + ): + disassembler = ResponseDisassembler(self._payload_sender, identifier, response) + + await disassembler.disassemble() + + if response.streams: + tasks = [ + ResponseMessageStreamDisassembler( + self._payload_sender, content_stream + ).disassemble() + for content_stream in response.streams + ] + + await asyncio.gather(*tasks) + + async def send_cancel_all(self, identifier: UUID): + disassembler = CancelDisassembler( + sender=self._payload_sender, + identifier=identifier, + type=PayloadTypes.CANCEL_ALL, + ) + + await disassembler.disassemble() + + async def send_cancel_stream(self, identifier: UUID): + disassembler = CancelDisassembler( + sender=self._payload_sender, + identifier=identifier, + type=PayloadTypes.CANCEL_STREAM, + ) + + await disassembler.disassemble() diff --git a/libraries/botframework-streaming/botframework/streaming/payloads/stream_manager.py b/libraries/botframework-streaming/botframework/streaming/payloads/stream_manager.py new file mode 100644 index 000000000..84e4cf4f3 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/payloads/stream_manager.py @@ -0,0 +1,49 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from uuid import UUID +from typing import Callable, Dict, List + +from botframework.streaming.payloads.assemblers import PayloadStreamAssembler +from botframework.streaming.payloads.models import Header + + +class StreamManager: + def __init__( + self, on_cancel_stream: Callable[[PayloadStreamAssembler], None] = None + ): + self._on_cancel_stream = on_cancel_stream or (lambda ocs: None) + self._active_assemblers: Dict[UUID, PayloadStreamAssembler] = {} + + def get_payload_assembler(self, identifier: UUID) -> PayloadStreamAssembler: + self._active_assemblers[identifier] = self._active_assemblers.get( + identifier, PayloadStreamAssembler(self, identifier) + ) + + return self._active_assemblers[identifier] + + def get_payload_stream(self, header: Header) -> "streaming.PayloadStream": + assembler = self.get_payload_assembler(header.id) + + return assembler.get_payload_as_stream() + + def on_receive( + self, header: Header, content_stream: List[int], content_length: int + ): + assembler = self._active_assemblers.get(header.id) + + if assembler: + assembler.on_receive(header, content_stream, content_length) + + def close_stream(self, identifier: UUID): + assembler = self._active_assemblers.get(identifier) + + if assembler: + del self._active_assemblers[identifier] + stream = assembler.get_payload_as_stream() + if ( + assembler.content_length + and len(stream) < assembler.content_length + or not assembler.end + ): + self._on_cancel_stream(assembler) diff --git a/libraries/botframework-streaming/botframework/streaming/protocol_adapter.py b/libraries/botframework-streaming/botframework/streaming/protocol_adapter.py new file mode 100644 index 000000000..71661bdf2 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/protocol_adapter.py @@ -0,0 +1,82 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +import asyncio +from uuid import UUID, uuid4 + +from botframework.streaming.payloads import ( + PayloadAssemblerManager, + RequestManager, + SendOperations, + StreamManager, +) +from botframework.streaming.payloads.assemblers import PayloadStreamAssembler +from botframework.streaming.payload_transport import PayloadSender, PayloadReceiver + +from .receive_request import ReceiveRequest +from .receive_response import ReceiveResponse +from .request_handler import RequestHandler +from .streaming_request import StreamingRequest + + +class ProtocolAdapter: + def __init__( + self, + request_handler: RequestHandler, + request_manager: RequestManager, + payload_sender: PayloadSender, + payload_receiver: PayloadReceiver, + handler_context: object = None, + ): + self._request_handler = request_handler + self._request_manager = request_manager + self._payload_sender = payload_sender + self._payload_receiver = payload_receiver + self._handler_context = handler_context + + self._send_operations = SendOperations(self._payload_sender) + # TODO: might be able to remove + self._stream_manager = StreamManager(self._on_cancel_stream) + self._assembler_manager = PayloadAssemblerManager( + self._stream_manager, self._on_receive_request, self._on_receive_response + ) + + self._payload_receiver.subscribe( + self._assembler_manager.get_payload_stream, + self._assembler_manager.on_receive, + ) + + async def send_request(self, request: StreamingRequest) -> ReceiveResponse: + if not request: + raise TypeError( + f"'request: {request.__class__.__name__}' argument can't be None" + ) + + request_id = uuid4() + response_task = self._request_manager.get_response(request_id) + request_task = self._send_operations.send_request(request_id, request) + + [_, response] = await asyncio.gather(request_task, response_task) + + return response + + async def _on_receive_request(self, identifier: UUID, request: ReceiveRequest): + # request is done, we can handle it + if self._request_handler: + response = await self._request_handler.process_request( + request, None, self._handler_context + ) + + if response: + await self._send_operations.send_response(identifier, response) + + async def _on_receive_response(self, identifier: UUID, response: ReceiveResponse): + # we received the response to something, signal it + await self._request_manager.signal_response(identifier, response) + + def _on_cancel_stream(self, content_stream_assembler: PayloadStreamAssembler): + # TODO: on original C# code content_stream_assembler is typed as IAssembler + asyncio.create_task( + self._send_operations.send_cancel_stream( + content_stream_assembler.identifier + ) + ) diff --git a/libraries/botframework-streaming/botframework/streaming/receive_request.py b/libraries/botframework-streaming/botframework/streaming/receive_request.py new file mode 100644 index 000000000..973630bd0 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/receive_request.py @@ -0,0 +1,29 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import List + +from botframework.streaming.payloads import ContentStream + + +class ReceiveRequest: + def __init__( + self, *, verb: str = None, path: str = None, streams: List[ContentStream] + ): + self.verb = verb + self.path = path + self.streams: List[ContentStream] = streams or [] + + async def read_body_as_str(self) -> str: + try: + content_stream = self.streams[0] if self.streams else None + + if not content_stream: + # TODO: maybe raise an error + return "" + + # TODO: encoding double check + stream = await content_stream.stream.read_until_end() + return bytes(stream).decode("utf-8-sig") + except Exception as error: + raise error diff --git a/libraries/botframework-streaming/botframework/streaming/receive_response.py b/libraries/botframework-streaming/botframework/streaming/receive_response.py new file mode 100644 index 000000000..517874b5c --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/receive_response.py @@ -0,0 +1,52 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from typing import List, Union, Type + +from msrest.serialization import Model +from botframework.streaming.payloads import ContentStream +from botframework.streaming.payloads.models import Serializable + + +class ReceiveResponse: + def __init__(self, status_code: int = None, streams: List[ContentStream] = None): + self.status_code = status_code + self.streams = streams + + def read_body_as_json( + self, cls: Union[Type[Model], Type[Serializable]] + ) -> Union[Model, Serializable]: + try: + body_str = self.read_body_as_str() + body = None + + if issubclass(cls, Serializable): + body = cls().from_json(body_str) + elif isinstance(cls, Model): + body = cls.deserialize(body_str) + return body + except Exception as error: + raise error + + def read_body_as_str(self) -> str: + try: + content_stream = self.read_body() + + if not content_stream: + return "" + + # TODO: encoding double check + return content_stream.decode("utf8") + except Exception as error: + raise error + + def read_body(self) -> bytes: + try: + content_stream = self.streams[0] if self.streams else None + + if not content_stream: + return None + + return bytes(content_stream.stream) + except Exception as error: + raise error diff --git a/libraries/botframework-streaming/botframework/streaming/request_handler.py b/libraries/botframework-streaming/botframework/streaming/request_handler.py new file mode 100644 index 000000000..3214eb7a1 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/request_handler.py @@ -0,0 +1,15 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from abc import ABC +from logging import Logger + +from .receive_request import ReceiveRequest +from .streaming_response import StreamingResponse + + +class RequestHandler(ABC): + async def process_request( + self, request: ReceiveRequest, logger: Logger, context: object + ) -> StreamingResponse: + raise NotImplementedError() diff --git a/libraries/botframework-streaming/botframework/streaming/streaming_request.py b/libraries/botframework-streaming/botframework/streaming/streaming_request.py new file mode 100644 index 000000000..6157a04d6 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/streaming_request.py @@ -0,0 +1,87 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import json +from uuid import UUID, uuid4 +from typing import List, Union + +from msrest.serialization import Model +from botframework.streaming.payloads import ResponseMessageStream +from botframework.streaming.payloads.models import Serializable + + +class StreamingRequest: + GET = "GET" + POST = "POST" + PUT = "PUT" + DELETE = "DELETE" + + def __init__( + self, + *, + verb: str = None, + path: str = None, + streams: List[ResponseMessageStream] = None, + ): + self.verb = verb + self.path = path + self.streams = streams + + @staticmethod + def create_request( + method: str, path: str = None, body: object = None + ) -> "StreamingRequest": + if not method: + return None + + request = StreamingRequest(verb=method, path=path,) + + if body: + request.add_stream(body) + + return request + + @staticmethod + def create_get(path: str = None, body: object = None) -> "StreamingRequest": + return StreamingRequest.create_request("GET", path, body) + + @staticmethod + def create_post(path: str = None, body: object = None) -> "StreamingRequest": + return StreamingRequest.create_request("POST", path, body) + + @staticmethod + def create_put(path: str = None, body: object = None) -> "StreamingRequest": + return StreamingRequest.create_request("PUT", path, body) + + @staticmethod + def create_delete(path: str = None, body: object = None) -> "StreamingRequest": + return StreamingRequest.create_request("DELETE", path, body) + + def set_body(self, body: Union[str, Serializable, Model, bytes]): + # TODO: verify if msrest.serialization.Model is necessary + if not body: + return + + if isinstance(body, bytes): + pass + else: + if isinstance(body, Serializable): + body = body.to_json() + elif isinstance(body, Model): + body = json.dumps(body.as_dict()) + + body = body.encode("ascii") + + self.add_stream(list(body)) + + def add_stream(self, content: object, stream_id: UUID = None): + if not content: + raise TypeError( + f"'content: {content.__class__.__name__}' argument can't be None" + ) + if not self.streams: + self.streams = [] + + self.streams.append( + ResponseMessageStream(id=stream_id or uuid4(), content=content) + ) diff --git a/libraries/botframework-streaming/botframework/streaming/streaming_response.py b/libraries/botframework-streaming/botframework/streaming/streaming_response.py new file mode 100644 index 000000000..a97dad475 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/streaming_response.py @@ -0,0 +1,50 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import json +from uuid import UUID, uuid4 +from typing import List, Union + +from msrest.serialization import Model +from botframework.streaming.payloads import ResponseMessageStream +from botframework.streaming.payloads.models import Serializable + + +class StreamingResponse: + def __init__( + self, *, status_code: int = None, streams: List[ResponseMessageStream] = None + ): + self.status_code = status_code + self.streams = streams + + def add_stream(self, content: object, identifier: UUID = None): + if not content: + raise TypeError("content can't be None") + + if self.streams is None: + self.streams: List[ResponseMessageStream] = [] + + self.streams.append( + ResponseMessageStream(id=identifier or uuid4(), content=content) + ) + + def set_body(self, body: Union[str, Serializable, Model]): + # TODO: verify if msrest.serialization.Model is necessary + if not body: + return + + if isinstance(body, Serializable): + body = body.to_json() + elif isinstance(body, Model): + body = json.dumps(body.as_dict()) + + self.add_stream(list(body.encode())) + + @staticmethod + def create_response(status_code: int, body: object) -> "StreamingResponse": + response = StreamingResponse(status_code=status_code) + + if body: + response.add_stream(body) + + return response diff --git a/libraries/botframework-streaming/botframework/streaming/transport/__init__.py b/libraries/botframework-streaming/botframework/streaming/transport/__init__.py new file mode 100644 index 000000000..3939e47a5 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .disconnected_event_args import DisconnectedEventArgs +from .streaming_transport_service import StreamingTransportService +from .transport_base import TransportBase +from .transport_constants import TransportConstants +from .transport_receiver_base import TransportReceiverBase +from .transport_sender_base import TransportSenderBase + +__all__ = [ + "DisconnectedEventArgs", + "StreamingTransportService", + "TransportBase", + "TransportConstants", + "TransportReceiverBase", + "TransportSenderBase", +] diff --git a/libraries/botframework-streaming/botframework/streaming/transport/disconnected_event_args.py b/libraries/botframework-streaming/botframework/streaming/transport/disconnected_event_args.py new file mode 100644 index 000000000..9db882219 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/disconnected_event_args.py @@ -0,0 +1,10 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + + +class DisconnectedEventArgs: + def __init__(self, *, reason: str = None): + self.reason = reason + + +DisconnectedEventArgs.empty = DisconnectedEventArgs() diff --git a/libraries/botframework-streaming/botframework/streaming/transport/streaming_transport_service.py b/libraries/botframework-streaming/botframework/streaming/transport/streaming_transport_service.py new file mode 100644 index 000000000..498f7198c --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/streaming_transport_service.py @@ -0,0 +1,12 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from abc import ABC + + +class StreamingTransportService(ABC): + async def start(self): + raise NotImplementedError() + + async def send(self, request): + raise NotImplementedError() diff --git a/libraries/botframework-streaming/botframework/streaming/transport/transport_base.py b/libraries/botframework-streaming/botframework/streaming/transport/transport_base.py new file mode 100644 index 000000000..4955f96e8 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/transport_base.py @@ -0,0 +1,10 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + + +class TransportBase: + def __init__(self): + self.is_connected: bool = None + + def close(self): + return diff --git a/libraries/botframework-streaming/botframework/streaming/transport/transport_constants.py b/libraries/botframework-streaming/botframework/streaming/transport/transport_constants.py new file mode 100644 index 000000000..139099512 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/transport_constants.py @@ -0,0 +1,11 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from abc import ABC + + +class TransportConstants(ABC): + MAX_PAYLOAD_LENGTH = 4096 + MAX_HEADER_LENGTH = 48 + MAX_LENGTH = 999999 + MIN_LENGTH = 0 diff --git a/libraries/botframework-streaming/botframework/streaming/transport/transport_receiver_base.py b/libraries/botframework-streaming/botframework/streaming/transport/transport_receiver_base.py new file mode 100644 index 000000000..e7e849a49 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/transport_receiver_base.py @@ -0,0 +1,11 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from abc import ABC + +from .transport_base import TransportBase + + +class TransportReceiverBase(ABC, TransportBase): + async def receive(self, buffer: object, offset: int, count: int) -> int: + raise NotImplementedError() diff --git a/libraries/botframework-streaming/botframework/streaming/transport/transport_sender_base.py b/libraries/botframework-streaming/botframework/streaming/transport/transport_sender_base.py new file mode 100644 index 000000000..33d647159 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/transport_sender_base.py @@ -0,0 +1,11 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from abc import ABC + +from .transport_base import TransportBase + + +class TransportSenderBase(ABC, TransportBase): + async def send(self, buffer: object, offset: int, count: int) -> int: + raise NotImplementedError() diff --git a/libraries/botframework-streaming/botframework/streaming/transport/web_socket/__init__.py b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/__init__.py new file mode 100644 index 000000000..ef5847cbf --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .web_socket import WebSocketMessage +from .web_socket import WebSocket +from .web_socket_close_status import WebSocketCloseStatus +from .web_socket_server import WebSocketServer +from .web_socket_message_type import WebSocketMessageType +from .web_socket_transport import WebSocketTransport +from .web_socket_state import WebSocketState + +__all__ = [ + "WebSocketMessage", + "WebSocket", + "WebSocketCloseStatus", + "WebSocketMessageType", + "WebSocketServer", + "WebSocketTransport", + "WebSocketState", +] diff --git a/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket.py b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket.py new file mode 100644 index 000000000..c50cc1181 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket.py @@ -0,0 +1,35 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from abc import ABC +from typing import List, Any + +from .web_socket_close_status import WebSocketCloseStatus +from .web_socket_state import WebSocketState +from .web_socket_message_type import WebSocketMessageType + + +class WebSocketMessage: + def __init__(self, *, message_type: WebSocketMessageType, data: List[int]): + self.message_type = message_type + self.data = data + + +class WebSocket(ABC): + def dispose(self): + raise NotImplementedError() + + async def close(self, close_status: WebSocketCloseStatus, status_description: str): + raise NotImplementedError() + + async def receive(self) -> WebSocketMessage: + raise NotImplementedError() + + async def send( + self, buffer: Any, message_type: WebSocketMessageType, end_of_message: bool + ): + raise NotImplementedError() + + @property + def status(self) -> WebSocketState: + raise NotImplementedError() diff --git a/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_close_status.py b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_close_status.py new file mode 100644 index 000000000..417c6588c --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_close_status.py @@ -0,0 +1,17 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from enum import IntEnum + + +class WebSocketCloseStatus(IntEnum): + NORMAL_CLOSURE = 1000 + ENDPOINT_UNAVAILABLE = 1001 + PROTOCOL_ERROR = 1002 + INVALID_MESSAGE_TYPE = 1003 + EMPTY = 1005 + INVALID_PAYLOAD_DATA = 1007 + POLICY_VIOLATION = 1008 + MESSAGE_TOO_BIG = 1009 + MANDATORY_EXTENSION = 1010 + INTERNAL_SERVER_ERROR = 1011 diff --git a/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_message_type.py b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_message_type.py new file mode 100644 index 000000000..658b7e073 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_message_type.py @@ -0,0 +1,14 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from enum import IntEnum + + +class WebSocketMessageType(IntEnum): + # websocket spec types + CONTINUATION = 0 + TEXT = 1 + BINARY = 2 + PING = 9 + PONG = 10 + CLOSE = 8 diff --git a/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_server.py b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_server.py new file mode 100644 index 000000000..67d0d8336 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_server.py @@ -0,0 +1,97 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from asyncio import Future, iscoroutinefunction, isfuture +from typing import Callable + +from botframework.streaming import ( + ProtocolAdapter, + ReceiveResponse, + RequestHandler, + StreamingRequest, +) +from botframework.streaming.payloads import RequestManager +from botframework.streaming.payload_transport import PayloadSender, PayloadReceiver +from botframework.streaming.transport import DisconnectedEventArgs + +from .web_socket import WebSocket +from .web_socket_transport import WebSocketTransport + + +class WebSocketServer: + def __init__(self, socket: WebSocket, request_handler: RequestHandler): + if socket is None: + raise TypeError( + f"'socket: {socket.__class__.__name__}' argument can't be None" + ) + if not request_handler: + raise TypeError( + f"'request_handler: {request_handler.__class__.__name__}' argument can't be None" + ) + + self.disconnected_event_handler: Callable[ + [object, DisconnectedEventArgs], None + ] = None + + self._web_socket_transport = WebSocketTransport(socket) + self._request_handler = request_handler + self._request_manager = RequestManager() + self._sender = PayloadSender() + self._sender.disconnected = self._on_connection_disconnected + self._receiver = PayloadReceiver() + self._receiver.disconnected = self._on_connection_disconnected + self._protocol_adapter = ProtocolAdapter( + self._request_handler, self._request_manager, self._sender, self._receiver + ) + self._closed_signal: Future = None + self._is_disconnecting: bool = False + + @property + def is_connected(self) -> bool: + return self._sender.is_connected and self._receiver.is_connected + + async def start(self): + self._closed_signal = Future() + self._sender.connect(self._web_socket_transport) + await self._receiver.connect(self._web_socket_transport) + + return self._closed_signal + + async def send(self, request: StreamingRequest) -> ReceiveResponse: + if not request: + raise TypeError( + f"'request: {request.__class__.__name__}' argument can't be None" + ) + + if not self._sender.is_connected or not self._sender.is_connected: + raise RuntimeError("The server is not connected") + + return await self._protocol_adapter.send_request(request) + + async def disconnect(self): + await self._sender.disconnect() + await self._receiver.disconnect() + + async def _on_connection_disconnected( + self, sender: object, event_args: object # pylint: disable=unused-argument + ): + if not self._is_disconnecting: + self._is_disconnecting = True + + if self._closed_signal: + self._closed_signal.set_result("close") + self._closed_signal = None + + if sender in [self._sender, self._receiver]: + if iscoroutinefunction(sender.disconnect) or isfuture( + sender.disconnect + ): + await sender.disconnect() + else: + sender.disconnect() + + if self.disconnected_event_handler: + # pylint: disable=not-callable + self.disconnected_event_handler(self, DisconnectedEventArgs.empty) + + self._is_disconnecting = False diff --git a/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_state.py b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_state.py new file mode 100644 index 000000000..fddd42ec2 --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_state.py @@ -0,0 +1,9 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from enum import IntEnum + + +class WebSocketState(IntEnum): + OPEN = 2 + CLOSED = 5 diff --git a/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_transport.py b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_transport.py new file mode 100644 index 000000000..bd327affa --- /dev/null +++ b/libraries/botframework-streaming/botframework/streaming/transport/web_socket/web_socket_transport.py @@ -0,0 +1,89 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import traceback +from typing import List + +from botframework.streaming.transport import TransportReceiverBase, TransportSenderBase + +from .web_socket import WebSocket +from .web_socket_message_type import WebSocketMessageType +from .web_socket_close_status import WebSocketCloseStatus +from .web_socket_state import WebSocketState + + +class WebSocketTransport(TransportReceiverBase, TransportSenderBase): + def __init__(self, web_socket: WebSocket): + self._socket = web_socket + + @property + def is_connected(self): + # TODO: mock logic + return self._socket.status == WebSocketState.OPEN + + async def close(self): + # TODO: mock logic + if self._socket.status == WebSocketState.OPEN: + try: + await self._socket.close( + WebSocketCloseStatus.NORMAL_CLOSURE, + "Closed by the WebSocketTransport", + ) + except Exception: + # pylint: disable=pointless-string-statement + """ + Any exception thrown here will be caused by the socket already being closed, + which is the state we want to put it in by calling this method, which + means we don't care if it was already closed and threw an exception + when we tried to close it again. + """ + traceback.print_exc() + + # TODO: might need to remove offset and count if no segmentation possible + # TODO: considering to create a BFTransportBuffer class to abstract the logic of binary buffers adapting to + # current interfaces + async def receive( + self, buffer: List[int], offset: int = 0, count: int = None + ) -> int: + try: + if self._socket: + result = await self._socket.receive() + buffer_index = offset + result_length = count if count is not None else len(result.data) + for result_index in range(result_length): + buffer[buffer_index] = result.data[result_index] + buffer_index += 1 + if result.message_type == WebSocketMessageType.CLOSE: + await self._socket.close( + WebSocketCloseStatus.NORMAL_CLOSURE, "Socket closed" + ) + + # Depending on ws implementation library next line might not be necessary + if self._socket.status == WebSocketState.CLOSED: + self._socket.dispose() + + return result_length + except Exception as error: + # Exceptions of the three types below will also have set the socket's state to closed, which fires an + # event consumers of this class are subscribed to and have handling around. Any other exception needs to + # be thrown to cause a non-transport-connectivity failure. + raise error + + # TODO: might need to remove offset and count if no segmentation possible (or put them in BFTransportBuffer) + async def send(self, buffer: List[int], offset: int = 0, count: int = None) -> int: + try: + if self._socket: + await self._socket.send( + buffer[offset:count] if count is not None else buffer, + WebSocketMessageType.BINARY, + True, + ) + return count or len(buffer) + except Exception as error: + # Exceptions of the three types below will also have set the socket's state to closed, which fires an + # event consumers of this class are subscribed to and have handling around. Any other exception needs to + # be thrown to cause a non-transport-connectivity failure. + traceback.print_exc() + raise error + + return 0 diff --git a/libraries/botframework-streaming/requirements.txt b/libraries/botframework-streaming/requirements.txt new file mode 100644 index 000000000..1d6f7ab31 --- /dev/null +++ b/libraries/botframework-streaming/requirements.txt @@ -0,0 +1,4 @@ +msrest==0.6.10 +botframework-connector>=4.7.1 +botbuilder-schema>=4.7.1 +aiohttp>=3.6.2 \ No newline at end of file diff --git a/libraries/botframework-streaming/setup.py b/libraries/botframework-streaming/setup.py new file mode 100644 index 000000000..c7baf511f --- /dev/null +++ b/libraries/botframework-streaming/setup.py @@ -0,0 +1,51 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os +from setuptools import setup + +VERSION = os.environ["packageVersion"] if "packageVersion" in os.environ else "4.12.0" +REQUIRES = [ + "botbuilder-schema>=4.12.0", + "botframework-connector>=4.12.0", + "botbuilder-core>=4.12.0", +] + +root = os.path.abspath(os.path.dirname(__file__)) + +with open(os.path.join(root, "botframework", "streaming", "about.py")) as f: + package_info = {} + info = f.read() + exec(info, package_info) + +with open(os.path.join(root, "README.rst"), encoding="utf-8") as f: + long_description = f.read() + +setup( + name=package_info["__title__"], + version=package_info["__version__"], + url=package_info["__uri__"], + author=package_info["__author__"], + description=package_info["__description__"], + keywords=["BotFrameworkStreaming", "bots", "ai", "botframework", "botframework",], + long_description=long_description, + long_description_content_type="text/x-rst", + license=package_info["__license__"], + packages=[ + "botframework.streaming", + "botframework.streaming.payloads", + "botframework.streaming.payloads.models", + "botframework.streaming.payload_transport", + "botframework.streaming.transport", + "botframework.streaming.transport.web_socket", + ], + install_requires=REQUIRES, + classifiers=[ + "Programming Language :: Python :: 3.7", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Development Status :: 5 - Production/Stable", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + ], +) diff --git a/libraries/botframework-streaming/tests/test_payload_processor.py b/libraries/botframework-streaming/tests/test_payload_processor.py new file mode 100644 index 000000000..cb892ff16 --- /dev/null +++ b/libraries/botframework-streaming/tests/test_payload_processor.py @@ -0,0 +1,60 @@ +from typing import List +from uuid import UUID, uuid4 + +import aiounittest +from botframework.streaming import ReceiveRequest +from botframework.streaming.payloads import StreamManager +from botframework.streaming.payloads.assemblers import ( + ReceiveRequestAssembler, + PayloadStreamAssembler, +) +from botframework.streaming.payloads.models import ( + Header, + RequestPayload, + StreamDescription, +) + + +class MockStreamManager(StreamManager): + def __init__(self): + super().__init__() + + def get_payload_assembler(self, identifier: UUID) -> PayloadStreamAssembler: + return PayloadStreamAssembler(self, identifier) + + +class TestPayloadProcessor(aiounittest.AsyncTestCase): + async def test_process_request(self): + # Arrange + header_id: UUID = uuid4() + header = Header(type="A", id=header_id, end=True) + header.payload_length = 3 + stream_manager = MockStreamManager() + + on_completed_called = False + + async def mock_on_completed(identifier: UUID, request: ReceiveRequest): + nonlocal on_completed_called + assert identifier == header_id + assert request.verb == "POST" + assert request.path == "/api/messages" + assert len(request.streams) == 1 + on_completed_called = True + + sut = ReceiveRequestAssembler( + header, stream_manager, on_completed=mock_on_completed + ) + + # Act + stream_id: UUID = uuid4() + streams: List[StreamDescription] = [ + StreamDescription(id=str(stream_id), content_type="json", length=100) + ] + payload = RequestPayload( + verb="POST", path="/api/messages", streams=streams + ).to_json() + payload_stream: List[int] = list(bytes(payload, "utf-8")) + await sut.process_request(payload_stream) + + # Assert + assert on_completed_called diff --git a/libraries/botframework-streaming/tests/test_payload_receiver.py b/libraries/botframework-streaming/tests/test_payload_receiver.py new file mode 100644 index 000000000..c9e2aa58c --- /dev/null +++ b/libraries/botframework-streaming/tests/test_payload_receiver.py @@ -0,0 +1,70 @@ +from typing import List + +import aiounittest + +from botframework.streaming import PayloadStream +from botframework.streaming.payload_transport import PayloadReceiver +from botframework.streaming.transport import TransportReceiverBase + + +class MockTransportReceiver(TransportReceiverBase): + def __init__(self, mock_header: bytes, mock_payload: bytes): + self._is_connected = True + self._mock_gen = self._mock_receive(mock_header, mock_payload) + + def _mock_receive(self, mock_header: bytes, mock_payload: bytes): + yield mock_header + yield mock_payload + + @property + def is_connected(self): + if self._is_connected: + self._is_connected = False + return True + return False + + async def close(self): + return + + async def receive(self, buffer: object, offset: int, count: int) -> int: + resp_buffer = list(next(self._mock_gen)) + for index, val in enumerate(resp_buffer): + buffer[index] = val + return len(resp_buffer) + + +class MockStream(PayloadStream): + # pylint: disable=super-init-not-called + def __init__(self): + self.buffer = None + self._producer_length = 0 # total length + + def give_buffer(self, buffer: List[int]): + self.buffer = buffer + + +class TestBotFrameworkHttpClient(aiounittest.AsyncTestCase): + async def test_connect(self): + mock_header = b"S.000004.e35ed534-0808-4acf-af1e-24aa81d2b31d.1\n" + mock_payload = b"test" + + mock_receiver = MockTransportReceiver(mock_header, mock_payload) + mock_stream = MockStream() + + receive_action_called = False + + def mock_get_stream(header): # pylint: disable=unused-argument + return mock_stream + + def mock_receive_action(header, stream, offset): + nonlocal receive_action_called + assert header.type == "S" + assert len(stream.buffer) == offset + receive_action_called = True + + sut = PayloadReceiver() + sut.subscribe(mock_get_stream, mock_receive_action) + await sut.connect(mock_receiver) + + assert bytes(mock_stream.buffer) == mock_payload + assert receive_action_called diff --git a/libraries/botframework-streaming/tests/test_payload_sender.py b/libraries/botframework-streaming/tests/test_payload_sender.py new file mode 100644 index 000000000..242e0de45 --- /dev/null +++ b/libraries/botframework-streaming/tests/test_payload_sender.py @@ -0,0 +1,60 @@ +from asyncio import Semaphore +from typing import List +from uuid import UUID, uuid4 + +import aiounittest +from botframework.streaming.payload_transport import PayloadSender +from botframework.streaming.payloads import HeaderSerializer +from botframework.streaming.payloads.models import Header +from botframework.streaming.transport import TransportSenderBase + + +class MockTransportSender(TransportSenderBase): + def __init__(self): + super().__init__() + self.send_called = Semaphore(0) + + async def send(self, buffer: List[int], offset: int, count: int) -> int: + # Assert + if count == 48: # Header + print("Validating Header...") + header = HeaderSerializer.deserialize(buffer, offset, count) + assert header.type == "A" + assert header.payload_length == 3 + assert header.end + else: # Payload + print("Validating Payload...") + assert count == 3 + self.send_called.release() + + return count + + def close(self): + pass + + +class TestPayloadSender(aiounittest.AsyncTestCase): + async def test_send(self): + # Arrange + sut = PayloadSender() + sender = MockTransportSender() + sut.connect(sender) + + header_id: UUID = uuid4() + header = Header(type="A", id=header_id, end=True) + header.payload_length = 3 + payload = [1, 2, 3] + + async def mock_sent_callback(callback_header: Header): + print( + f"{callback_header.type}.{callback_header.payload_length}.{callback_header.id}.{callback_header.end}" + ) + + # Act + sut.send_payload( + header, payload, is_length_known=True, sent_callback=mock_sent_callback + ) + + # Assert + await sender.send_called.acquire() + await sut.disconnect() diff --git a/pipelines/botbuilder-python-ci.yml b/pipelines/botbuilder-python-ci.yml index 89987ffc9..af2fc58c6 100644 --- a/pipelines/botbuilder-python-ci.yml +++ b/pipelines/botbuilder-python-ci.yml @@ -6,7 +6,6 @@ variables: COVERALLS_GIT_COMMIT: $(Build.SourceVersion) COVERALLS_SERVICE_JOB_ID: $(Build.BuildId) COVERALLS_SERVICE_NAME: python-ci - python.36: 3.6.x python.37: 3.7.x python.38: 3.8.x # PythonCoverallsToken: get this from Azure @@ -20,8 +19,6 @@ jobs: strategy: matrix: - Python36: - PYTHON_VERSION: '$(python.36)' Python37: PYTHON_VERSION: '$(python.37)' Python38: @@ -38,9 +35,6 @@ jobs: inputs: versionSpec: '$(PYTHON_VERSION)' - - script: 'sudo ln -s /opt/hostedtoolcache/Python/3.6.9/x64/lib/libpython3.6m.so.1.0 /usr/lib/libpython3.6m.so' - displayName: libpython3.6m - - script: | python -m pip install --upgrade pip pip install -e ./libraries/botbuilder-schema @@ -54,6 +48,7 @@ jobs: pip install -e ./libraries/botbuilder-integration-applicationinsights-aiohttp pip install -e ./libraries/botbuilder-adapters-slack pip install -e ./libraries/botbuilder-integration-aiohttp + pip install -e ./libraries/botframework-streaming pip install -r ./libraries/botframework-connector/tests/requirements.txt pip install -r ./libraries/botbuilder-core/tests/requirements.txt pip install -r ./libraries/botbuilder-ai/tests/requirements.txt diff --git a/tests/skills/streamming-extensions/app.py b/tests/skills/streamming-extensions/app.py new file mode 100644 index 000000000..450c22b17 --- /dev/null +++ b/tests/skills/streamming-extensions/app.py @@ -0,0 +1,87 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import sys +import traceback +from datetime import datetime + +from aiohttp import web +from aiohttp.web import Request, Response, json_response +from botbuilder.core import ( + BotFrameworkAdapterSettings, + TurnContext, + BotFrameworkAdapter, +) +from botbuilder.core.integration import aiohttp_error_middleware +from botbuilder.schema import Activity, ActivityTypes + +from bots import EchoBot +from config import DefaultConfig + +CONFIG = DefaultConfig() + +# Create adapter. +# See https://aka.ms/about-bot-adapter to learn more about how bots work. +SETTINGS = BotFrameworkAdapterSettings(CONFIG.APP_ID, CONFIG.APP_PASSWORD) +ADAPTER = BotFrameworkAdapter(SETTINGS) + + +# Catch-all for errors. +async def on_error(context: TurnContext, error: Exception): + # This check writes out errors to console log .vs. app insights. + # NOTE: In production environment, you should consider logging this to Azure + # application insights. + print(f"\n [on_turn_error] unhandled error: {error}", file=sys.stderr) + traceback.print_exc() + + # Send a message to the user + await context.send_activity("The bot encountered an error or bug.") + await context.send_activity( + "To continue to run this bot, please fix the bot source code." + ) + # Send a trace activity if we're talking to the Bot Framework Emulator + if context.activity.channel_id == "emulator": + # Create a trace activity that contains the error object + trace_activity = Activity( + label="TurnError", + name="on_turn_error Trace", + timestamp=datetime.utcnow(), + type=ActivityTypes.trace, + value=f"{error}", + value_type="https://www.botframework.com/schemas/error", + ) + # Send a trace activity, which will be displayed in Bot Framework Emulator + await context.send_activity(trace_activity) + + +ADAPTER.on_turn_error = on_error + +# Create the Bot +BOT = EchoBot() + + +# Listen for incoming requests on /api/messages +async def messages(req: Request) -> Response: + # Main bot message handler. + if "application/json" in req.headers["Content-Type"]: + body = await req.json() + else: + return Response(status=415) + + activity = Activity().deserialize(body) + auth_header = req.headers["Authorization"] if "Authorization" in req.headers else "" + + response = await ADAPTER.process_activity(activity, auth_header, BOT.on_turn) + if response: + return json_response(data=response.body, status=response.status) + return Response(status=201) + + +APP = web.Application(middlewares=[aiohttp_error_middleware]) +APP.router.add_post("/api/messages", messages) + +if __name__ == "__main__": + try: + web.run_app(APP, host="localhost", port=CONFIG.PORT) + except Exception as error: + raise error diff --git a/tests/skills/streamming-extensions/bots/__init__.py b/tests/skills/streamming-extensions/bots/__init__.py new file mode 100644 index 000000000..f95fbbbad --- /dev/null +++ b/tests/skills/streamming-extensions/bots/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from .echo_bot import EchoBot + +__all__ = ["EchoBot"] diff --git a/tests/skills/streamming-extensions/bots/echo_bot.py b/tests/skills/streamming-extensions/bots/echo_bot.py new file mode 100644 index 000000000..90a094640 --- /dev/null +++ b/tests/skills/streamming-extensions/bots/echo_bot.py @@ -0,0 +1,19 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +from botbuilder.core import ActivityHandler, MessageFactory, TurnContext +from botbuilder.schema import ChannelAccount + + +class EchoBot(ActivityHandler): + async def on_members_added_activity( + self, members_added: [ChannelAccount], turn_context: TurnContext + ): + for member in members_added: + if member.id != turn_context.activity.recipient.id: + await turn_context.send_activity("Hello and welcome!") + + async def on_message_activity(self, turn_context: TurnContext): + return await turn_context.send_activity( + MessageFactory.text(f"Echo: {turn_context.activity.text}") + ) diff --git a/tests/skills/streamming-extensions/config.py b/tests/skills/streamming-extensions/config.py new file mode 100644 index 000000000..e007d0fa9 --- /dev/null +++ b/tests/skills/streamming-extensions/config.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import os + +""" Bot Configuration """ + + +class DefaultConfig: + """ Bot Configuration """ + + PORT = 3978 + APP_ID = os.environ.get("MicrosoftAppId", "") + APP_PASSWORD = os.environ.get("MicrosoftAppPassword", "")