Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libraries/botbuilder-core/botbuilder/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .intent_score import IntentScore
from .invoke_response import InvokeResponse
from .memory_storage import MemoryStorage
from .memory_transcript_store import MemoryTranscriptStore
from .message_factory import MessageFactory
from .middleware_set import AnonymousReceiveMiddleware, Middleware, MiddlewareSet
from .null_telemetry_client import NullTelemetryClient
Expand Down Expand Up @@ -55,6 +56,7 @@
"IntentScore",
"InvokeResponse",
"MemoryStorage",
"MemoryTranscriptStore",
"MessageFactory",
"Middleware",
"MiddlewareSet",
Expand Down
147 changes: 147 additions & 0 deletions libraries/botbuilder-core/botbuilder/core/memory_transcript_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""The memory transcript store stores transcripts in volatile memory."""
import datetime
from typing import List, Dict
from botbuilder.schema import Activity
from .transcript_logger import PagedResult, TranscriptInfo, TranscriptStore

# pylint: disable=line-too-long
class MemoryTranscriptStore(TranscriptStore):
"""This provider is most useful for simulating production storage when running locally against the
emulator or as part of a unit test.
"""

channels: Dict[str, Dict[str, Activity]] = {}

async def log_activity(self, activity: Activity) -> None:
if not activity:
raise TypeError("activity cannot be None for log_activity()")

# get channel
channel = {}
if not activity.channel_id in self.channels:
channel = {}
self.channels[activity.channel_id] = channel
else:
channel = self.channels[activity.channel_id]

# Get conversation transcript.
transcript = []
if activity.conversation.id in channel:
transcript = channel[activity.conversation.id]
else:
transcript = []
channel[activity.conversation.id] = transcript

transcript.append(activity)

async def get_transcript_activities(
self,
channel_id: str,
conversation_id: str,
continuation_token: str = None,
start_date: datetime = datetime.datetime.min,
) -> "PagedResult[Activity]":
if not channel_id:
raise TypeError("Missing channel_id")

if not conversation_id:
raise TypeError("Missing conversation_id")

paged_result = PagedResult()
if channel_id in self.channels:
channel = self.channels[channel_id]
if conversation_id in channel:
transcript = channel[conversation_id]
if continuation_token:
paged_result.items = (
[
x
for x in sorted(
transcript, key=lambda x: x.timestamp, reverse=False
)
if x.timestamp >= start_date
]
.dropwhile(lambda x: x.id != continuation_token)
.Skip(1)[:20]
)
if paged_result.items.count == 20:
paged_result.continuation_token = paged_result.items[-1].id
else:
paged_result.items = [
x
for x in sorted(
transcript, key=lambda x: x.timestamp, reverse=False
)
if x.timestamp >= start_date
][:20]
if paged_result.items.count == 20:
paged_result.continuation_token = paged_result.items[-1].id

return paged_result

async def delete_transcript(self, channel_id: str, conversation_id: str) -> None:
if not channel_id:
raise TypeError("channel_id should not be None")

if not conversation_id:
raise TypeError("conversation_id should not be None")

if channel_id in self.channels:
if conversation_id in self.channels[channel_id]:
del self.channels[channel_id][conversation_id]

async def list_transcripts(
self, channel_id: str, continuation_token: str = None
) -> "PagedResult[TranscriptInfo]":
if not channel_id:
raise TypeError("Missing channel_id")

paged_result = PagedResult()

if channel_id in self.channels:
channel: Dict[str, List[Activity]] = self.channels[channel_id]

if continuation_token:
paged_result.items = (
sorted(
[
TranscriptInfo(
channel_id,
c.value()[0].timestamp if c.value() else None,
c.id,
)
for c in channel
],
key=lambda x: x.created,
reverse=True,
)
.dropwhile(lambda x: x.id != continuation_token)
.Skip(1)
.Take(20)
)
if paged_result.items.count == 20:
paged_result.continuation_token = paged_result.items[-1].id
else:
paged_result.items = (
sorted(
[
TranscriptInfo(
channel_id,
c.value()[0].timestamp if c.value() else None,
c.id,
)
for c in channel
],
key=lambda x: x.created,
reverse=True,
)
.dropwhile(lambda x: x.id != continuation_token)
.Skip(1)
.Take(20)
)
if paged_result.items.count == 20:
paged_result.continuation_token = paged_result.items[-1].id

return paged_result
197 changes: 197 additions & 0 deletions libraries/botbuilder-core/botbuilder/core/transcript_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""Logs incoming and outgoing activities to a TranscriptStore.."""

import datetime
import copy
from queue import Queue
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, List
from botbuilder.schema import Activity, ActivityTypes, ConversationReference
from .middleware_set import Middleware
from .turn_context import TurnContext


class TranscriptLogger(ABC):
"""Transcript logger stores activities for conversations for recall."""

@abstractmethod
async def log_activity(self, activity: Activity) -> None:
"""Log an activity to the transcript.
:param activity:Activity being logged.
"""
raise NotImplementedError


class TranscriptLoggerMiddleware(Middleware):
"""Logs incoming and outgoing activities to a TranscriptStore."""

def __init__(self, logger: TranscriptLogger):
if not logger:
raise TypeError(
"TranscriptLoggerMiddleware requires a TranscriptLogger instance."
)
self.logger = logger

async def on_process_request(
self, context: TurnContext, logic: Callable[[TurnContext], Awaitable]
):
"""Initialization for middleware.
:param context: Context for the current turn of conversation with the user.
:param logic: Function to call at the end of the middleware chain.
"""
transcript = Queue()
activity = context.activity
# Log incoming activity at beginning of turn
if activity:
if not activity.from_property.role:
activity.from_property.role = "user"
self.log_activity(transcript, copy.copy(activity))

# hook up onSend pipeline
# pylint: disable=unused-argument
async def send_activities_handler(
ctx: TurnContext,
activities: List[Activity],
next_send: Callable[[], Awaitable[None]],
):
# Run full pipeline
responses = await next_send()
for activity in activities:
await self.log_activity(transcript, copy.copy(activity))
return responses

context.on_send_activities(send_activities_handler)

# hook up update activity pipeline
async def update_activity_handler(
ctx: TurnContext, activity: Activity, next_update: Callable[[], Awaitable]
):
# Run full pipeline
response = await next_update()
update_activity = copy.copy(activity)
update_activity.type = ActivityTypes.message_update
await self.log_activity(transcript, update_activity)
return response

context.on_update_activity(update_activity_handler)

# hook up delete activity pipeline
async def delete_activity_handler(
ctx: TurnContext,
reference: ConversationReference,
next_delete: Callable[[], Awaitable],
):
# Run full pipeline
await next_delete()

delete_msg = Activity(
type=ActivityTypes.message_delete, id=reference.activity_id
)
deleted_activity: Activity = TurnContext.apply_conversation_reference(
delete_msg, reference, False
)
await self.log_activity(transcript, deleted_activity)

context.on_delete_activity(delete_activity_handler)

if logic:
await logic()

# Flush transcript at end of turn
while not transcript.empty():
activity = transcript.get()
if activity is None:
break
await self.logger.log_activity(activity)
transcript.task_done()

def log_activity(self, transcript: Queue, activity: Activity) -> None:
"""Logs the activity.
:param transcript: transcript.
:param activity: Activity to log.
"""
transcript.put(activity)


class TranscriptStore(TranscriptLogger):
""" Transcript storage for conversations."""

@abstractmethod
async def get_transcript_activities(
self,
channel_id: str,
conversation_id: str,
continuation_token: str,
start_date: datetime,
) -> "PagedResult":
"""Get activities for a conversation (Aka the transcript).
:param channel_id: Channel Id where conversation took place.
:param conversation_id: Conversation ID
:param continuation_token: Continuation token to page through results.
:param start_date: Earliest time to include
:result: Page of results of Activity objects
"""
raise NotImplementedError

@abstractmethod
async def list_transcripts(
self, channel_id: str, continuation_token: str
) -> "PagedResult":
"""List conversations in the channelId.
:param channel_id: Channel Id where conversation took place.
:param continuation_token : Continuation token to page through results.
:result: Page of results of TranscriptInfo objects
"""
raise NotImplementedError

@abstractmethod
async def delete_transcript(self, channel_id: str, conversation_id: str) -> None:
"""Delete a specific conversation and all of it's activities.
:param channel_id: Channel Id where conversation took place.
:param conversation_id: Id of the conversation to delete.
:result: None
"""
raise NotImplementedError


class ConsoleTranscriptLogger(TranscriptLogger):
"""ConsoleTranscriptLogger writes activities to Console output."""

async def log_activity(self, activity: Activity) -> None:
"""Log an activity to the transcript.
:param activity:Activity being logged.
"""
if activity:
print(f"Activity Log: {activity}")
else:
raise TypeError("Activity is required")


class TranscriptInfo:
"""Metadata for a stored transcript."""

# pylint: disable=invalid-name
def __init__(
self,
channel_id: str = None,
created: datetime = None,
conversation_id: str = None,
):
"""
:param channel_id: Channel ID the transcript was taken from
:param created: Timestamp when event created
:param id: Conversation ID
"""
self.channel_id = channel_id
self.created = created
self.id = conversation_id


class PagedResult:
"""Paged results for transcript data."""

# Page of Items
items: List[object] = None
# Token used to page through multiple pages.
continuation_token: str = None
Loading