diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index 43836fe34..ab1e57b65 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -185,6 +185,9 @@ def handle_content_block_start(event: ContentBlockStartEvent) -> dict[str, Any]: current_tool_use["toolUseId"] = tool_use_data["toolUseId"] current_tool_use["name"] = tool_use_data["name"] current_tool_use["input"] = "" + # Preserve thoughtSignature if present (required for Gemini 3 Pro) + if "thoughtSignature" in tool_use_data: + current_tool_use["thoughtSignature"] = tool_use_data["thoughtSignature"] return current_tool_use @@ -285,6 +288,11 @@ def handle_content_block_stop(state: dict[str, Any]) -> dict[str, Any]: name=tool_use_name, input=current_tool_use["input"], ) + + # Preserve thoughtSignature if present (required for Gemini 3 Pro) + if "thoughtSignature" in current_tool_use: + tool_use["thoughtSignature"] = current_tool_use["thoughtSignature"] + content.append({"toolUse": tool_use}) state["current_tool_use"] = {} diff --git a/src/strands/models/gemini.py b/src/strands/models/gemini.py index c24d91a0d..5069335ef 100644 --- a/src/strands/models/gemini.py +++ b/src/strands/models/gemini.py @@ -3,6 +3,7 @@ - Docs: https://ai.google.dev/api """ +import base64 import json import logging import mimetypes @@ -141,12 +142,28 @@ def _format_request_content_part(self, content: ContentBlock) -> genai.types.Par ) if "toolUse" in content: + thought_signature_b64 = content["toolUse"].get("thoughtSignature") + + tool_use_thought_signature: Optional[bytes] = None + if thought_signature_b64: + try: + tool_use_thought_signature = base64.b64decode(thought_signature_b64) + except Exception as e: + tool_use_id = content["toolUse"].get("toolUseId") + logger.error("toolUseId=<%s> | failed to decode thoughtSignature: %s", tool_use_id, e) + else: + # thoughtSignature is now preserved by the Strands framework (as of v1.18+) + # If missing, it means the model didn't provide one (e.g., older Gemini versions) + tool_use_id = content["toolUse"].get("toolUseId") + logger.debug("toolUseId=<%s> | no thoughtSignature in toolUse (model may not require it)", tool_use_id) + return genai.types.Part( function_call=genai.types.FunctionCall( args=content["toolUse"]["input"], id=content["toolUse"]["toolUseId"], name=content["toolUse"]["name"], ), + thought_signature=tool_use_thought_signature, ) raise TypeError(f"content_type=<{next(iter(content))}> | unsupported type") @@ -212,9 +229,19 @@ def _format_request_config( Returns: Gemini request config. """ + # Disable thinking text output when tools are present + # Note: Setting include_thoughts=False prevents thinking text in responses but + # Gemini still returns thought_signature for function calls. As of Strands v1.18+, + # the framework properly preserves this field through the message history. + # See: https://ai.google.dev/gemini-api/docs/thought-signatures + thinking_config = None + if tool_specs: + thinking_config = genai.types.ThinkingConfig(include_thoughts=False) + return genai.types.GenerateContentConfig( system_instruction=system_prompt, tools=self._format_request_tools(tool_specs), + thinking_config=thinking_config, **(params or {}), ) @@ -268,14 +295,24 @@ def _format_chunk(self, event: dict[str, Any]) -> StreamEvent: # that name be set in the equivalent FunctionResponse type. Consequently, we assign # function name to toolUseId in our tool use block. And another reason, function_call is # not guaranteed to have id populated. 
+                tool_use: dict[str, Any] = {
+                    "name": event["data"].function_call.name,
+                    "toolUseId": event["data"].function_call.name,
+                }
+
+                # Get thought_signature from the event dict (passed in from the stream method)
+                thought_sig = event.get("thought_signature")
+
+                if thought_sig:
+                    # Ensure it's bytes for encoding
+                    if isinstance(thought_sig, str):
+                        thought_sig = thought_sig.encode("utf-8")
+                    # Store as base64 so the value stays JSON-serializable in message history
+                    tool_use["thoughtSignature"] = base64.b64encode(thought_sig).decode("utf-8")
+
                 return {
                     "contentBlockStart": {
-                        "start": {
-                            "toolUse": {
-                                "name": event["data"].function_call.name,
-                                "toolUseId": event["data"].function_call.name,
-                            },
-                        },
+                        "start": {"toolUse": cast(Any, tool_use)},
                     },
                 }
 
@@ -373,6 +410,10 @@ async def stream(
         yield self._format_chunk({"chunk_type": "content_start", "data_type": "text"})
 
         tool_used = False
+        # Track the latest thought_signature so it can be attached to function calls.
+        # Per the Gemini docs, a thought_signature may arrive on any part.
+        last_thought_signature: Optional[bytes] = None
+
         async for event in response:
             candidates = event.candidates
             candidate = candidates[0] if candidates else None
@@ -380,8 +421,20 @@
             parts = content.parts if content and content.parts else []
 
             for part in parts:
+                # Check all parts for a thought_signature (Gemini may include one even with thinking disabled)
+                if hasattr(part, "thought_signature") and part.thought_signature:
+                    last_thought_signature = part.thought_signature
+
                 if part.function_call:
-                    yield self._format_chunk({"chunk_type": "content_start", "data_type": "tool", "data": part})
+                    yield self._format_chunk(
+                        {
+                            "chunk_type": "content_start",
+                            "data_type": "tool",
+                            "data": part,
+                            # Attach the most recent thought_signature captured from the stream
+                            "thought_signature": last_thought_signature,
+                        }
+                    )
                     yield self._format_chunk({"chunk_type": "content_delta", "data_type": "tool", "data": part})
                     yield self._format_chunk({"chunk_type": "content_stop", "data_type": "tool", "data": part})
                     tool_used = True
diff --git a/src/strands/types/content.py b/src/strands/types/content.py
index 4d0bbe412..e4efc9208 100644
--- a/src/strands/types/content.py
+++ b/src/strands/types/content.py
@@ -8,7 +8,7 @@
 
 from typing import Dict, List, Literal, Optional
 
-from typing_extensions import TypedDict
+from typing_extensions import NotRequired, TypedDict
 
 from .citations import CitationsContentBlock
 from .media import DocumentContent, ImageContent, VideoContent
@@ -123,16 +123,18 @@ class DeltaContent(TypedDict, total=False):
     toolUse: Dict[Literal["input"], str]
 
 
 class ContentBlockStartToolUse(TypedDict):
     """The start of a tool use block.
 
     Attributes:
         name: The name of the tool that the model is requesting to use.
         toolUseId: The ID for the tool request.
+        thoughtSignature: Optional base64-encoded thought signature from Gemini for multi-turn reasoning.
     """
 
     name: str
     toolUseId: str
+    thoughtSignature: NotRequired[str]
 
 
 class ContentBlockStart(TypedDict, total=False):
diff --git a/src/strands/types/tools.py b/src/strands/types/tools.py
index 8343647b2..27073f95e 100644
--- a/src/strands/types/tools.py
+++ b/src/strands/types/tools.py
@@ -52,19 +52,24 @@ class Tool(TypedDict):
     toolSpec: ToolSpec
 
 
 class ToolUse(TypedDict):
     """A request from the model to use a specific tool with the provided input.
 
     Attributes:
         input: The input parameters for the tool.
             Can be any JSON-serializable type.
         name: The name of the tool to invoke.
         toolUseId: A unique identifier for this specific tool use request.
+        thoughtSignature: Optional base64-encoded thought signature from Gemini that preserves
+            the model's internal reasoning process across multi-turn conversations.
+            Required for Gemini 3 Pro when using function calling.
+            See: https://ai.google.dev/gemini-api/docs/thought-signatures
     """
 
     input: Any
     name: str
     toolUseId: str
+    thoughtSignature: NotRequired[str]
 
 
 class ToolResultContent(TypedDict, total=False):
diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py
index 3f5a6c998..5c8483632 100644
--- a/tests/strands/event_loop/test_streaming.py
+++ b/tests/strands/event_loop/test_streaming.py
@@ -124,6 +124,10 @@ def test_handle_message_start():
         {"start": {"toolUse": {"toolUseId": "test", "name": "test"}}},
         {"toolUseId": "test", "name": "test", "input": ""},
     ),
+    (
+        {"start": {"toolUse": {"toolUseId": "test", "name": "test", "thoughtSignature": "YWJj"}}},
+        {"toolUseId": "test", "name": "test", "input": "", "thoughtSignature": "YWJj"},
+    ),
 ],
 )
 def test_handle_content_block_start(chunk: ContentBlockStartEvent, exp_tool_use):
@@ -245,6 +249,39 @@ def test_handle_content_block_delta(event: ContentBlockDeltaEvent, state, exp_up
             "redactedContent": b"",
         },
     ),
+    # Tool Use - With thoughtSignature
+    (
+        {
+            "content": [],
+            "current_tool_use": {
+                "toolUseId": "123",
+                "name": "test",
+                "input": '{"key": "value"}',
+                "thoughtSignature": "dGVzdF9zaWduYXR1cmU=",
+            },
+            "text": "",
+            "reasoningText": "",
+            "citationsContent": [],
+            "redactedContent": b"",
+        },
+        {
+            "content": [
+                {
+                    "toolUse": {
+                        "toolUseId": "123",
+                        "name": "test",
+                        "input": {"key": "value"},
+                        "thoughtSignature": "dGVzdF9zaWduYXR1cmU=",
+                    }
+                }
+            ],
+            "current_tool_use": {},
+            "text": "",
+            "reasoningText": "",
+            "citationsContent": [],
+            "redactedContent": b"",
+        },
+    ),
     # Tool Use - Missing input
     (
         {
@@ -1058,3 +1095,161 @@ async def test_stream_messages_normalizes_messages(agenerator, alist):
         {"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"},
         {"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"},
     ]
+
+
+@pytest.mark.asyncio
+async def test_process_stream_preserves_thought_signature(agenerator, alist):
+    """Test that thoughtSignature is preserved through the entire streaming pipeline."""
+    response = [
+        {"messageStart": {"role": "assistant"}},
+        {
+            "contentBlockStart": {
+                "start": {
+                    "toolUse": {
+                        "toolUseId": "calculator-123",
+                        "name": "calculator",
+                        "thoughtSignature": "dGVzdF9zaWduYXR1cmVfYnl0ZXM=",
+                    }
+                }
+            },
+        },
+        {
+            "contentBlockDelta": {"delta": {"toolUse": {"input": '{"expression": "2+2"}'}}},
+        },
+        {"contentBlockStop": {}},
+        {
+            "messageStop": {"stopReason": "tool_use"},
+        },
+        {
+            "metadata": {
+                "usage": {"inputTokens": 10, "outputTokens": 5, "totalTokens": 15},
+                "metrics": {"latencyMs": 100},
+            }
+        },
+    ]
+
+    stream = strands.event_loop.streaming.process_stream(agenerator(response))
+
+    last_event = cast(ModelStopReason, (await alist(stream))[-1])
+    message = _get_message_from_event(last_event)
+
+    # Verify the message has the tool use with thoughtSignature preserved
+    assert len(message["content"]) == 1
+    assert "toolUse" in message["content"][0]
+    tool_use = message["content"][0]["toolUse"]
+    assert tool_use["toolUseId"] == "calculator-123"
+    assert tool_use["name"] == "calculator"
+    assert tool_use["input"] == {"expression": "2+2"}
+    assert "thoughtSignature" in tool_use
+    assert tool_use["thoughtSignature"] == "dGVzdF9zaWduYXR1cmVfYnl0ZXM="
+
+
+@pytest.mark.asyncio
+async def test_process_stream_tool_use_without_thought_signature(agenerator, alist):
+    """Test that tool use works correctly when thoughtSignature is not present."""
+    response = [
+        {"messageStart": {"role": "assistant"}},
+        {
+            "contentBlockStart": {
+                "start": {
+                    "toolUse": {
+                        "toolUseId": "calculator-123",
+                        "name": "calculator",
+                        # No thoughtSignature
+                    }
+                }
+            },
+        },
+        {
+            "contentBlockDelta": {"delta": {"toolUse": {"input": '{"expression": "2+2"}'}}},
+        },
+        {"contentBlockStop": {}},
+        {
+            "messageStop": {"stopReason": "tool_use"},
+        },
+        {
+            "metadata": {
+                "usage": {"inputTokens": 10, "outputTokens": 5, "totalTokens": 15},
+                "metrics": {"latencyMs": 100},
+            }
+        },
+    ]
+
+    stream = strands.event_loop.streaming.process_stream(agenerator(response))
+
+    last_event = cast(ModelStopReason, (await alist(stream))[-1])
+    message = _get_message_from_event(last_event)
+
+    # Verify the message has the tool use without thoughtSignature
+    assert len(message["content"]) == 1
+    assert "toolUse" in message["content"][0]
+    tool_use = message["content"][0]["toolUse"]
+    assert tool_use["toolUseId"] == "calculator-123"
+    assert tool_use["name"] == "calculator"
+    assert tool_use["input"] == {"expression": "2+2"}
+    assert "thoughtSignature" not in tool_use
+
+
+@pytest.mark.asyncio
+async def test_process_stream_multiple_tool_uses_with_thought_signatures(agenerator, alist):
+    """Test that multiple tool uses each preserve their own thoughtSignature."""
+    response = [
+        {"messageStart": {"role": "assistant"}},
+        {
+            "contentBlockStart": {
+                "start": {
+                    "toolUse": {
+                        "toolUseId": "tool1",
+                        "name": "calculator",
+                        "thoughtSignature": "c2lnbmF0dXJlMQ==",
+                    }
+                }
+            },
+        },
+        {
+            "contentBlockDelta": {"delta": {"toolUse": {"input": '{"expression": "2+2"}'}}},
+        },
+        {"contentBlockStop": {}},
+        {
+            "contentBlockStart": {
+                "start": {
+                    "toolUse": {
+                        "toolUseId": "tool2",
+                        "name": "weather",
+                        "thoughtSignature": "c2lnbmF0dXJlMg==",
+                    }
+                }
+            },
+        },
+        {
+            "contentBlockDelta": {"delta": {"toolUse": {"input": '{"city": "SF"}'}}},
+        },
+        {"contentBlockStop": {}},
+        {
+            "messageStop": {"stopReason": "tool_use"},
+        },
+        {
+            "metadata": {
+                "usage": {"inputTokens": 10, "outputTokens": 5, "totalTokens": 15},
+                "metrics": {"latencyMs": 100},
+            }
+        },
+    ]
+
+    stream = strands.event_loop.streaming.process_stream(agenerator(response))
+
+    last_event = cast(ModelStopReason, (await alist(stream))[-1])
+    message = _get_message_from_event(last_event)
+
+    # Verify both tool uses have their respective thoughtSignatures
+    assert len(message["content"]) == 2
+
+    tool_use1 = message["content"][0]["toolUse"]
+    assert tool_use1["toolUseId"] == "tool1"
+    assert tool_use1["name"] == "calculator"
+    assert tool_use1["thoughtSignature"] == "c2lnbmF0dXJlMQ=="
+
+    tool_use2 = message["content"][1]["toolUse"]
+    assert tool_use2["toolUseId"] == "tool2"
+    assert tool_use2["name"] == "weather"
+    assert tool_use2["thoughtSignature"] == "c2lnbmF0dXJlMg=="
diff --git a/tests/strands/models/test_gemini.py b/tests/strands/models/test_gemini.py
index a8f5351cc..b312ff532 100644
--- a/tests/strands/models/test_gemini.py
+++ b/tests/strands/models/test_gemini.py
@@ -251,6 +251,7 @@ async def test_stream_request_with_tool_spec(gemini_client, model, model_id, too
                 ],
             },
         ],
+        "thinking_config": {"include_thoughts": False},
     },
     "contents": [],
     "model": model_id,
@@ -258,6 +259,29 @@ async def test_stream_request_with_tool_spec(gemini_client, model, model_id, too
     gemini_client.aio.models.generate_content_stream.assert_called_with(**exp_request)
 
 
+@pytest.mark.asyncio
+async def test_stream_request_with_tool_spec_sets_thinking_config(gemini_client, model, model_id, tool_spec):
+    """Test that thinking_config is set to disable thinking text when tools are present."""
+    await anext(model.stream([], [tool_spec]))
+
+    # Get the actual call arguments
+    call_args = gemini_client.aio.models.generate_content_stream.call_args
+    config = call_args.kwargs.get("config")
+
+    # Verify thinking_config is set correctly
+    assert config is not None
+    # Config might be a dict when mocked
+    if isinstance(config, dict):
+        assert "thinking_config" in config
+        thinking_config = config["thinking_config"]
+        assert thinking_config["include_thoughts"] is False
+    else:
+        assert hasattr(config, "thinking_config")
+        thinking_config = config.thinking_config
+        assert thinking_config is not None
+        assert thinking_config.include_thoughts is False
+
+
 @pytest.mark.asyncio
 async def test_stream_request_with_tool_use(gemini_client, model, model_id):
     messages = [
@@ -299,6 +323,76 @@ async def test_stream_request_with_tool_use(gemini_client, model, model_id):
     gemini_client.aio.models.generate_content_stream.assert_called_with(**exp_request)
 
 
+@pytest.mark.asyncio
+async def test_stream_request_with_tool_use_and_thought_signature(gemini_client, model, model_id):
+    """Test that thoughtSignature is properly decoded from base64 and passed to the Gemini API."""
+    messages = [
+        {
+            "role": "assistant",
+            "content": [
+                {
+                    "toolUse": {
+                        "toolUseId": "c1",
+                        "name": "calculator",
+                        "input": {"expression": "2+2"},
+                        "thoughtSignature": "YWJjZGVmZ2g=",  # base64 encoded "abcdefgh"
+                    },
+                },
+            ],
+        },
+    ]
+    await anext(model.stream(messages))
+
+    # Verify that the call was made - the Gemini SDK handles thought_signature serialization internally
+    call_args = gemini_client.aio.models.generate_content_stream.call_args
+    assert call_args is not None
+
+    # Check that the content includes the thought_signature (the SDK may serialize it differently)
+    contents = call_args.kwargs["contents"]
+    assert len(contents) == 1
+    assert len(contents[0]["parts"]) == 1
+    part = contents[0]["parts"][0]
+    assert "function_call" in part
+    assert part["function_call"]["name"] == "calculator"
+    # The SDK handles thought_signature internally; verify it is present
+    assert "thought_signature" in part
+
+
+@pytest.mark.asyncio
+async def test_stream_request_with_tool_use_missing_thought_signature(gemini_client, model, model_id):
+    """Test that a missing thoughtSignature is handled gracefully."""
+    messages = [
+        {
+            "role": "assistant",
+            "content": [
+                {
+                    "toolUse": {
+                        "toolUseId": "c1",
+                        "name": "calculator",
+                        "input": {"expression": "2+2"},
+                        # No thoughtSignature
+                    },
+                },
+            ],
+        },
+    ]
+    await anext(model.stream(messages))
+
+    # Verify the call was made without thought_signature when not present
+    call_args = gemini_client.aio.models.generate_content_stream.call_args
+    assert call_args is not None
+
+    contents = call_args.kwargs["contents"]
+    assert len(contents) == 1
+    assert len(contents[0]["parts"]) == 1
+    part = contents[0]["parts"][0]
+    assert "function_call" in part
+    assert part["function_call"]["name"] == "calculator"
+    # When thoughtSignature is missing, the SDK may omit the field entirely;
+    # either representation is acceptable
+    assert "thought_signature" not in part or part.get("thought_signature") is None
+
+
 @pytest.mark.asyncio
 async def test_stream_request_with_tool_results(gemini_client, model, model_id):
     messages = [
@@ -469,6 +563,106 @@ async def test_stream_response_tool_use(gemini_client, model, messages, agenerat
     assert tru_chunks == exp_chunks
 
 
+@pytest.mark.asyncio
+async def test_stream_response_tool_use_with_thought_signature(gemini_client, model, messages, agenerator, alist):
+    """Test that thoughtSignature from Gemini response is captured and base64-encoded."""
+    gemini_client.aio.models.generate_content_stream.return_value = agenerator(
+        [
+            genai.types.GenerateContentResponse(
+                candidates=[
+                    genai.types.Candidate(
+                        content=genai.types.Content(
+                            parts=[
+                                genai.types.Part(
+                                    function_call=genai.types.FunctionCall(
+                                        args={"expression": "2+2"},
+                                        id="c1",
+                                        name="calculator",
+                                    ),
+                                    thought_signature=b"test_signature_bytes",  # Raw bytes from Gemini
+                                ),
+                            ],
+                        ),
+                        finish_reason="STOP",
+                    ),
+                ],
+                usage_metadata=genai.types.GenerateContentResponseUsageMetadata(
+                    prompt_token_count=1,
+                    total_token_count=3,
+                ),
+            ),
+        ]
+    )
+
+    tru_chunks = await alist(model.stream(messages))
+    exp_chunks = [
+        {"messageStart": {"role": "assistant"}},
+        {"contentBlockStart": {"start": {}}},
+        {
+            "contentBlockStart": {
+                "start": {
+                    "toolUse": {
+                        "name": "calculator",
+                        "toolUseId": "calculator",
+                        "thoughtSignature": "dGVzdF9zaWduYXR1cmVfYnl0ZXM=",  # base64 encoded
+                    }
+                }
+            }
+        },
+        {"contentBlockDelta": {"delta": {"toolUse": {"input": '{"expression": "2+2"}'}}}},
+        {"contentBlockStop": {}},
+        {"contentBlockStop": {}},
+        {"messageStop": {"stopReason": "tool_use"}},
+        {"metadata": {"usage": {"inputTokens": 1, "outputTokens": 2, "totalTokens": 3}, "metrics": {"latencyMs": 0}}},
+    ]
+    assert tru_chunks == exp_chunks
+
+
+@pytest.mark.asyncio
+async def test_stream_response_tool_use_without_thought_signature(gemini_client, model, messages, agenerator, alist):
+    """Test that missing thoughtSignature in response is handled gracefully."""
+    gemini_client.aio.models.generate_content_stream.return_value = agenerator(
+        [
+            genai.types.GenerateContentResponse(
+                candidates=[
+                    genai.types.Candidate(
+                        content=genai.types.Content(
+                            parts=[
+                                genai.types.Part(
+                                    function_call=genai.types.FunctionCall(
+                                        args={"expression": "2+2"},
+                                        id="c1",
+                                        name="calculator",
+                                    ),
+                                    # No thought_signature
+                                ),
+                            ],
+                        ),
+                        finish_reason="STOP",
+                    ),
+                ],
+                usage_metadata=genai.types.GenerateContentResponseUsageMetadata(
+                    prompt_token_count=1,
+                    total_token_count=3,
+                ),
+            ),
+        ]
+    )
+
+    tru_chunks = await alist(model.stream(messages))
+    exp_chunks = [
+        {"messageStart": {"role": "assistant"}},
+        {"contentBlockStart": {"start": {}}},
+        {"contentBlockStart": {"start": {"toolUse": {"name": "calculator", "toolUseId": "calculator"}}}},
+        {"contentBlockDelta": {"delta": {"toolUse": {"input": '{"expression": "2+2"}'}}}},
+        {"contentBlockStop": {}},
+        {"contentBlockStop": {}},
+        {"messageStop": {"stopReason": "tool_use"}},
+        {"metadata": {"usage": {"inputTokens": 1, "outputTokens": 2, "totalTokens": 3}, "metrics": {"latencyMs": 0}}},
+    ]
+    assert tru_chunks == exp_chunks
+
+
 @pytest.mark.asyncio
 async def test_stream_response_reasoning(gemini_client, model, messages, agenerator, alist):
     gemini_client.aio.models.generate_content_stream.return_value = agenerator(
@@ -637,3 +831,104 @@ async def test_stream_handles_non_json_error(gemini_client, model, messages, cap
 
     assert "Gemini API returned non-JSON error" in caplog.text
     assert f"error_message=<{error_message}>" in caplog.text
+
+
+@pytest.mark.asyncio
+async def test_stream_request_with_invalid_base64_thought_signature(gemini_client, model, model_id, caplog):
+    """Test that invalid base64 in thoughtSignature logs error but doesn't crash."""
+    messages = [
+        {
+            "role": "assistant",
+            "content": [
+                {
+                    "toolUse": {
+                        "toolUseId": "c1",
+                        "name": "calculator",
+                        "input": {"expression": "2+2"},
+                        "thoughtSignature": "invalid-base64-data!!!",  # Invalid base64
+                    },
+                },
+            ],
+        },
+    ]
+
+    with caplog.at_level(logging.ERROR):
+        await anext(model.stream(messages))
+
+    # Verify error was logged
+    assert "failed to decode thoughtSignature" in caplog.text
+    assert "toolUseId=" in caplog.text
+
+    # Verify the request was still made (graceful degradation)
+    call_args = gemini_client.aio.models.generate_content_stream.call_args
+    assert call_args is not None
+
+    # Verify content was formatted despite the error
+    contents = call_args.kwargs["contents"]
+    assert len(contents) == 1
+    assert len(contents[0]["parts"]) == 1
+    part = contents[0]["parts"][0]
+    assert "function_call" in part
+    assert part["function_call"]["name"] == "calculator"
+    # thought_signature should be None when decode fails
+    assert part.get("thought_signature") is None
+
+
+@pytest.mark.asyncio
+async def test_stream_response_tool_use_with_string_thought_signature(
+    gemini_client, model, messages, agenerator, alist
+):
+    """Test that thoughtSignature as a string (not bytes) is properly converted and encoded."""
+    # Create a mock Part with thought_signature as a string instead of bytes
+    mock_part = genai.types.Part(
+        function_call=genai.types.FunctionCall(
+            args={"expression": "3+3"},
+            id="c2",
+            name="calculator",
+        )
+    )
+    # Manually set thought_signature as a string (edge case)
+    mock_part.thought_signature = "string_signature"  # String instead of bytes
+
+    gemini_client.aio.models.generate_content_stream.return_value = agenerator(
+        [
+            genai.types.GenerateContentResponse(
+                candidates=[
+                    genai.types.Candidate(
+                        content=genai.types.Content(
+                            parts=[mock_part],
+                        ),
+                        finish_reason="STOP",
+                    ),
+                ],
+                usage_metadata=genai.types.GenerateContentResponseUsageMetadata(
+                    prompt_token_count=1,
+                    total_token_count=3,
+                ),
+            ),
+        ]
+    )
+
+    tru_chunks = await alist(model.stream(messages))
+    exp_chunks = [
+        {"messageStart": {"role": "assistant"}},
+        {"contentBlockStart": {"start": {}}},
+        {
+            "contentBlockStart": {
+                "start": {
+                    "toolUse": {
+                        "name": "calculator",
+                        "toolUseId": "calculator",
+                        # String should be encoded to bytes, then base64 encoded
+                        "thoughtSignature": "c3RyaW5nX3NpZ25hdHVyZQ==",  # base64("string_signature")
+                    }
+                }
+            }
+        },
+        {"contentBlockDelta": {"delta": {"toolUse": {"input": '{"expression": "3+3"}'}}}},
+        {"contentBlockStop": {}},
+        {"contentBlockStop": {}},
+        {"messageStop": {"stopReason": "tool_use"}},
+        {"metadata": {"usage": {"inputTokens": 1, "outputTokens": 2, "totalTokens": 3}, "metrics": {"latencyMs": 0}}},
+    ]
+    assert tru_chunks == exp_chunks
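
Reviewer note: the patch implements a single round trip: raw thought_signature bytes from a Gemini Part are base64-encoded into the thoughtSignature string stored in Strands message history, then decoded back to bytes when the history is replayed to the API. Below is a minimal, self-contained sketch of that logic; the helper names are illustrative only and not part of the patch.

    import base64
    from typing import Any, Optional

    def encode_thought_signature(raw: Any) -> Optional[str]:
        # Mirror of the streaming path: normalize to bytes, then store as a
        # base64 string so the value stays JSON-serializable in history.
        if not raw:
            return None
        if isinstance(raw, str):
            raw = raw.encode("utf-8")
        return base64.b64encode(raw).decode("utf-8")

    def decode_thought_signature(tool_use: dict[str, Any]) -> Optional[bytes]:
        # Mirror of the request path: decode back to bytes, degrading to None
        # (rather than raising) when the stored value is missing or malformed.
        encoded = tool_use.get("thoughtSignature")
        if not encoded:
            return None
        try:
            return base64.b64decode(encoded)
        except Exception:
            return None

    # Round trip: bytes -> "dGVzdF9zaWduYXR1cmVfYnl0ZXM=" -> bytes
    stored = encode_thought_signature(b"test_signature_bytes")
    assert stored == "dGVzdF9zaWduYXR1cmVfYnl0ZXM="
    assert decode_thought_signature({"thoughtSignature": stored}) == b"test_signature_bytes"
    assert decode_thought_signature({"thoughtSignature": "invalid-base64-data!!!"}) is None

Storing the signature as a base64 string rather than raw bytes keeps ToolUse JSON-serializable, which is why the tests above assert on encoded values such as "dGVzdF9zaWduYXR1cmVfYnl0ZXM=".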