diff --git a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py index 3cbe3aa..0a7e9bc 100644 --- a/src/bedrock_agentcore/memory/integrations/strands/session_manager.py +++ b/src/bedrock_agentcore/memory/integrations/strands/session_manager.py @@ -2,7 +2,8 @@ import json import logging -from datetime import datetime, timezone +import threading +from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, Optional import boto3 @@ -46,6 +47,38 @@ class AgentCoreMemorySessionManager(RepositorySessionManager, SessionRepository) - Consistent with existing Strands Session managers (such as: FileSessionManager, S3SessionManager) """ + # Class-level timestamp tracking for monotonic ordering + _timestamp_lock = threading.Lock() + _last_timestamp: Optional[datetime] = None + + @classmethod + def _get_monotonic_timestamp(cls, desired_timestamp: Optional[datetime] = None) -> datetime: + """Get a monotonically increasing timestamp. + + Args: + desired_timestamp (Optional[datetime]): The desired timestamp. If None, uses current time. + + Returns: + datetime: A timestamp guaranteed to be greater than any previously returned timestamp. + """ + if desired_timestamp is None: + desired_timestamp = datetime.now(timezone.utc) + + with cls._timestamp_lock: + if cls._last_timestamp is None: + cls._last_timestamp = desired_timestamp + return desired_timestamp + + # Why the 1 second check? Because Boto3 does NOT support sub 1 second resolution. + if desired_timestamp <= cls._last_timestamp + timedelta(seconds=1): + # Increment by 1 second to ensure ordering + new_timestamp = cls._last_timestamp + timedelta(seconds=1) + else: + new_timestamp = desired_timestamp + + cls._last_timestamp = new_timestamp + return new_timestamp + def __init__( self, agentcore_memory_config: AgentCoreMemoryConfig, @@ -149,7 +182,7 @@ def create_session(self, session: Session, **kwargs: Any) -> Session: payload=[ {"blob": json.dumps(session.to_dict())}, ], - eventTimestamp=datetime.now(timezone.utc), + eventTimestamp=self._get_monotonic_timestamp(), ) logger.info("Created session: %s with event: %s", session.session_id, event.get("event", {}).get("eventId")) return session @@ -220,7 +253,7 @@ def create_agent(self, session_id: str, session_agent: SessionAgent, **kwargs: A payload=[ {"blob": json.dumps(session_agent.to_dict())}, ], - eventTimestamp=datetime.now(timezone.utc), + eventTimestamp=self._get_monotonic_timestamp(), ) logger.info( "Created agent: %s in session: %s with event %s", @@ -319,13 +352,18 @@ def create_message( messages = AgentCoreMemoryConverter.message_to_payload(session_message) if not messages: return + + # Parse the original timestamp and use it as desired timestamp + original_timestamp = datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00")) + monotonic_timestamp = self._get_monotonic_timestamp(original_timestamp) + if not AgentCoreMemoryConverter.exceeds_conversational_limit(messages[0]): event = self.memory_client.create_event( memory_id=self.config.memory_id, actor_id=self.config.actor_id, session_id=session_id, messages=messages, - event_timestamp=datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00")), + event_timestamp=monotonic_timestamp, ) else: event = self.memory_client.gmdp_client.create_event( @@ -335,7 +373,7 @@ def create_message( payload=[ {"blob": json.dumps(messages[0])}, ], - eventTimestamp=datetime.fromisoformat(session_message.created_at.replace("Z", "+00:00")), + eventTimestamp=monotonic_timestamp, ) logger.debug("Created event: %s for message: %s", event.get("eventId"), session_message.message_id) return event