Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
dbad674
Initial commit for POC. This is WIP
axelsrz Apr 1, 2020
62f8a6f
Updates for POC. This is WIP
axelsrz Apr 2, 2020
8be271c
Pylint: POC updates
axelsrz Apr 2, 2020
bd4f691
Updates on POC, protocol adapter pending
axelsrz Apr 6, 2020
95f250f
Updates on POC, protocol adapter in progress
axelsrz Apr 7, 2020
7224a5f
POC almost ready for testing, changes on BFAdapter pending
axelsrz Apr 9, 2020
e233297
POC waiting on client injection in connector
axelsrz Apr 10, 2020
cb2ba6d
black: POC waiting on client injection in connector
axelsrz Apr 10, 2020
c079dc1
POC for http client injection in connector
axelsrz Apr 20, 2020
0ae3fd9
got rid of importing errors when loading libraries. Currently in the …
axelsrz Apr 23, 2020
e8ed7eb
Fix couple of errors, still in debugging phase. Initial receive doesn…
axelsrz Apr 30, 2020
0906287
Several fixes including deadlock in threading, serialization and mino…
axelsrz May 5, 2020
5a4f8c3
More errors fixed, trying to fit websocket into ms rest pipeline. Rec…
axelsrz May 13, 2020
28f09e4
Disassembler fixes, sender struggling to send through socket
axelsrz May 14, 2020
1432097
changes on disassembler and receiver
axelsrz May 26, 2020
579735b
Solved conflicts
axelsrz Mar 4, 2021
b29c586
adding streaming to ci pipeline
axelsrz Mar 5, 2021
621c79b
Pylint fixes
axelsrz Mar 10, 2021
aaa8068
updated streaming setup.py
axelsrz Mar 11, 2021
a0424c2
Removing 3.6
axelsrz Mar 11, 2021
b179524
Changing all concurrent mechanisms in streaming to asyncio
axelsrz Mar 24, 2021
87c4111
Added validation for abrupt closing of websocket, added tracebacks an…
axelsrz Mar 30, 2021
344840a
Merge branch 'main' into axsuarez/streaming-extensions
axelsrz Apr 19, 2021
d670237
UnblockActivityProcessorThread
msomanathan Apr 19, 2021
a194366
Header serialization fix and stream serialization fix.
axelsrz Apr 21, 2021
a56176a
Parity change in the internal buffer structure of the payload_stream …
axelsrz Apr 23, 2021
a583234
Fixes on the RecieveResponse path
axelsrz Apr 24, 2021
f2991b6
PayloadStream length fix
axelsrz Apr 28, 2021
1e70c04
Grouping related imports
axelsrz Apr 28, 2021
c7b8faf
payload receiver unit test (#1664)
axelsrz Apr 30, 2021
f0de9a0
Payload sender unit test (#1666)
msomanathan May 3, 2021
7e68cb1
blackcheck
msomanathan May 3, 2021
ad70fa5
pylintfix
msomanathan May 3, 2021
9601b2e
test_req_processor (#1668)
msomanathan May 5, 2021
004dfff
Axsuarez/streaming receive loop unittest (#1667)
axelsrz May 5, 2021
6d23924
renaming straming to botframework scope
axelsrz May 6, 2021
4561352
Updating pipeline
axelsrz May 6, 2021
65aca0b
Removing sleep() safety measure
axelsrz May 6, 2021
4da1fc2
Remove unused import
axelsrz May 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ disable=print-statement,
too-many-return-statements,
import-error,
no-name-in-module,
too-many-branches
too-many-branches,
too-many-ancestors,
too-many-nested-blocks

# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
Expand Down
10 changes: 10 additions & 0 deletions libraries/botbuilder-core/botbuilder/core/bot_framework_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,16 @@ async def exchange_token_from_credentials(
)
raise TypeError(f"exchange token returned improper result: {type(result)}")

def can_process_outgoing_activity(
self, activity: Activity # pylint: disable=unused-argument
) -> bool:
return False

async def process_outgoing_activity(
self, turn_context: TurnContext, activity: Activity
) -> ResourceResponse:
raise Exception("NotImplemented")

@staticmethod
def key_for_connector_client(service_url: str, app_id: str, scope: str):
return f"{service_url if service_url else ''}:{app_id if app_id else ''}:{scope if scope else ''}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import traceback

from aiohttp.web import (
middleware,
HTTPNotImplemented,
Expand All @@ -26,4 +28,5 @@ async def aiohttp_error_middleware(request, handler):
except KeyError:
raise HTTPNotFound()
except Exception:
traceback.print_exc()
raise HTTPInternalServerError()
14 changes: 14 additions & 0 deletions libraries/botbuilder-core/botbuilder/core/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from .bot_framework_http_adapter_base import BotFrameworkHttpAdapterBase
from .streaming_activity_processor import StreamingActivityProcessor
from .streaming_request_handler import StreamingRequestHandler
from .version_info import VersionInfo

__all__ = [
"BotFrameworkHttpAdapterBase",
"StreamingActivityProcessor",
"StreamingRequestHandler",
"VersionInfo",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from http import HTTPStatus
from typing import Awaitable, Callable, List

from botbuilder.core import (
Bot,
BotFrameworkAdapter,
BotFrameworkAdapterSettings,
InvokeResponse,
TurnContext,
)
from botbuilder.schema import Activity, ActivityTypes, ResourceResponse
from botframework.connector import AsyncBfPipeline, BotFrameworkConnectorConfiguration
from botframework.connector.aio import ConnectorClient
from botframework.connector.auth import (
ClaimsIdentity,
MicrosoftAppCredentials,
MicrosoftGovernmentAppCredentials,
)

from .streaming_activity_processor import StreamingActivityProcessor
from .streaming_request_handler import StreamingRequestHandler
from .streaming_http_client import StreamingHttpDriver


class BotFrameworkHttpAdapterBase(BotFrameworkAdapter, StreamingActivityProcessor):
# pylint: disable=pointless-string-statement
def __init__(self, settings: BotFrameworkAdapterSettings):
super().__init__(settings)

self.connected_bot: Bot = None
self.claims_identity: ClaimsIdentity = None
self.request_handlers: List[StreamingRequestHandler] = None

async def process_streaming_activity(
self,
activity: Activity,
bot_callback_handler: Callable[[TurnContext], Awaitable],
) -> InvokeResponse:
if not activity:
raise TypeError(
f"'activity: {activity.__class__.__name__}' argument can't be None"
)

"""
If a conversation has moved from one connection to another for the same Channel or Skill and
hasn't been forgotten by the previous StreamingRequestHandler. The last requestHandler
the conversation has been associated with should always be the active connection.
"""
request_handler = [
handler
for handler in self.request_handlers
if handler.service_url == activity.service_url
and handler.has_conversation(activity.conversation.id)
]
request_handler = request_handler[-1] if request_handler else None
context = TurnContext(self, activity)

if self.claims_identity:
context.turn_state[self.BOT_IDENTITY_KEY] = self.claims_identity

connector_client = self._create_streaming_connector_client(
activity, request_handler
)
context.turn_state[self.BOT_CONNECTOR_CLIENT_KEY] = connector_client

await self.run_pipeline(context, bot_callback_handler)

if activity.type == ActivityTypes.invoke:
activity_invoke_response = context.turn_state.get(self._INVOKE_RESPONSE_KEY)

if not activity_invoke_response:
return InvokeResponse(status=HTTPStatus.NOT_IMPLEMENTED)
return activity_invoke_response.value

return None

async def send_streaming_activity(self, activity: Activity) -> ResourceResponse:
raise NotImplementedError()

def can_process_outgoing_activity(self, activity: Activity) -> bool:
if not activity:
raise TypeError(
f"'activity: {activity.__class__.__name__}' argument can't be None"
)

return not activity.service_url.startswith("https")

async def process_outgoing_activity(
self, turn_context: TurnContext, activity: Activity
) -> ResourceResponse:
if not activity:
raise TypeError(
f"'activity: {activity.__class__.__name__}' argument can't be None"
)

# TODO: Check if we have token responses from OAuth cards.

# The ServiceUrl for streaming channels begins with the string "urn" and contains
# information unique to streaming connections. Now that we know that this is a streaming
# activity, process it in the streaming pipeline.
# Process streaming activity.
return await self.send_streaming_activity(activity)

def _create_streaming_connector_client(
self, activity: Activity, request_handler: StreamingRequestHandler
) -> ConnectorClient:
empty_credentials = (
MicrosoftAppCredentials.empty()
if self._channel_provider and self._channel_provider.is_government()
else MicrosoftGovernmentAppCredentials.empty()
)
streaming_driver = StreamingHttpDriver(request_handler)
config = BotFrameworkConnectorConfiguration(
empty_credentials,
activity.service_url,
pipeline_type=AsyncBfPipeline,
driver=streaming_driver,
)
streaming_driver.config = config
connector_client = ConnectorClient(None, custom_configuration=config)

return connector_client
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from abc import ABC
from typing import Awaitable, Callable

from botbuilder.core import TurnContext, InvokeResponse
from botbuilder.schema import Activity


class StreamingActivityProcessor(ABC):
"""
Process streaming activities.
"""

async def process_streaming_activity(
self,
activity: Activity,
bot_callback_handler: Callable[[TurnContext], Awaitable],
) -> InvokeResponse:
raise NotImplementedError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from http import HTTPStatus
from logging import Logger
from typing import Any

from msrest.universal_http import ClientRequest
from msrest.universal_http.async_abc import AsyncClientResponse
from msrest.universal_http.async_requests import (
AsyncRequestsHTTPSender as AsyncRequestsHTTPDriver,
)
from botframework.streaming import StreamingRequest, ReceiveResponse

from .streaming_request_handler import StreamingRequestHandler


class StreamingProtocolClientResponse(AsyncClientResponse):
def __init__(
self, request: StreamingRequest, streaming_response: ReceiveResponse
) -> None:
super(StreamingProtocolClientResponse, self).__init__(
request, streaming_response
)
# https://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientResponse
self.status_code = streaming_response.status_code
# self.headers = streaming_response.headers
# self.reason = streaming_response.reason
self._body = None

def body(self) -> bytes:
"""Return the whole body as bytes in memory.
"""
if not self._body:
return bytes([])
return self._body

async def load_body(self) -> None:
"""Load in memory the body, so it could be accessible from sync methods."""
self._body: ReceiveResponse
self._body = self.internal_response.read_body()

def raise_for_status(self):
if 400 <= self.internal_response.status_code <= 599:
raise Exception(f"Http error: {self.internal_response.status_code}")


class StreamingHttpDriver(AsyncRequestsHTTPDriver):
def __init__(
self,
request_handler: StreamingRequestHandler,
*,
config=None,
logger: Logger = None,
):
super().__init__(config)
if not request_handler:
raise TypeError(
f"'request_handler: {request_handler.__class__.__name__}' argument can't be None"
)
self._request_handler = request_handler
self._logger = logger

async def send(
self, request: ClientRequest, **config: Any # pylint: disable=unused-argument
) -> AsyncClientResponse:
# TODO: validate form of request to perform operations
streaming_request = StreamingRequest(
path=request.url[request.url.index("v3/") :], verb=request.method
)
streaming_request.set_body(request.data)

return await self._send_request(streaming_request)

async def _send_request(
self, request: StreamingRequest
) -> StreamingProtocolClientResponse:
try:
server_response = await self._request_handler.send_streaming_request(
request
)

if not server_response:
raise Exception("Server response from streaming request is None")

if server_response.status_code == HTTPStatus.OK:
# TODO: this should be an object read from json

return StreamingProtocolClientResponse(request, server_response)
except Exception as error:
# TODO: log error
raise error

return None
Loading