diff --git a/libraries/botbuilder-core/botbuilder/core/__init__.py b/libraries/botbuilder-core/botbuilder/core/__init__.py index 15b2f5f37..d6977c927 100644 --- a/libraries/botbuilder-core/botbuilder/core/__init__.py +++ b/libraries/botbuilder-core/botbuilder/core/__init__.py @@ -20,6 +20,7 @@ from .intent_score import IntentScore from .invoke_response import InvokeResponse from .memory_storage import MemoryStorage +from .memory_transcript_store import MemoryTranscriptStore from .message_factory import MessageFactory from .middleware_set import AnonymousReceiveMiddleware, Middleware, MiddlewareSet from .null_telemetry_client import NullTelemetryClient @@ -55,6 +56,7 @@ "IntentScore", "InvokeResponse", "MemoryStorage", + "MemoryTranscriptStore", "MessageFactory", "Middleware", "MiddlewareSet", diff --git a/libraries/botbuilder-core/botbuilder/core/memory_transcript_store.py b/libraries/botbuilder-core/botbuilder/core/memory_transcript_store.py new file mode 100644 index 000000000..e8953e0ae --- /dev/null +++ b/libraries/botbuilder-core/botbuilder/core/memory_transcript_store.py @@ -0,0 +1,147 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +"""The memory transcript store stores transcripts in volatile memory.""" +import datetime +from typing import List, Dict +from botbuilder.schema import Activity +from .transcript_logger import PagedResult, TranscriptInfo, TranscriptStore + +# pylint: disable=line-too-long +class MemoryTranscriptStore(TranscriptStore): + """This provider is most useful for simulating production storage when running locally against the + emulator or as part of a unit test. + """ + + channels: Dict[str, Dict[str, Activity]] = {} + + async def log_activity(self, activity: Activity) -> None: + if not activity: + raise TypeError("activity cannot be None for log_activity()") + + # get channel + channel = {} + if not activity.channel_id in self.channels: + channel = {} + self.channels[activity.channel_id] = channel + else: + channel = self.channels[activity.channel_id] + + # Get conversation transcript. + transcript = [] + if activity.conversation.id in channel: + transcript = channel[activity.conversation.id] + else: + transcript = [] + channel[activity.conversation.id] = transcript + + transcript.append(activity) + + async def get_transcript_activities( + self, + channel_id: str, + conversation_id: str, + continuation_token: str = None, + start_date: datetime = datetime.datetime.min, + ) -> "PagedResult[Activity]": + if not channel_id: + raise TypeError("Missing channel_id") + + if not conversation_id: + raise TypeError("Missing conversation_id") + + paged_result = PagedResult() + if channel_id in self.channels: + channel = self.channels[channel_id] + if conversation_id in channel: + transcript = channel[conversation_id] + if continuation_token: + paged_result.items = ( + [ + x + for x in sorted( + transcript, key=lambda x: x.timestamp, reverse=False + ) + if x.timestamp >= start_date + ] + .dropwhile(lambda x: x.id != continuation_token) + .Skip(1)[:20] + ) + if paged_result.items.count == 20: + paged_result.continuation_token = paged_result.items[-1].id + else: + paged_result.items = [ + x + for x in sorted( + transcript, key=lambda x: x.timestamp, reverse=False + ) + if x.timestamp >= start_date + ][:20] + if paged_result.items.count == 20: + paged_result.continuation_token = paged_result.items[-1].id + + return paged_result + + async def delete_transcript(self, channel_id: str, conversation_id: str) -> None: + if not channel_id: + raise TypeError("channel_id should not be None") + + if not conversation_id: + raise TypeError("conversation_id should not be None") + + if channel_id in self.channels: + if conversation_id in self.channels[channel_id]: + del self.channels[channel_id][conversation_id] + + async def list_transcripts( + self, channel_id: str, continuation_token: str = None + ) -> "PagedResult[TranscriptInfo]": + if not channel_id: + raise TypeError("Missing channel_id") + + paged_result = PagedResult() + + if channel_id in self.channels: + channel: Dict[str, List[Activity]] = self.channels[channel_id] + + if continuation_token: + paged_result.items = ( + sorted( + [ + TranscriptInfo( + channel_id, + c.value()[0].timestamp if c.value() else None, + c.id, + ) + for c in channel + ], + key=lambda x: x.created, + reverse=True, + ) + .dropwhile(lambda x: x.id != continuation_token) + .Skip(1) + .Take(20) + ) + if paged_result.items.count == 20: + paged_result.continuation_token = paged_result.items[-1].id + else: + paged_result.items = ( + sorted( + [ + TranscriptInfo( + channel_id, + c.value()[0].timestamp if c.value() else None, + c.id, + ) + for c in channel + ], + key=lambda x: x.created, + reverse=True, + ) + .dropwhile(lambda x: x.id != continuation_token) + .Skip(1) + .Take(20) + ) + if paged_result.items.count == 20: + paged_result.continuation_token = paged_result.items[-1].id + + return paged_result diff --git a/libraries/botbuilder-core/botbuilder/core/transcript_logger.py b/libraries/botbuilder-core/botbuilder/core/transcript_logger.py new file mode 100644 index 000000000..47597250f --- /dev/null +++ b/libraries/botbuilder-core/botbuilder/core/transcript_logger.py @@ -0,0 +1,197 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +"""Logs incoming and outgoing activities to a TranscriptStore..""" + +import datetime +import copy +from queue import Queue +from abc import ABC, abstractmethod +from typing import Awaitable, Callable, List +from botbuilder.schema import Activity, ActivityTypes, ConversationReference +from .middleware_set import Middleware +from .turn_context import TurnContext + + +class TranscriptLogger(ABC): + """Transcript logger stores activities for conversations for recall.""" + + @abstractmethod + async def log_activity(self, activity: Activity) -> None: + """Log an activity to the transcript. + :param activity:Activity being logged. + """ + raise NotImplementedError + + +class TranscriptLoggerMiddleware(Middleware): + """Logs incoming and outgoing activities to a TranscriptStore.""" + + def __init__(self, logger: TranscriptLogger): + if not logger: + raise TypeError( + "TranscriptLoggerMiddleware requires a TranscriptLogger instance." + ) + self.logger = logger + + async def on_process_request( + self, context: TurnContext, logic: Callable[[TurnContext], Awaitable] + ): + """Initialization for middleware. + :param context: Context for the current turn of conversation with the user. + :param logic: Function to call at the end of the middleware chain. + """ + transcript = Queue() + activity = context.activity + # Log incoming activity at beginning of turn + if activity: + if not activity.from_property.role: + activity.from_property.role = "user" + self.log_activity(transcript, copy.copy(activity)) + + # hook up onSend pipeline + # pylint: disable=unused-argument + async def send_activities_handler( + ctx: TurnContext, + activities: List[Activity], + next_send: Callable[[], Awaitable[None]], + ): + # Run full pipeline + responses = await next_send() + for activity in activities: + await self.log_activity(transcript, copy.copy(activity)) + return responses + + context.on_send_activities(send_activities_handler) + + # hook up update activity pipeline + async def update_activity_handler( + ctx: TurnContext, activity: Activity, next_update: Callable[[], Awaitable] + ): + # Run full pipeline + response = await next_update() + update_activity = copy.copy(activity) + update_activity.type = ActivityTypes.message_update + await self.log_activity(transcript, update_activity) + return response + + context.on_update_activity(update_activity_handler) + + # hook up delete activity pipeline + async def delete_activity_handler( + ctx: TurnContext, + reference: ConversationReference, + next_delete: Callable[[], Awaitable], + ): + # Run full pipeline + await next_delete() + + delete_msg = Activity( + type=ActivityTypes.message_delete, id=reference.activity_id + ) + deleted_activity: Activity = TurnContext.apply_conversation_reference( + delete_msg, reference, False + ) + await self.log_activity(transcript, deleted_activity) + + context.on_delete_activity(delete_activity_handler) + + if logic: + await logic() + + # Flush transcript at end of turn + while not transcript.empty(): + activity = transcript.get() + if activity is None: + break + await self.logger.log_activity(activity) + transcript.task_done() + + def log_activity(self, transcript: Queue, activity: Activity) -> None: + """Logs the activity. + :param transcript: transcript. + :param activity: Activity to log. + """ + transcript.put(activity) + + +class TranscriptStore(TranscriptLogger): + """ Transcript storage for conversations.""" + + @abstractmethod + async def get_transcript_activities( + self, + channel_id: str, + conversation_id: str, + continuation_token: str, + start_date: datetime, + ) -> "PagedResult": + """Get activities for a conversation (Aka the transcript). + :param channel_id: Channel Id where conversation took place. + :param conversation_id: Conversation ID + :param continuation_token: Continuation token to page through results. + :param start_date: Earliest time to include + :result: Page of results of Activity objects + """ + raise NotImplementedError + + @abstractmethod + async def list_transcripts( + self, channel_id: str, continuation_token: str + ) -> "PagedResult": + """List conversations in the channelId. + :param channel_id: Channel Id where conversation took place. + :param continuation_token : Continuation token to page through results. + :result: Page of results of TranscriptInfo objects + """ + raise NotImplementedError + + @abstractmethod + async def delete_transcript(self, channel_id: str, conversation_id: str) -> None: + """Delete a specific conversation and all of it's activities. + :param channel_id: Channel Id where conversation took place. + :param conversation_id: Id of the conversation to delete. + :result: None + """ + raise NotImplementedError + + +class ConsoleTranscriptLogger(TranscriptLogger): + """ConsoleTranscriptLogger writes activities to Console output.""" + + async def log_activity(self, activity: Activity) -> None: + """Log an activity to the transcript. + :param activity:Activity being logged. + """ + if activity: + print(f"Activity Log: {activity}") + else: + raise TypeError("Activity is required") + + +class TranscriptInfo: + """Metadata for a stored transcript.""" + + # pylint: disable=invalid-name + def __init__( + self, + channel_id: str = None, + created: datetime = None, + conversation_id: str = None, + ): + """ + :param channel_id: Channel ID the transcript was taken from + :param created: Timestamp when event created + :param id: Conversation ID + """ + self.channel_id = channel_id + self.created = created + self.id = conversation_id + + +class PagedResult: + """Paged results for transcript data.""" + + # Page of Items + items: List[object] = None + # Token used to page through multiple pages. + continuation_token: str = None diff --git a/libraries/botbuilder-core/tests/test_memory_transcript_store.py b/libraries/botbuilder-core/tests/test_memory_transcript_store.py new file mode 100644 index 000000000..12cb0e8a7 --- /dev/null +++ b/libraries/botbuilder-core/tests/test_memory_transcript_store.py @@ -0,0 +1,124 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. +# pylint: disable=missing-docstring, unused-import +import sys +import copy +import uuid +import datetime +from typing import Awaitable, Callable, Dict, List +from unittest.mock import patch, Mock +import aiounittest + +from botbuilder.core import ( + AnonymousReceiveMiddleware, + BotTelemetryClient, + MemoryTranscriptStore, + MiddlewareSet, + Middleware, + TurnContext, +) +from botbuilder.core.adapters import TestAdapter, TestFlow +from botbuilder.schema import ( + Activity, + ActivityTypes, + ChannelAccount, + ConversationAccount, + ConversationReference, +) + +# pylint: disable=line-too-long,missing-docstring +class TestMemoryTranscriptStore(aiounittest.AsyncTestCase): + # pylint: disable=unused-argument + async def test_null_transcript_store(self): + memory_transcript = MemoryTranscriptStore() + with self.assertRaises(TypeError): + await memory_transcript.log_activity(None) + + async def test_log_activity(self): + memory_transcript = MemoryTranscriptStore() + conversation_id = "_log_activity" + date = datetime.datetime.now() + activity = self.create_activities(conversation_id, date, 1)[-1] + await memory_transcript.log_activity(activity) + + async def test_get_activity_none(self): + memory_transcript = MemoryTranscriptStore() + conversation_id = "_log_activity" + await memory_transcript.get_transcript_activities("test", conversation_id) + + async def test_get_single_activity(self): + memory_transcript = MemoryTranscriptStore() + conversation_id = "_log_activity" + date = datetime.datetime.now() + activity = self.create_activities(conversation_id, date, count=1)[-1] + await memory_transcript.log_activity(activity) + result = await memory_transcript.get_transcript_activities( + "test", conversation_id + ) + self.assertNotEqual(result.items, None) + self.assertEqual(result.items[0].text, "0") + + async def test_get_multiple_activity(self): + memory_transcript = MemoryTranscriptStore() + conversation_id = "_log_activity" + date = datetime.datetime.now() + activities = self.create_activities(conversation_id, date, count=10) + for activity in activities: + await memory_transcript.log_activity(activity) + result = await memory_transcript.get_transcript_activities( + "test", conversation_id + ) + self.assertNotEqual(result.items, None) + self.assertEqual(len(result.items), 20) # 2 events logged each iteration + + async def test_delete_transcript(self): + memory_transcript = MemoryTranscriptStore() + conversation_id = "_log_activity" + date = datetime.datetime.now() + activity = self.create_activities(conversation_id, date, count=1)[-1] + await memory_transcript.log_activity(activity) + result = await memory_transcript.get_transcript_activities( + "test", conversation_id + ) + self.assertNotEqual(result.items, None) + await memory_transcript.delete_transcript("test", conversation_id) + result = await memory_transcript.get_transcript_activities( + "test", conversation_id + ) + self.assertEqual(result.items, None) + + def create_activities(self, conversation_id: str, date: datetime, count: int = 5): + activities: List[Activity] = [] + time_stamp = date + for i in range(count): + activities.append( + Activity( + type=ActivityTypes.message, + timestamp=time_stamp, + id=str(uuid.uuid4()), + text=str(i), + channel_id="test", + from_property=ChannelAccount(id=f"User{i}"), + conversation=ConversationAccount(id=conversation_id), + recipient=ChannelAccount(id="bot1", name="2"), + service_url="http://foo.com/api/messages", + ) + ) + time_stamp = time_stamp + datetime.timedelta(0, 60) + activities.append( + Activity( + type=ActivityTypes.message, + timestamp=date, + id=str(uuid.uuid4()), + text=str(i), + channel_id="test", + from_property=ChannelAccount(id="Bot1", name="2"), + conversation=ConversationAccount(id=conversation_id), + recipient=ChannelAccount(id=f"User{i}"), + service_url="http://foo.com/api/messages", + ) + ) + time_stamp = time_stamp + datetime.timedelta( + 0, 60 + ) # days, seconds, then other fields. + return activities diff --git a/libraries/botbuilder-core/tests/test_private_conversation_state.py b/libraries/botbuilder-core/tests/test_private_conversation_state.py index fe477a72b..802a2678b 100644 --- a/libraries/botbuilder-core/tests/test_private_conversation_state.py +++ b/libraries/botbuilder-core/tests/test_private_conversation_state.py @@ -1,3 +1,6 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + import aiounittest from botbuilder.core import MemoryStorage, TurnContext, PrivateConversationState diff --git a/libraries/botbuilder-core/tests/test_show_typing_middleware.py b/libraries/botbuilder-core/tests/test_show_typing_middleware.py index 00466e17d..0d1af513c 100644 --- a/libraries/botbuilder-core/tests/test_show_typing_middleware.py +++ b/libraries/botbuilder-core/tests/test_show_typing_middleware.py @@ -1,3 +1,6 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + import time import aiounittest