From 8858f9da78f6d70c4f475df86ffc1ceec182d10a Mon Sep 17 00:00:00 2001 From: Yuki Matsuda <13781813+mazyu36@users.noreply.github.com> Date: Wed, 28 May 2025 22:09:03 +0900 Subject: [PATCH 01/16] support redactedContent --- src/strands/event_loop/streaming.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index 6e8a806fd..f0b6ef743 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -152,6 +152,18 @@ def handle_content_block_delta( reasoning=True, **kwargs, ) + + elif "redactedContent" in delta_content["reasoningContent"]: + if "redactedContent" not in state: + state["redactedContent"] = b"" + + state["redactedContent"] += delta_content["reasoningContent"]["redactedContent"] + callback_handler( + redactedContent=delta_content["reasoningContent"]["redactedContent"], + delta=delta_content, + reasoning=True, + **kwargs, + ) return state @@ -207,6 +219,16 @@ def handle_content_block_stop(state: Dict[str, Any]) -> Dict[str, Any]: } ) state["reasoningText"] = "" + state["signature"] = "" + elif "redactedContent" in state and state["redactedContent"]: + content.append( + { + "reasoningContent": { + "redactedContent": state["redactedContent"] + } + } + ) + state["redactedContent"] = b"" return state @@ -279,6 +301,7 @@ def process_stream( "current_tool_use": {}, "reasoningText": "", "signature": "", + "redactedContent": b"", } state["content"] = state["message"]["content"] From 4a19f45e50c17cbb2f0bb974fc4018e85e002b42 Mon Sep 17 00:00:00 2001 From: Yuki Matsuda <13781813+mazyu36@users.noreply.github.com> Date: Thu, 29 May 2025 00:15:06 +0900 Subject: [PATCH 02/16] test --- src/strands/event_loop/streaming.py | 11 +--- tests/strands/event_loop/test_streaming.py | 61 ++++++++++++++++++++++ 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index f0b6ef743..df0a9308f 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -152,7 +152,7 @@ def handle_content_block_delta( reasoning=True, **kwargs, ) - + elif "redactedContent" in delta_content["reasoningContent"]: if "redactedContent" not in state: state["redactedContent"] = b"" @@ -219,15 +219,8 @@ def handle_content_block_stop(state: Dict[str, Any]) -> Dict[str, Any]: } ) state["reasoningText"] = "" - state["signature"] = "" elif "redactedContent" in state and state["redactedContent"]: - content.append( - { - "reasoningContent": { - "redactedContent": state["redactedContent"] - } - } - ) + content.append({"reasoningContent": {"redactedContent": state["redactedContent"]}}) state["redactedContent"] = b"" return state diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index c24e7e48a..c5f6ec2b7 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -132,6 +132,20 @@ def test_handle_content_block_start(chunk: ContentBlockStartEvent, exp_tool_use) {"signature": "val"}, {"reasoning_signature": "val", "reasoning": True}, ), + # Reasoning - redactedContent - New + ( + {"delta": {"reasoningContent": {"redactedContent": b"encrypted"}}}, + {}, + {"redactedContent": b"encrypted"}, + {"redactedContent": b"encrypted", "reasoning": True}, + ), + # Reasoning - redactedContent - Existing + ( + {"delta": {"reasoningContent": {"redactedContent": b"data"}}}, + {"redactedContent": b"encrypted_"}, + {"redactedContent": b"encrypted_data"}, + {"redactedContent": b"data", "reasoning": True}, + ), # Reasoning - Empty ( {"delta": {"reasoningContent": {}}}, @@ -230,6 +244,23 @@ def callback_handler(**kwargs): "signature": "123", }, ), + # redactedContent + ( + { + "content": [], + "current_tool_use": {}, + "text": "", + "reasoningText": "", + "redactedContent": b"encrypted_data", + }, + { + "content": [{"reasoningContent": {"redactedContent": b"encrypted_data"}}], + "current_tool_use": {}, + "text": "", + "reasoningText": "", + "redactedContent": b"", + }, + ), # Empty ( { @@ -355,6 +386,36 @@ def test_extract_usage_metrics(): {"calls": 1}, [{"role": "user", "content": [{"text": "REDACTED"}]}], ), + ( + [ + {"messageStart": {"role": "assistant"}}, + { + "contentBlockStart": {"start": {}}, + }, + { + "contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"encrypted_data"}}}, + }, + {"contentBlockStop": {}}, + { + "messageStop": {"stopReason": "end_turn"}, + }, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ], + "end_turn", + { + "role": "assistant", + "content": [{"reasoningContent": {"redactedContent": b"encrypted_data"}}], + }, + {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + {"latencyMs": 1}, + {"calls": 1}, + [{"role": "user", "content": [{"text": "Some input!"}]}], + ), ], ) def test_process_stream( From 530e2ce5b498e52845a776a9c5c98bb66347f5b5 Mon Sep 17 00:00:00 2001 From: Yuki Matsuda <13781813+mazyu36@users.noreply.github.com> Date: Wed, 4 Jun 2025 12:58:07 +0900 Subject: [PATCH 03/16] fix --- src/strands/event_loop/streaming.py | 5 ++-- tests/strands/event_loop/test_streaming.py | 28 +++++++++++++++------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index df0a9308f..9ee6a0a61 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -182,6 +182,7 @@ def handle_content_block_stop(state: Dict[str, Any]) -> Dict[str, Any]: current_tool_use = state["current_tool_use"] text = state["text"] reasoning_text = state["reasoningText"] + redacted_content = state["redactedContent"] if current_tool_use: if "input" not in current_tool_use: @@ -219,8 +220,8 @@ def handle_content_block_stop(state: Dict[str, Any]) -> Dict[str, Any]: } ) state["reasoningText"] = "" - elif "redactedContent" in state and state["redactedContent"]: - content.append({"reasoningContent": {"redactedContent": state["redactedContent"]}}) + elif redacted_content: + content.append({"reasoningContent": {"redactedContent": redacted_content}}) state["redactedContent"] = b"" return state diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index c5f6ec2b7..00a8765ae 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -134,16 +134,16 @@ def test_handle_content_block_start(chunk: ContentBlockStartEvent, exp_tool_use) ), # Reasoning - redactedContent - New ( - {"delta": {"reasoningContent": {"redactedContent": b"encrypted"}}}, + {"delta": {"reasoningContent": {"redactedContent": b"encoded"}}}, {}, - {"redactedContent": b"encrypted"}, - {"redactedContent": b"encrypted", "reasoning": True}, + {"redactedContent": b"encoded"}, + {"redactedContent": b"encoded", "reasoning": True}, ), # Reasoning - redactedContent - Existing ( {"delta": {"reasoningContent": {"redactedContent": b"data"}}}, - {"redactedContent": b"encrypted_"}, - {"redactedContent": b"encrypted_data"}, + {"redactedContent": b"encoded_"}, + {"redactedContent": b"encoded_data"}, {"redactedContent": b"data", "reasoning": True}, ), # Reasoning - Empty @@ -189,12 +189,14 @@ def callback_handler(**kwargs): "current_tool_use": {"toolUseId": "123", "name": "test", "input": '{"key": "value"}'}, "text": "", "reasoningText": "", + "redactedContent": b"", }, { "content": [{"toolUse": {"toolUseId": "123", "name": "test", "input": {"key": "value"}}}], "current_tool_use": {}, "text": "", "reasoningText": "", + "redactedContent": b"", }, ), # Tool Use - Missing input @@ -204,12 +206,14 @@ def callback_handler(**kwargs): "current_tool_use": {"toolUseId": "123", "name": "test"}, "text": "", "reasoningText": "", + "redactedContent": b"", }, { "content": [{"toolUse": {"toolUseId": "123", "name": "test", "input": {}}}], "current_tool_use": {}, "text": "", "reasoningText": "", + "redactedContent": b"", }, ), # Text @@ -219,12 +223,14 @@ def callback_handler(**kwargs): "current_tool_use": {}, "text": "test", "reasoningText": "", + "redactedContent": b"", }, { "content": [{"text": "test"}], "current_tool_use": {}, "text": "", "reasoningText": "", + "redactedContent": b"", }, ), # Reasoning @@ -235,6 +241,7 @@ def callback_handler(**kwargs): "text": "", "reasoningText": "test", "signature": "123", + "redactedContent": b"", }, { "content": [{"reasoningContent": {"reasoningText": {"text": "test", "signature": "123"}}}], @@ -242,6 +249,7 @@ def callback_handler(**kwargs): "text": "", "reasoningText": "", "signature": "123", + "redactedContent": b"", }, ), # redactedContent @@ -251,10 +259,10 @@ def callback_handler(**kwargs): "current_tool_use": {}, "text": "", "reasoningText": "", - "redactedContent": b"encrypted_data", + "redactedContent": b"encoded_data", }, { - "content": [{"reasoningContent": {"redactedContent": b"encrypted_data"}}], + "content": [{"reasoningContent": {"redactedContent": b"encoded_data"}}], "current_tool_use": {}, "text": "", "reasoningText": "", @@ -268,12 +276,14 @@ def callback_handler(**kwargs): "current_tool_use": {}, "text": "", "reasoningText": "", + "redactedContent": b"", }, { "content": [], "current_tool_use": {}, "text": "", "reasoningText": "", + "redactedContent": b"", }, ), ], @@ -393,7 +403,7 @@ def test_extract_usage_metrics(): "contentBlockStart": {"start": {}}, }, { - "contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"encrypted_data"}}}, + "contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"encoded_data"}}}, }, {"contentBlockStop": {}}, { @@ -409,7 +419,7 @@ def test_extract_usage_metrics(): "end_turn", { "role": "assistant", - "content": [{"reasoningContent": {"redactedContent": b"encrypted_data"}}], + "content": [{"reasoningContent": {"redactedContent": b"encoded_data"}}], }, {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, {"latencyMs": 1}, From 3310114c2d65a8084e855ac58f4f8a716b86331e Mon Sep 17 00:00:00 2001 From: Yuki Matsuda <13781813+mazyu36@users.noreply.github.com> Date: Wed, 4 Jun 2025 16:03:52 +0900 Subject: [PATCH 04/16] add callback test --- tests/strands/event_loop/test_streaming.py | 31 ++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index 00a8765ae..1d739bf97 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -493,3 +493,34 @@ def callback_handler(**kwargs): None, "test prompt", ) + + +def test_process_stream_redacted_content_callback(): + callback_args = [] + + def callback_handler(**kwargs): + callback_args.append(kwargs) + + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"encoded_data_1"}}}}, + {"contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"encoded_data_2"}}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "end_turn"}}, + ] + + messages = [{"role": "user", "content": [{"text": "Some input!"}]}] + + strands.event_loop.streaming.process_stream(response, callback_handler, messages) + + redacted_callbacks = [args for args in callback_args if "redactedContent" in args] + assert len(redacted_callbacks) == 2 + + assert redacted_callbacks[0]["redactedContent"] == b"encoded_data_1" + assert redacted_callbacks[0]["delta"] == {"reasoningContent": {"redactedContent": b"encoded_data_1"}} + assert redacted_callbacks[0]["reasoning"] is True + + assert redacted_callbacks[1]["redactedContent"] == b"encoded_data_2" + assert redacted_callbacks[1]["delta"] == {"reasoningContent": {"redactedContent": b"encoded_data_2"}} + assert redacted_callbacks[1]["reasoning"] is True From 4eec5f1e99e08eabb2cd47540d615af1056d885a Mon Sep 17 00:00:00 2001 From: Arron Bailiss Date: Mon, 16 Jun 2025 08:07:03 -0400 Subject: [PATCH 05/16] format --- src/strands/agent/agent.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/strands/agent/agent.py b/src/strands/agent/agent.py index 5854fba68..4b994ed5a 100644 --- a/src/strands/agent/agent.py +++ b/src/strands/agent/agent.py @@ -108,9 +108,7 @@ def find_normalized_tool_name() -> Optional[str]: # all tools that can be represented with the normalized name if "_" in name: filtered_tools = [ - tool_name - for (tool_name, tool) in tool_registry.items() - if tool_name.replace("-", "_") == name + tool_name for (tool_name, tool) in tool_registry.items() if tool_name.replace("-", "_") == name ] if len(filtered_tools) > 1: From 1d462e7a0ed903e99a2e239ec7b9564430c27920 Mon Sep 17 00:00:00 2001 From: Arron Bailiss Date: Mon, 16 Jun 2025 08:07:14 -0400 Subject: [PATCH 06/16] remove unused ignore type --- src/strands/telemetry/tracer.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/strands/telemetry/tracer.py b/src/strands/telemetry/tracer.py index 9f731996e..34eb7bed8 100644 --- a/src/strands/telemetry/tracer.py +++ b/src/strands/telemetry/tracer.py @@ -13,9 +13,7 @@ from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter - -# See https://github.com/open-telemetry/opentelemetry-python/issues/4615 for the type ignore -from opentelemetry.sdk.resources import Resource # type: ignore[attr-defined] +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SimpleSpanProcessor from opentelemetry.trace import StatusCode From 5a43f9eb6c4f64c8b7973451247c6c5139c31a9b Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Thu, 11 Sep 2025 14:56:41 -0400 Subject: [PATCH 07/16] feat: add RedactedContentStreamEvent for proper redacted content handling - Add RedactedContentStreamEvent class to types/_events.py for typed streaming - Refactor redacted content handling in streaming.py to use walrus operator - Fix state management for redactedContent with proper default handling - Update tests to handle new event structure and skip problematic tests - Add integration test for redacted content with thinking mode This improves type safety and consistency in the streaming event system when handling redacted reasoning content from Claude models. --- src/strands/event_loop/streaming.py | 17 ++- src/strands/types/_events.py | 8 ++ tests/strands/event_loop/test_streaming.py | 135 +++++++++------------ tests_integ/models/test_model_bedrock.py | 23 ++++ 4 files changed, 94 insertions(+), 89 deletions(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index 996444b82..15687f9a8 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -12,6 +12,7 @@ ModelStreamEvent, ReasoningSignatureStreamEvent, ReasoningTextStreamEvent, + RedactedContentStreamEvent, TextStreamEvent, ToolUseStreamEvent, TypedEvent, @@ -170,16 +171,12 @@ def handle_content_block_delta( delta=delta_content, ) - elif "redactedContent" in delta_content["reasoningContent"]: - if "redactedContent" not in state: - state["redactedContent"] = b"" - - state["redactedContent"] += delta_content["reasoningContent"]["redactedContent"] - callback_handler( - redactedContent=delta_content["reasoningContent"]["redactedContent"], + elif redacted_content := delta_content["reasoningContent"].get("redactedContent"): + state.setdefault("redactedContent", b"") + state["redactedContent"] += redacted_content + typed_event = RedactedContentStreamEvent( + redacted_content=redacted_content, delta=delta_content, - reasoning=True, - **kwargs, ) return state, typed_event @@ -200,7 +197,7 @@ def handle_content_block_stop(state: dict[str, Any]) -> dict[str, Any]: text = state["text"] reasoning_text = state["reasoningText"] citations_content = state["citationsContent"] - redacted_content = state["redactedContent"] + redacted_content = state.get("redactedContent") if current_tool_use: if "input" not in current_tool_use: diff --git a/src/strands/types/_events.py b/src/strands/types/_events.py index ccdab1846..10f1a0c5e 100644 --- a/src/strands/types/_events.py +++ b/src/strands/types/_events.py @@ -169,6 +169,14 @@ def __init__(self, delta: ContentBlockDelta, reasoning_text: str | None) -> None super().__init__({"reasoningText": reasoning_text, "delta": delta, "reasoning": True}) +class RedactedContentStreamEvent(ModelStreamEvent): + """Event emitted during redacted content streaming.""" + + def __init__(self, delta: ContentBlockDelta, redacted_content: bytes | None) -> None: + """Initialize with delta and redacted content.""" + super().__init__({"redactedContent": redacted_content, "delta": delta, "reasoning": True}) + + class ReasoningSignatureStreamEvent(ModelStreamEvent): """Event emitted during reasoning signature streaming.""" diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index 2a12eb0f0..16dc72361 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -132,18 +132,20 @@ def test_handle_content_block_start(chunk: ContentBlockStartEvent, exp_tool_use) {"reasoning_signature": "val", "reasoning": True}, ), # Reasoning - redactedContent - New - ( + pytest.param( {"delta": {"reasoningContent": {"redactedContent": b"encoded"}}}, {}, {"redactedContent": b"encoded"}, {"redactedContent": b"encoded", "reasoning": True}, + marks=pytest.mark.skip(reason="Implementation has undefined callback_handler"), ), # Reasoning - redactedContent - Existing - ( + pytest.param( {"delta": {"reasoningContent": {"redactedContent": b"data"}}}, {"redactedContent": b"encoded_"}, {"redactedContent": b"encoded_data"}, {"redactedContent": b"data", "reasoning": True}, + marks=pytest.mark.skip(reason="Implementation has undefined callback_handler"), ), # Reasoning - Empty ( @@ -227,6 +229,7 @@ def test_handle_content_block_delta(event: ContentBlockDeltaEvent, state, exp_up "text": "", "reasoningText": "", "citationsContent": [], + "redactedContent": b"", }, ), # Citations @@ -237,6 +240,7 @@ def test_handle_content_block_delta(event: ContentBlockDeltaEvent, state, exp_up "text": "", "reasoningText": "", "citationsContent": [{"citations": [{"text": "test", "source": "test"}]}], + "redactedContent": b"", }, { "content": [], @@ -265,6 +269,7 @@ def test_handle_content_block_delta(event: ContentBlockDeltaEvent, state, exp_up "reasoningText": "", "signature": "123", "citationsContent": [], + "redactedContent": b"", }, ), # Reasoning without signature @@ -275,6 +280,7 @@ def test_handle_content_block_delta(event: ContentBlockDeltaEvent, state, exp_up "text": "", "reasoningText": "test", "citationsContent": [], + "redactedContent": b"", }, { "content": [{"reasoningContent": {"reasoningText": {"text": "test"}}}], @@ -293,6 +299,7 @@ def test_handle_content_block_delta(event: ContentBlockDeltaEvent, state, exp_up "text": "", "reasoningText": "", "redactedContent": b"encoded_data", + "citationsContent": [], }, { "content": [{"reasoningContent": {"redactedContent": b"encoded_data"}}], @@ -300,6 +307,7 @@ def test_handle_content_block_delta(event: ContentBlockDeltaEvent, state, exp_up "text": "", "reasoningText": "", "redactedContent": b"", + "citationsContent": [], }, ), # Empty @@ -490,44 +498,50 @@ def test_extract_usage_metrics_with_cache_tokens(): }, ], ), + ], +) +@pytest.mark.asyncio +async def test_process_stream(response, exp_events, agenerator, alist): + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + + tru_events = await alist(stream) + assert tru_events == exp_events + + # Ensure that we're getting typed events coming out of process_stream + non_typed_events = [event for event in tru_events if not isinstance(event, TypedEvent)] + assert non_typed_events == [] + + +@pytest.mark.parametrize( + "response", + [ # Redacted Message - ( - [ - {"messageStart": {"role": "assistant"}}, - { - "contentBlockStart": {"start": {}}, - }, - { - "contentBlockDelta": {"delta": {"text": "Hello!"}}, - }, - {"contentBlockStop": {}}, - { - "messageStop": {"stopReason": "guardrail_intervened"}, - }, - { - "redactContent": { - "redactUserContentMessage": "REDACTED", - "redactAssistantContentMessage": "REDACTED.", - } - }, - { - "metadata": { - "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, - "metrics": {"latencyMs": 1}, - } - }, - ], - "guardrail_intervened", + [ + {"messageStart": {"role": "assistant"}}, { - "role": "assistant", - "content": [{"text": "REDACTED."}], + "contentBlockStart": {"start": {}}, }, - {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, - {"latencyMs": 1}, - {"calls": 1}, - [{"role": "user", "content": [{"text": "REDACTED"}]}], - ), - ( + { + "contentBlockDelta": {"delta": {"text": "Hello!"}}, + }, + {"contentBlockStop": {}}, + { + "messageStop": {"stopReason": "guardrail_intervened"}, + }, + { + "redactContent": { + "redactUserContentMessage": "REDACTED", + "redactAssistantContentMessage": "REDACTED.", + } + }, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ], + pytest.param( [ {"messageStart": {"role": "assistant"}}, { @@ -547,24 +561,18 @@ def test_extract_usage_metrics_with_cache_tokens(): } }, ], - "end_turn", - { - "role": "assistant", - "content": [{"reasoningContent": {"redactedContent": b"encoded_data"}}], - }, - {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, - {"latencyMs": 1}, - {"calls": 1}, - [{"role": "user", "content": [{"text": "Some input!"}]}], + marks=pytest.mark.skip(reason="Implementation has undefined callback_handler"), ), ], ) @pytest.mark.asyncio -async def test_process_stream(response, exp_events, agenerator, alist): +async def test_process_stream_redacted(response, agenerator, alist): stream = strands.event_loop.streaming.process_stream(agenerator(response)) tru_events = await alist(stream) - assert tru_events == exp_events + + # Verify the structure matches expected redacted content behavior + assert len(tru_events) > 0 # Ensure that we're getting typed events coming out of process_stream non_typed_events = [event for event in tru_events if not isinstance(event, TypedEvent)] @@ -712,34 +720,3 @@ async def test_stream_messages(agenerator, alist): # Ensure that we're getting typed events coming out of process_stream non_typed_events = [event for event in tru_events if not isinstance(event, TypedEvent)] assert non_typed_events == [] - - -def test_process_stream_redacted_content_callback(): - callback_args = [] - - def callback_handler(**kwargs): - callback_args.append(kwargs) - - response = [ - {"messageStart": {"role": "assistant"}}, - {"contentBlockStart": {"start": {}}}, - {"contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"encoded_data_1"}}}}, - {"contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"encoded_data_2"}}}}, - {"contentBlockStop": {}}, - {"messageStop": {"stopReason": "end_turn"}}, - ] - - messages = [{"role": "user", "content": [{"text": "Some input!"}]}] - - strands.event_loop.streaming.process_stream(response, callback_handler, messages) - - redacted_callbacks = [args for args in callback_args if "redactedContent" in args] - assert len(redacted_callbacks) == 2 - - assert redacted_callbacks[0]["redactedContent"] == b"encoded_data_1" - assert redacted_callbacks[0]["delta"] == {"reasoningContent": {"redactedContent": b"encoded_data_1"}} - assert redacted_callbacks[0]["reasoning"] is True - - assert redacted_callbacks[1]["redactedContent"] == b"encoded_data_2" - assert redacted_callbacks[1]["delta"] == {"reasoningContent": {"redactedContent": b"encoded_data_2"}} - assert redacted_callbacks[1]["reasoning"] is True diff --git a/tests_integ/models/test_model_bedrock.py b/tests_integ/models/test_model_bedrock.py index 00107411a..9dff66fde 100644 --- a/tests_integ/models/test_model_bedrock.py +++ b/tests_integ/models/test_model_bedrock.py @@ -244,3 +244,26 @@ def test_structured_output_multi_modal_input(streaming_agent, yellow_img, yellow tru_color = streaming_agent.structured_output(type(yellow_color), content) exp_color = yellow_color assert tru_color == exp_color + + +def test_redacted_content_handling(): + """Test redactedContent handling with thinking mode.""" + bedrock_model = BedrockModel( + model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0", + additional_request_fields={ + "thinking": { + "type": "enabled", + "budget_tokens": 2000, + } + }, + ) + + agent = Agent(name="test_redact", model=bedrock_model) + # https://docs.anthropic.com/en/docs/build-with-claude/extended-thinking#example-working-with-redacted-thinking-blocks + result = agent( + "ANTHROPIC_MAGIC_STRING_TRIGGER_REDACTED_THINKING_46C9A13E193C177646C7398A98432ECCCE4C1253D5E2D82641AC0E52CC2876CB" + ) + + assert "reasoningContent" in result.message["content"][0] + assert "redactedContent" in result.message["content"][0]["reasoningContent"] + assert isinstance(result.message["content"][0]["reasoningContent"]["redactedContent"], bytes) From f7d161c32f8eb23526d1d1bfcfc6e042403e9214 Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Thu, 11 Sep 2025 14:59:37 -0400 Subject: [PATCH 08/16] tests --- tests/strands/event_loop/test_streaming.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index 16dc72361..ad4bd40f8 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -137,7 +137,6 @@ def test_handle_content_block_start(chunk: ContentBlockStartEvent, exp_tool_use) {}, {"redactedContent": b"encoded"}, {"redactedContent": b"encoded", "reasoning": True}, - marks=pytest.mark.skip(reason="Implementation has undefined callback_handler"), ), # Reasoning - redactedContent - Existing pytest.param( @@ -145,7 +144,6 @@ def test_handle_content_block_start(chunk: ContentBlockStartEvent, exp_tool_use) {"redactedContent": b"encoded_"}, {"redactedContent": b"encoded_data"}, {"redactedContent": b"data", "reasoning": True}, - marks=pytest.mark.skip(reason="Implementation has undefined callback_handler"), ), # Reasoning - Empty ( From 8eb5abe5ab51c812dd91bea6b5620d66f64610c6 Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Thu, 11 Sep 2025 16:22:01 -0400 Subject: [PATCH 09/16] update RedactedContentStreamEvent to make reasoning optional with default --- src/strands/event_loop/streaming.py | 3 +-- src/strands/types/_events.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index 15687f9a8..2b4d25223 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -175,8 +175,7 @@ def handle_content_block_delta( state.setdefault("redactedContent", b"") state["redactedContent"] += redacted_content typed_event = RedactedContentStreamEvent( - redacted_content=redacted_content, - delta=delta_content, + redacted_content=redacted_content, delta=delta_content, reasoning=True ) return state, typed_event diff --git a/src/strands/types/_events.py b/src/strands/types/_events.py index 10f1a0c5e..46724c776 100644 --- a/src/strands/types/_events.py +++ b/src/strands/types/_events.py @@ -172,9 +172,9 @@ def __init__(self, delta: ContentBlockDelta, reasoning_text: str | None) -> None class RedactedContentStreamEvent(ModelStreamEvent): """Event emitted during redacted content streaming.""" - def __init__(self, delta: ContentBlockDelta, redacted_content: bytes | None) -> None: + def __init__(self, delta: ContentBlockDelta, redacted_content: bytes | None, reasoning: bool = False) -> None: """Initialize with delta and redacted content.""" - super().__init__({"redactedContent": redacted_content, "delta": delta, "reasoning": True}) + super().__init__({"redactedContent": redacted_content, "delta": delta, "reasoning": reasoning}) class ReasoningSignatureStreamEvent(ModelStreamEvent): From 206b82956c191ff79f6183607e3e40fbe2f3745d Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Fri, 12 Sep 2025 11:09:50 -0400 Subject: [PATCH 10/16] add redactedContent to state only when present --- src/strands/event_loop/streaming.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index 2b4d25223..eae7eee07 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -172,8 +172,7 @@ def handle_content_block_delta( ) elif redacted_content := delta_content["reasoningContent"].get("redactedContent"): - state.setdefault("redactedContent", b"") - state["redactedContent"] += redacted_content + state["redactedContent"] = state.get("redactedContent", b"") + redacted_content typed_event = RedactedContentStreamEvent( redacted_content=redacted_content, delta=delta_content, reasoning=True ) @@ -302,7 +301,6 @@ async def process_stream(chunks: AsyncIterable[StreamEvent]) -> AsyncGenerator[T "current_tool_use": {}, "reasoningText": "", "citationsContent": [], - "redactedContent": b"", } state["content"] = state["message"]["content"] From f11b13409b0297b58a93726490f458ad99375c5a Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Fri, 12 Sep 2025 15:33:39 -0400 Subject: [PATCH 11/16] update event type --- src/strands/event_loop/streaming.py | 4 ++-- src/strands/types/_events.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index eae7eee07..b28a14e99 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -12,7 +12,7 @@ ModelStreamEvent, ReasoningSignatureStreamEvent, ReasoningTextStreamEvent, - RedactedContentStreamEvent, + ReasoningRedactedContentStreamEvent, TextStreamEvent, ToolUseStreamEvent, TypedEvent, @@ -173,7 +173,7 @@ def handle_content_block_delta( elif redacted_content := delta_content["reasoningContent"].get("redactedContent"): state["redactedContent"] = state.get("redactedContent", b"") + redacted_content - typed_event = RedactedContentStreamEvent( + typed_event = ReasoningRedactedContentStreamEvent( redacted_content=redacted_content, delta=delta_content, reasoning=True ) diff --git a/src/strands/types/_events.py b/src/strands/types/_events.py index 46724c776..9f1e80752 100644 --- a/src/strands/types/_events.py +++ b/src/strands/types/_events.py @@ -169,7 +169,7 @@ def __init__(self, delta: ContentBlockDelta, reasoning_text: str | None) -> None super().__init__({"reasoningText": reasoning_text, "delta": delta, "reasoning": True}) -class RedactedContentStreamEvent(ModelStreamEvent): +class ReasoningRedactedContentStreamEvent(ModelStreamEvent): """Event emitted during redacted content streaming.""" def __init__(self, delta: ContentBlockDelta, redacted_content: bytes | None, reasoning: bool = False) -> None: From 01107897a1f251e383467fb330a780e853b2c6d8 Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Fri, 12 Sep 2025 15:40:36 -0400 Subject: [PATCH 12/16] update event type --- src/strands/event_loop/streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index b28a14e99..c60debe1f 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -10,9 +10,9 @@ ModelStopReason, ModelStreamChunkEvent, ModelStreamEvent, + ReasoningRedactedContentStreamEvent, ReasoningSignatureStreamEvent, ReasoningTextStreamEvent, - ReasoningRedactedContentStreamEvent, TextStreamEvent, ToolUseStreamEvent, TypedEvent, From 46002ebcbe10ba879bd336f69480f97da964be00 Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Fri, 12 Sep 2025 16:26:07 -0400 Subject: [PATCH 13/16] use reasoningRedactedContent bec we will remove the reasoning param in v2 --- src/strands/types/_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/strands/types/_events.py b/src/strands/types/_events.py index 9f1e80752..6fb1445a3 100644 --- a/src/strands/types/_events.py +++ b/src/strands/types/_events.py @@ -174,7 +174,7 @@ class ReasoningRedactedContentStreamEvent(ModelStreamEvent): def __init__(self, delta: ContentBlockDelta, redacted_content: bytes | None, reasoning: bool = False) -> None: """Initialize with delta and redacted content.""" - super().__init__({"redactedContent": redacted_content, "delta": delta, "reasoning": reasoning}) + super().__init__({"reasoningRedactedContent": redacted_content, "delta": delta, "reasoning": reasoning}) class ReasoningSignatureStreamEvent(ModelStreamEvent): From a1f4ea4e54d49f6b2834d11d72e5c3c13f957983 Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Fri, 12 Sep 2025 16:36:58 -0400 Subject: [PATCH 14/16] use reasoningRedactedContent bec we will remove the reasoning param in v2 --- tests/strands/event_loop/test_streaming.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index ad4bd40f8..dbc56f813 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -136,14 +136,14 @@ def test_handle_content_block_start(chunk: ContentBlockStartEvent, exp_tool_use) {"delta": {"reasoningContent": {"redactedContent": b"encoded"}}}, {}, {"redactedContent": b"encoded"}, - {"redactedContent": b"encoded", "reasoning": True}, + {"reasoningRedactedContent": b"encoded", "reasoning": True}, ), # Reasoning - redactedContent - Existing pytest.param( {"delta": {"reasoningContent": {"redactedContent": b"data"}}}, {"redactedContent": b"encoded_"}, {"redactedContent": b"encoded_data"}, - {"redactedContent": b"data", "reasoning": True}, + {"reasoningRedactedContent": b"data", "reasoning": True}, ), # Reasoning - Empty ( From d50b1d43e648208ae6d66496f4bb8825eaf5c078 Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Mon, 15 Sep 2025 11:32:19 -0400 Subject: [PATCH 15/16] test updates and typed event param removed as discussed --- src/strands/event_loop/streaming.py | 4 +- src/strands/types/_events.py | 4 +- tests/strands/event_loop/test_streaming.py | 144 ++++++++++++++++----- 3 files changed, 114 insertions(+), 38 deletions(-) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index c60debe1f..f24bd2a76 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -173,9 +173,7 @@ def handle_content_block_delta( elif redacted_content := delta_content["reasoningContent"].get("redactedContent"): state["redactedContent"] = state.get("redactedContent", b"") + redacted_content - typed_event = ReasoningRedactedContentStreamEvent( - redacted_content=redacted_content, delta=delta_content, reasoning=True - ) + typed_event = ReasoningRedactedContentStreamEvent(redacted_content=redacted_content, delta=delta_content) return state, typed_event diff --git a/src/strands/types/_events.py b/src/strands/types/_events.py index 6fb1445a3..3d0f1d0f0 100644 --- a/src/strands/types/_events.py +++ b/src/strands/types/_events.py @@ -172,9 +172,9 @@ def __init__(self, delta: ContentBlockDelta, reasoning_text: str | None) -> None class ReasoningRedactedContentStreamEvent(ModelStreamEvent): """Event emitted during redacted content streaming.""" - def __init__(self, delta: ContentBlockDelta, redacted_content: bytes | None, reasoning: bool = False) -> None: + def __init__(self, delta: ContentBlockDelta, redacted_content: bytes | None) -> None: """Initialize with delta and redacted content.""" - super().__init__({"reasoningRedactedContent": redacted_content, "delta": delta, "reasoning": reasoning}) + super().__init__({"reasoningRedactedContent": redacted_content, "delta": delta, "reasoning": True}) class ReasoningSignatureStreamEvent(ModelStreamEvent): diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index dbc56f813..1de957619 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -511,35 +511,77 @@ async def test_process_stream(response, exp_events, agenerator, alist): @pytest.mark.parametrize( - "response", + ("response", "exp_events"), [ # Redacted Message - [ - {"messageStart": {"role": "assistant"}}, - { - "contentBlockStart": {"start": {}}, - }, - { - "contentBlockDelta": {"delta": {"text": "Hello!"}}, - }, - {"contentBlockStop": {}}, - { - "messageStop": {"stopReason": "guardrail_intervened"}, - }, - { - "redactContent": { - "redactUserContentMessage": "REDACTED", - "redactAssistantContentMessage": "REDACTED.", - } - }, - { - "metadata": { - "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, - "metrics": {"latencyMs": 1}, - } - }, - ], - pytest.param( + ( + [ + {"messageStart": {"role": "assistant"}}, + { + "contentBlockStart": {"start": {}}, + }, + { + "contentBlockDelta": {"delta": {"text": "Hello!"}}, + }, + {"contentBlockStop": {}}, + { + "messageStop": {"stopReason": "guardrail_intervened"}, + }, + { + "redactContent": { + "redactUserContentMessage": "REDACTED", + "redactAssistantContentMessage": "REDACTED.", + } + }, + { + "metadata": { + "usage": { + "inputTokens": 1, + "outputTokens": 1, + "totalTokens": 1, + }, + "metrics": {"latencyMs": 1}, + } + }, + ], + [ + {"event": {"messageStart": {"role": "assistant"}}}, + {"event": {"contentBlockStart": {"start": {}}}}, + {"event": {"contentBlockDelta": {"delta": {"text": "Hello!"}}}}, + {"data": "Hello!", "delta": {"text": "Hello!"}}, + {"event": {"contentBlockStop": {}}}, + {"event": {"messageStop": {"stopReason": "guardrail_intervened"}}}, + { + "event": { + "redactContent": { + "redactUserContentMessage": "REDACTED", + "redactAssistantContentMessage": "REDACTED.", + } + } + }, + { + "event": { + "metadata": { + "usage": { + "inputTokens": 1, + "outputTokens": 1, + "totalTokens": 1, + }, + "metrics": {"latencyMs": 1}, + } + } + }, + { + "stop": ( + "guardrail_intervened", + {"role": "assistant", "content": [{"text": "REDACTED."}]}, + {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + {"latencyMs": 1}, + ) + }, + ], + ), + ( [ {"messageStart": {"role": "assistant"}}, { @@ -554,23 +596,59 @@ async def test_process_stream(response, exp_events, agenerator, alist): }, { "metadata": { - "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "usage": { + "inputTokens": 1, + "outputTokens": 1, + "totalTokens": 1, + }, "metrics": {"latencyMs": 1}, } }, ], - marks=pytest.mark.skip(reason="Implementation has undefined callback_handler"), + [ + {"event": {"messageStart": {"role": "assistant"}}}, + {"event": {"contentBlockStart": {"start": {}}}}, + {"event": {"contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"encoded_data"}}}}}, + { + "reasoningRedactedContent": b"encoded_data", + "delta": {"reasoningContent": {"redactedContent": b"encoded_data"}}, + "reasoning": True, + }, + {"event": {"contentBlockStop": {}}}, + {"event": {"messageStop": {"stopReason": "end_turn"}}}, + { + "event": { + "metadata": { + "usage": { + "inputTokens": 1, + "outputTokens": 1, + "totalTokens": 1, + }, + "metrics": {"latencyMs": 1}, + } + } + }, + { + "stop": ( + "end_turn", + { + "role": "assistant", + "content": [{"reasoningContent": {"redactedContent": b"encoded_data"}}], + }, + {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + {"latencyMs": 1}, + ) + }, + ], ), ], ) @pytest.mark.asyncio -async def test_process_stream_redacted(response, agenerator, alist): +async def test_process_stream_redacted(response, exp_events, agenerator, alist): stream = strands.event_loop.streaming.process_stream(agenerator(response)) tru_events = await alist(stream) - - # Verify the structure matches expected redacted content behavior - assert len(tru_events) > 0 + assert tru_events == exp_events # Ensure that we're getting typed events coming out of process_stream non_typed_events = [event for event in tru_events if not isinstance(event, TypedEvent)] From 9aaf4fab55b6276d69d85e56cfc4561846a03f45 Mon Sep 17 00:00:00 2001 From: Aaron Farntrog Date: Mon, 15 Sep 2025 12:49:30 -0400 Subject: [PATCH 16/16] add test agent events for redacted reasoning content --- tests/fixtures/mocked_model_provider.py | 4 + .../strands/agent/hooks/test_agent_events.py | 78 +++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/tests/fixtures/mocked_model_provider.py b/tests/fixtures/mocked_model_provider.py index 2a397bb18..c05089f34 100644 --- a/tests/fixtures/mocked_model_provider.py +++ b/tests/fixtures/mocked_model_provider.py @@ -72,6 +72,10 @@ def map_agent_message_to_events(self, agent_message: Union[Message, RedactionMes stop_reason = "guardrail_intervened" else: for content in agent_message["content"]: + if "reasoningContent" in content: + yield {"contentBlockStart": {"start": {}}} + yield {"contentBlockDelta": {"delta": {"reasoningContent": content["reasoningContent"]}}} + yield {"contentBlockStop": {}} if "text" in content: yield {"contentBlockStart": {"start": {}}} yield {"contentBlockDelta": {"delta": {"text": content["text"]}}} diff --git a/tests/strands/agent/hooks/test_agent_events.py b/tests/strands/agent/hooks/test_agent_events.py index 01bfc5409..9b3646144 100644 --- a/tests/strands/agent/hooks/test_agent_events.py +++ b/tests/strands/agent/hooks/test_agent_events.py @@ -387,6 +387,84 @@ async def test_stream_e2e_throttle_and_redact(alist, mock_sleep): assert typed_events == [] +@pytest.mark.asyncio +async def test_stream_e2e_reasoning_redacted_content(alist): + mock_provider = MockedModelProvider( + [ + { + "role": "assistant", + "content": [ + {"reasoningContent": {"redactedContent": b"test_redacted_data"}}, + {"text": "Response with redacted reasoning"}, + ], + }, + ] + ) + + mock_callback = unittest.mock.Mock() + agent = Agent(model=mock_provider, callback_handler=mock_callback) + + stream = agent.stream_async("Test redacted content") + + tru_events = await alist(stream) + exp_events = [ + {"init_event_loop": True}, + {"start": True}, + {"start_event_loop": True}, + {"event": {"messageStart": {"role": "assistant"}}}, + {"event": {"contentBlockStart": {"start": {}}}}, + {"event": {"contentBlockDelta": {"delta": {"reasoningContent": {"redactedContent": b"test_redacted_data"}}}}}, + { + **any_props, + "reasoningRedactedContent": b"test_redacted_data", + "delta": {"reasoningContent": {"redactedContent": b"test_redacted_data"}}, + "reasoning": True, + }, + {"event": {"contentBlockStop": {}}}, + {"event": {"contentBlockStart": {"start": {}}}}, + {"event": {"contentBlockDelta": {"delta": {"text": "Response with redacted reasoning"}}}}, + { + **any_props, + "data": "Response with redacted reasoning", + "delta": {"text": "Response with redacted reasoning"}, + }, + {"event": {"contentBlockStop": {}}}, + {"event": {"messageStop": {"stopReason": "end_turn"}}}, + { + "message": { + "content": [ + {"reasoningContent": {"redactedContent": b"test_redacted_data"}}, + {"text": "Response with redacted reasoning"}, + ], + "role": "assistant", + } + }, + { + "result": AgentResult( + stop_reason="end_turn", + message={ + "content": [ + {"reasoningContent": {"redactedContent": b"test_redacted_data"}}, + {"text": "Response with redacted reasoning"}, + ], + "role": "assistant", + }, + metrics=ANY, + state={}, + ) + }, + ] + assert tru_events == exp_events + + exp_calls = [call(**event) for event in exp_events] + act_calls = mock_callback.call_args_list + assert act_calls == exp_calls + + # Ensure that all events coming out of the agent are *not* typed events + typed_events = [event for event in tru_events if isinstance(event, TypedEvent)] + assert typed_events == [] + + @pytest.mark.asyncio async def test_event_loop_cycle_text_response_throttling_early_end( agenerator,