From b9cc221765b287b43d9e379ed69254a45adca0b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Esbj=C3=B6rner?= Date: Thu, 19 Jun 2025 17:30:11 +0200 Subject: [PATCH 1/2] Building Low-Latency AI Agent Workflows with Amazon Bedrock Prompt Caching Jupyter notebook added --- .../create_agent_with_prompt_caching.ipynb | 1410 +++++++++++++++++ 1 file changed, 1410 insertions(+) create mode 100644 agents-and-function-calling/function-calling/function_calling_with_converse/create_agent_with_prompt_caching.ipynb diff --git a/agents-and-function-calling/function-calling/function_calling_with_converse/create_agent_with_prompt_caching.ipynb b/agents-and-function-calling/function-calling/function_calling_with_converse/create_agent_with_prompt_caching.ipynb new file mode 100644 index 000000000..35f733c15 --- /dev/null +++ b/agents-and-function-calling/function-calling/function_calling_with_converse/create_agent_with_prompt_caching.ipynb @@ -0,0 +1,1410 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0b03f292", + "metadata": {}, + "source": [ + "# Building Low-Latency AI Agent Workflows with Amazon Bedrock Prompt Caching\n", + "This notebook demonstrates how to implement efficient AI agent workflows using Amazon Bedrock's prompt caching capabilities. As organizations move their AI agent applications from proof-of-concept to production, they face challenges with token consumption, latency, and scaling costs. We'll show how to optimize these aspects without compromising the agent's reasoning capabilities.\n", + "\n", + "## What you'll learn\n", + "- How to identify cacheable components in agent prompts\n", + "- Implementation of cache checkpoints in Amazon Bedrock\n", + "- Performance monitoring and optimization techniques\n", + "- Integration with open source agent frameworks\n", + "\n", + "## Why Prompt Caching Matters\n", + "AI agents typically require significant static portions of prompts (system instructions, tool definitions, response formatting guidelines, etc.) 
that remain largely unchanged between user requests. Without caching, these static components:\n", + "- Consume substantial tokens with each call\n", + "- Introduce unnecessary processing latency\n", + "- Increase costs at scale\n", + "- Can lead to API throttling and rate limit issues\n", + "\n", + "By implementing prompt caching, we can achieve:\n", + "- Up to 85% reduction in latency\n", + "- Up to 90% cost savings through reduced token processing\n", + "- Improved throughput for handling more concurrent users\n", + "\n", + "## Prerequisites\n", + "- An AWS account with access to Amazon Bedrock\n", + "- Access to Anthropic Claude 3.7 Sonnet model in Amazon Bedrock\n", + "- Basic understanding of LLMs and prompt engineering\n", + "- Python 3.7+\n", + "\n", + "## Execution Instructions\n", + "\n", + "This notebook is designed to be run sequentially from top to bottom. Code cells that create shared utilities need to be executed before the implementation sections. Each implementation section (Part 1 and Part 2) can be run independently after the shared utilities are defined.\n", + "\n", + "Note that some cells show execution outputs from our test runs, but you should execute all cells to see the results in your environment." + ] + }, + { + "cell_type": "markdown", + "id": "a7bbe34c", + "metadata": {}, + "source": [ + "## Setup and Configuration\n", + "First, let's import our required libraries and set up our configurations." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "669f0d75", + "metadata": {}, + "outputs": [], + "source": [ + "!python3 -m pip install --upgrade -q botocore\n", + "!python3 -m pip install --upgrade -q boto3\n", + "!python3 -m pip install --upgrade -q awscli\n", + "!python3 -m pip install --upgrade -q langchain" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "76172ab4", + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "import logging\n", + "import json\n", + "import time\n", + "import re\n", + "from typing import Dict, List, Optional, Any, Union, Tuple\n", + "print(boto3.__version__)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "426e732c", + "metadata": {}, + "outputs": [], + "source": [ + "# setting logger\n", + "logging.basicConfig(format='[%(asctime)s] line:{%(lineno)d} %(levelname)s - %(message)s', level=logging.INFO)\n", + "logger = logging.getLogger(__name__)" + ] + }, + { + "cell_type": "markdown", + "id": "82889000", + "metadata": {}, + "source": [ + "Now that we have our environment configured, we'll create the core functions needed to interact with Amazon Bedrock. These functions will serve as the foundation for both our cached and non-cached implementations, allowing us to make direct comparisons between the two approaches.\n", + "\n", + "## Shared Util functions\n", + "\n", + "### Bedrock Integration\n", + "Create functions to handle communication with Amazon Bedrock." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "241a04a4", + "metadata": {}, + "outputs": [], + "source": [ + "# Create Bedrock client using default credentials\n", + "session = boto3.Session()\n", + "bedrock = session.client(service_name='bedrock-runtime')\n", + "\n", + "def call_bedrock(\n", + " message_list: List[Dict[str, Any]],\n", + " tool_list: Optional[List[Dict[str, Any]]] = None,\n", + " system_prompt: Optional[List[Dict[str, Any]]] = None\n", + ") -> Dict[str, Any]:\n", + " \"\"\"\n", + " Makes a call to Amazon Bedrock.\n", + "\n", + " Args:\n", + " message_list (List[Dict[str, Any]]): List of conversation messages\n", + " tool_list (List[Dict[str, Any]], optional): List of available tools\n", + " system_prompt (List[Dict[str, Any]], optional): System prompt configuration\n", + "\n", + " Returns:\n", + " Dict[str, Any]: Bedrock response containing the message and usage statistics\n", + " \"\"\"\n", + " try:\n", + " kwargs = {\n", + " 'modelId': \"us.anthropic.claude-3-7-sonnet-20250219-v1:0\",\n", + " 'messages': message_list,\n", + " 'inferenceConfig': {\n", + " \"maxTokens\": 2000,\n", + " \"temperature\": 0\n", + " }\n", + " }\n", + "\n", + " if system_prompt:\n", + " kwargs['system'] = system_prompt\n", + "\n", + " if tool_list:\n", + " kwargs['toolConfig'] = {\"tools\": tool_list}\n", + "\n", + " # Start timing to measure call latency\n", + " start_time = time.time()\n", + "\n", + " response = bedrock.converse(**kwargs)\n", + "\n", + " # Calculate latency\n", + " latency = (time.time() - start_time) * 1000 # Convert to milliseconds\n", + "\n", + " # Add latency to the usage stats so it can be used later to analyze performance\n", + " response['usage']['latency_ms'] = latency\n", + "\n", + " return response\n", + "\n", + " except Exception as e:\n", + " logger.error(f\"Error calling Bedrock: {str(e)}\")\n", + " raise\n" + ] + }, + { + "cell_type": "markdown", + "id": "553ab54d", + "metadata": {}, + "source": [ + "## 
Performance Analysis Utilities\n", + "Let's create utilities to measure and analyze the performance improvements from prompt caching." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "67ceafe5", + "metadata": {}, + "outputs": [], + "source": [ + "def analyze_performance(usage_stats: List[Dict[str, Union[int, float]]]) -> None:\n", + " \"\"\"\n", + " Analyze and display performance metrics from usage statistics.\n", + "\n", + " Args:\n", + " usage_stats (List[Dict[str, Union[int, float]]]): List of usage statistics from Bedrock responses\n", + " \"\"\"\n", + " # Initialize counters\n", + " total_stats = {\n", + " 'input_tokens': 0,\n", + " 'output_tokens': 0,\n", + " 'total_tokens': 0,\n", + " 'cache_read_tokens': 0,\n", + " 'cache_write_tokens': 0,\n", + " 'total_latency': 0\n", + " }\n", + "\n", + " # Aggregate statistics\n", + " for stats in usage_stats:\n", + " total_stats['input_tokens'] += stats.get('inputTokens', 0)\n", + " total_stats['output_tokens'] += stats.get('outputTokens', 0)\n", + " total_stats['total_tokens'] += stats.get('totalTokens', 0)\n", + " total_stats['cache_read_tokens'] += stats.get('cacheReadInputTokens', 0)\n", + " total_stats['cache_write_tokens'] += stats.get('cacheWriteInputTokens', 0)\n", + " total_stats['total_latency'] += stats.get('latency_ms', 0)\n", + "\n", + " # Calculate total requests (interactions)\n", + " total_interactions = len(usage_stats)\n", + "\n", + " # Calculate averages including latency\n", + " avg_latency = total_stats['total_latency'] / total_interactions if total_interactions > 0 else 0\n", + "\n", + " # Calculate cache effectiveness\n", + " total_token_requests = total_stats['total_tokens'] - total_stats['output_tokens']\n", + " cached_tokens = total_stats['cache_read_tokens']\n", + " # Calculate percentage of tokens that were served from cache\n", + " cache_hit_ratio = (cached_tokens / total_token_requests * 100) if total_token_requests > 0 else 0\n", + "\n", + " # Print formatted 
results\n", + " print(\"\\nPerformance Summary:\")\n", + " print(\"=\" * 50)\n", + " print(f\"Number of Interactions: {total_interactions}\")\n", + " print(f\"\\nToken Usage:\")\n", + " print(f\"Input Tokens: {total_stats['input_tokens']:,}\")\n", + " print(f\"Cache Read Tokens: {total_stats['cache_read_tokens']:,}\")\n", + " print(f\"Cache Write Tokens: {total_stats['cache_write_tokens']:,}\")\n", + " print(f\"Output Tokens: {total_stats['output_tokens']:,}\")\n", + " print(f\"Total Tokens Processed: {total_stats['total_tokens']:,}\")\n", + "\n", + " print(f\"\\nCache Performance:\")\n", + " print(f\"Cache Hit Ratio: {cache_hit_ratio:.2f}%\")\n", + " print(\"=\" * 50)\n", + "\n", + " print(f\"\\nLatency Performance:\")\n", + " print(f\"Total Latency: {total_stats['total_latency']:.2f} ms\")\n", + " print(f\"Average Latency per Request: {avg_latency:.2f} ms\")\n", + "\n", + " # Print per-interaction breakdown\n", + " print(\"\\nPer-Interaction Breakdown:\")\n", + " print(\"-\" * 50)\n", + " for i, stats in enumerate(usage_stats, 1):\n", + " print(f\"\\nInteraction {i}:\")\n", + " print(f\"Input Tokens: {stats.get('inputTokens', 0):,}\")\n", + " print(f\"Cache Read Tokens: {stats.get('cacheReadInputTokens', 0):,}\")\n", + " print(f\"Cache Write Tokens: {stats.get('cacheWriteInputTokens', 0):,}\")\n", + " print(f\"Output Tokens: {stats.get('outputTokens', 0):,}\")\n", + " print(f\"Total Tokens: {stats.get('totalTokens', 0):,}\")\n", + " print(f\"Latency: {stats.get('latency_ms', 0):.2f} ms\")\n", + "\n", + " # Calculate per-interaction cache hit ratio over non-output tokens, matching the aggregate ratio above\n", + " interaction_total = stats.get('totalTokens', 0) - stats.get('outputTokens', 0)\n", + " interaction_cache_reads = stats.get('cacheReadInputTokens', 0)\n", + " interaction_hit_ratio = (interaction_cache_reads / interaction_total * 100) if interaction_total > 0 else 0\n", + " print(f\"Interaction Cache Hit Ratio: {interaction_hit_ratio:.2f}%\")" + ] + }, + { + "cell_type": "markdown", + "id": "dbf4d6a9", + "metadata": {}, + "source": [ + "### 
BaseConversationManager\n", + "\n", + "The `BaseConversationManager` class serves as the foundation for our conversation handling implementations. By centralizing common functionality in this base class, we avoid repeating the same code across Part 1 and Part 2 of this notebook.\n", + "\n", + "In the following sections, we'll implement two different conversation managers that extend this base class:\n", + "- Part 1: `ConverseAPIManager` uses Amazon Bedrock's native Converse API tool configuration\n", + "- Part 2: `FrameworkAgnosticManager` uses tool definitions embedded in prompts for compatibility with any LLM framework" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "94b63318", + "metadata": {}, + "outputs": [], + "source": [ + "class BaseConversationManager:\n", + " \"\"\"\n", + " Base class for conversation management with common functionality.\n", + " \"\"\"\n", + " def __init__(\n", + " self,\n", + " tool_function_mappings: Dict[str, callable],\n", + " system_prompt: List[Dict[str, Any]],\n", + " tool_definitions: Optional[List[Dict[str, Any]]] = None,\n", + " max_loops: int = 6\n", + " ):\n", + " self.max_loops = max_loops\n", + " self.tool_definitions = tool_definitions\n", + " self.tool_function_mappings = tool_function_mappings\n", + " self.logger = logging.getLogger(__name__)\n", + " self.system_prompt = system_prompt\n", + "\n", + " def handle_tool_response(self, tool_use_block: Dict[str, Any]) -> str:\n", + " \"\"\"\n", + " Processes tool usage and returns appropriate responses.\n", + "\n", + " Args:\n", + " tool_use_block (Dict[str, Any]): Tool usage information containing name and input parameters\n", + "\n", + " Returns:\n", + " str: Tool execution result\n", + "\n", + " Raises:\n", + " ValueError: If the tool name is not found in the registered tools\n", + " \"\"\"\n", + " try:\n", + " tool_name = tool_use_block['name']\n", + " tool_args = tool_use_block['input']\n", + " self.logger.info(f\"Using tool: {tool_name} with 
inputs: {tool_args}\")\n", + "\n", + " if tool_name not in self.tool_function_mappings:\n", + " raise ValueError(f\"Tool '{tool_name}' not found\")\n", + " return self.tool_function_mappings[tool_name](**tool_args)\n", + "\n", + " except Exception as e:\n", + " self.logger.error(f\"Tool execution error: {str(e)}\")\n", + " raise\n", + "\n", + " def run_conversation(self, prompt: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:\n", + " \"\"\"\n", + " Manages the conversation flow with the assistant.\n", + "\n", + " Args:\n", + " prompt (str): User's question\n", + "\n", + " Returns:\n", + " Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:\n", + " - First element: List of conversation messages\n", + " - Second element: List of usage statistics\n", + " \"\"\"\n", + " message_list = [{\n", + " \"role\": \"user\",\n", + " \"content\": [{\"text\": prompt}]\n", + " }]\n", + " usage_stats = []\n", + " loop_count = 0\n", + "\n", + " while loop_count < self.max_loops:\n", + " try:\n", + " # Get response from Bedrock\n", + " self.logger.debug(f\"Sending message list to Bedrock: {message_list}\")\n", + " response = call_bedrock(\n", + " message_list,\n", + " self.tool_definitions,\n", + " self.system_prompt\n", + " )\n", + "\n", + " # Process response\n", + " response_message = response['output']['message']\n", + " usage_stats.append(response['usage'])\n", + " message_list.append(response_message)\n", + " self.logger.info(f\"response_message: {response_message}\")\n", + "\n", + " # Check for tool usage - this is where implementations differ\n", + " should_continue, tool_response = self._process_tool_usage(response_message)\n", + "\n", + " if not should_continue:\n", + " break\n", + "\n", + " if tool_response:\n", + " message_list.append(tool_response)\n", + "\n", + " self.logger.info(f\"message_list: {message_list}\")\n", + " loop_count += 1\n", + "\n", + " except Exception as e:\n", + " self.logger.error(f\"Conversation error: {str(e)}\")\n", + " raise\n", + "\n", 
+ " return message_list, usage_stats\n", + "\n", + " def _process_tool_usage(self, response_message):\n", + " \"\"\"\n", + " Abstract method to be implemented by subclasses.\n", + " Process tool usage from the response message.\n", + "\n", + " Returns:\n", + " tuple: (should_continue, tool_response_message)\n", + " \"\"\"\n", + " raise NotImplementedError(\"Subclasses must implement this method\")" + ] + }, + { + "cell_type": "markdown", + "id": "bafcfa8c", + "metadata": {}, + "source": [ + "### Base System Prompt Function\n", + "\n", + "The `create_base_system_prompt` function creates the foundation for our HR assistant's personality and capabilities. This function is part of our shared utilities because it's used by both implementation approaches (Converse API and Framework-Agnostic).\n", + "\n", + "#### Purpose:\n", + "- Establishes the HR assistant's core identity, responsibilities, and behavioral guidelines\n", + "- Creates a consistent personality across different implementation approaches\n", + "- Handles the addition of cache points for the system instructions when caching is enabled\n", + "\n", + "The system prompt is intentionally verbose and structured with multiple sections. This is not just for clarity, but also to meet the minimum token requirements for effective caching. Different LLMs have different minimum token thresholds for cache checkpoints (for example, Claude 3.7 Sonnet requires at least 1,024 tokens per checkpoint). By providing detailed instructions in a structured format, we ensure the prompt meets these requirements while also giving the model comprehensive guidance.\n", + "\n", + "By centralizing these instructions in a shared function, we ensure consistency across implementations while making it easier to update the assistant's core capabilities in one place. The function also handles the strategic placement of cache points to optimize token usage when prompt caching is enabled." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "473639b4", + "metadata": {}, + "outputs": [], + "source": [ + "def create_base_system_prompt(caching_activated: bool = False) -> List[Dict[str, Any]]:\n", + " \"\"\"\n", + " Create the base system prompt with HR assistant instructions.\n", + "\n", + " This function generates a structured system prompt that defines the HR assistant's\n", + " personality, responsibilities, and behavioral guidelines. The prompt is intentionally\n", + " verbose to meet minimum token requirements for effective caching.\n", + "\n", + " Args:\n", + " caching_activated (bool): Whether to add cache points to the system prompt\n", + "\n", + " Returns:\n", + " List[Dict[str, Any]]: A list of dictionaries containing text blocks and optional\n", + " cache points. Each dictionary has either a \"text\" key with\n", + " prompt content or a \"cachePoint\" key with cache configuration.\n", + " \"\"\"\n", + "\n", + " system_prompt = [\n", + " {\n", + " \"text\": \"\"\"\n", + " You are an expert HR Virtual Assistant working for a large enterprise organization. 
Your role is to provide accurate, professional, and empathetic support to employees regarding HR matters, with a focus on leave management and HR policies.\n", + " \n", + "\n", + " \n", + " - Assist employees with leave-related inquiries including:\n", + " - Vacation time\n", + " - Sick leave\n", + " - FMLA (Family and Medical Leave Act)\n", + " - Other types of leave\n", + " - Provide clear explanations of HR policies and procedures\n", + " - Help employees understand their benefits and entitlements\n", + " - Guide employees through HR-related processes\n", + " - Maintain strict confidentiality of all employee information\n", + " \n", + "\n", + " \n", + " - Always maintain a professional, friendly, and empathetic tone\n", + " - Verify employee identity before providing personal information\n", + " - Be clear and concise in your explanations\n", + " - When uncertain, acknowledge limitations and offer to escalate to human HR representatives\n", + " - Use inclusive and respectful language\n", + " - Provide relevant policy references when applicable\n", + " \n", + "\n", + " \n", + " - Prioritize data privacy and confidentiality\n", + " - Focus on accuracy and compliance with company policies\n", + " - Show empathy while maintaining professional boundaries\n", + " - Escalate sensitive situations to human HR representatives\n", + " - Document interactions appropriately\n", + " - Avoid making promises or guarantees about approvals\n", + " \n", + "\n", + " \n", + " - Do not provide legal advice\n", + " - Do not make decisions about policy exceptions\n", + " - Do not discuss other employees' information\n", + " - Do not handle grievances or complaints\n", + " - Do not provide medical advice\n", + " - Do not discuss compensation changes or negotiations\n", + " \n", + "\n", + " \n", + " - Always verify employee identity before accessing personal information\n", + " - Only provide information relevant to the requesting employee\n", + " - Follow data privacy guidelines and 
GDPR/CCPA compliance requirements\n", + " - Log all sensitive data access appropriately\n", + " \"\"\"\n", + " }\n", + " ]\n", + "\n", + " # Add cache point for the system prompt if caching is activated\n", + " if caching_activated:\n", + " system_prompt.append({\n", + " \"cachePoint\": {\n", + " \"type\": \"default\"\n", + " }\n", + " })\n", + "\n", + " return system_prompt" + ] + }, + { + "cell_type": "markdown", + "id": "489b96f7", + "metadata": {}, + "source": [ + "## Tool Registry\n", + "\n", + "The `ToolRegistry` class serves as a central repository for our HR tools. It handles:\n", + "- Registering tool definitions with their implementation functions\n", + "- Formatting tools appropriately for API calls\n", + "- Managing cache points for tool definitions\n", + "- Converting tool definitions to different formats as needed\n", + "\n", + "This abstraction allows us to maintain consistent tool functionality while adapting the presentation format for different implementation approaches." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "357d3096", + "metadata": {}, + "outputs": [], + "source": [ + "class ToolRegistry:\n", + " \"\"\"Simple tool registry for HR tools that matches Bedrock Agents schema\"\"\"\n", + " def __init__(self):\n", + " self.tools: List[Dict[str, Any]] = []\n", + " self.caching_enabled: bool = False\n", + " self._functions: Dict[str, callable] = {} # Private dictionary to store tool functions\n", + "\n", + " def add_tool(\n", + " self,\n", + " name: str,\n", + " description: str,\n", + " properties: Dict[str, Any],\n", + " function: callable,\n", + " required: Optional[List[str]] = None\n", + " ) -> None:\n", + " \"\"\"\n", + " Add a tool with specified schema and its implementation function.\n", + "\n", + " Args:\n", + " name (str): Name of the tool\n", + " description (str): Tool description\n", + " properties (dict): Schema properties for the tool\n", + " function (callable): Function to be called when tool is invoked\n", + " required (list, optional): List of required parameters\n", + " \"\"\"\n", + " tool = {\n", + " 'toolSpec': {\n", + " 'name': name,\n", + " 'description': description,\n", + " 'inputSchema': {\n", + " 'json': {\n", + " \"type\": \"object\",\n", + " \"properties\": properties,\n", + " }\n", + " }\n", + " },\n", + " }\n", + "\n", + " # Add required fields if specified\n", + " if required:\n", + " tool['toolSpec']['inputSchema']['json']['required'] = required\n", + "\n", + " self.tools.append(tool)\n", + " self._functions[name] = function\n", + "\n", + " def set_caching(self, enabled=False):\n", + " \"\"\"Enable or disable prompt caching for tools\"\"\"\n", + " self.caching_enabled = enabled\n", + "\n", + " def execute_tool(self, name, **kwargs):\n", + " \"\"\"\n", + " Execute a registered tool function.\n", + "\n", + " Args:\n", + " name (str): Name of the tool to execute\n", + " **kwargs: Arguments to pass to the tool function\n", + " \"\"\"\n", + " if name not in 
self._functions:\n", + " raise ValueError(f\"Tool '{name}' not found\")\n", + " return self._functions[name](**kwargs)\n", + "\n", + " def get_tools(self):\n", + " \"\"\"\n", + " Get list of all registered tools with optional caching configuration.\n", + "\n", + " When caching is enabled (self.caching_enabled = True), appends a cachePoint\n", + " configuration to the tools list. This allows Bedrock to cache static parts\n", + " of the prompt, reducing token usage and latency for subsequent calls.\n", + "\n", + " Returns:\n", + " list: List of tool specifications, optionally including cache configuration\n", + " \"\"\"\n", + " tools = self.tools.copy() # Create a copy to avoid modifying the original list\n", + " if self.caching_enabled:\n", + " tools.append({\n", + " \"cachePoint\": {\n", + " \"type\": \"default\"\n", + " }\n", + " })\n", + " return tools\n", + "\n", + " def get_tools_as_json_string(self):\n", + " \"\"\"Get a JSON string representation of all registered tools.\"\"\"\n", + " tools_json = []\n", + " for tool in self.tools:\n", + " if 'toolSpec' in tool:\n", + " tool_spec = tool['toolSpec']\n", + " properties = tool_spec['inputSchema']['json']['properties']\n", + " required = tool_spec['inputSchema']['json'].get('required', [])\n", + "\n", + " tool_json = {\n", + " \"name\": tool_spec['name'],\n", + " \"description\": tool_spec['description'],\n", + " \"parameters\": {\n", + " \"type\": \"object\",\n", + " \"properties\": properties,\n", + " \"required\": required\n", + " }\n", + " }\n", + " tools_json.append(tool_json)\n", + "\n", + " return \"\\n\" + json.dumps(tools_json, indent=2) + \"\\n\"\n", + "\n", + " def get_tool_function_mapping(self):\n", + " \"\"\"\n", + " Get mapping of tool names to their mapped function.\n", + "\n", + " Returns:\n", + " dict: Dictionary mapping tool names to their corresponding functions\n", + " \"\"\"\n", + " return self._functions.copy()\n" + ] + }, + { + "cell_type": "markdown", + "id": "e065e1ea", + "metadata": {}, 
+ "source": [ + "### HR Tools Implementation\n", + "Now that we've created our ToolRegistry class, let's populate it with specific HR management tools. These tools will demonstrate how to structure tool definitions that benefit from prompt caching while maintaining their functionality." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9d5a8b94", + "metadata": {}, + "outputs": [], + "source": [ + "def create_hr_tools():\n", + " \"\"\"\n", + " Create HR tools with proper schema and functions.\n", + "\n", + " Returns:\n", + " ToolRegistry: A registry containing HR-related tools with their implementations\n", + " \"\"\"\n", + " registry = ToolRegistry()\n", + "\n", + " # Add leave balances tool\n", + " def get_leave_balances(employee_id, leave_type=None, as_of_date=None):\n", + " # Example implementation - in production, this would query a database\n", + " return f\"You have 10 days of vacation, 5 sick days remaining\"\n", + "\n", + " registry.add_tool(\n", + " name='get_leave_balances',\n", + " description='Get all available leave balances for different leave types',\n", + " properties={\n", + " \"employee_id\": {\n", + " \"type\": \"integer\",\n", + " \"description\": \"the id of the employee\"\n", + " },\n", + " \"leave_type\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"specific type of leave to check\",\n", + " \"enum\": [\"vacation\", \"sick\", \"personal\", \"floating_holiday\", \"parental\", \"bereavement\", \"all\"]\n", + " },\n", + " \"as_of_date\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"date to check balances for (defaults to current date)\"\n", + " }\n", + " },\n", + " required=[\"employee_id\"],\n", + " function=get_leave_balances\n", + " )\n", + "\n", + " # Add vacation reservation tool\n", + " def reserve_vacation_time(employee_id, start_date, end_date):\n", + " return f\"Vacation reserved from {start_date} to {end_date}\"\n", + "\n", + " registry.add_tool(\n", + " 
name='reserve_vacation_time',\n", + " description='reserve vacation time for a specific employee - you need all parameters to reserve vacation time',\n", + " properties={\n", + " \"employee_id\": {\n", + " \"type\": \"integer\",\n", + " \"description\": \"the id of the employee for which time off will be reserved\"\n", + " },\n", + " \"start_date\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"the start date for the vacation time\"\n", + " },\n", + " \"end_date\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"the end date for the vacation time\"\n", + " }\n", + " },\n", + " required=[\"employee_id\", \"start_date\", \"end_date\"],\n", + " function=reserve_vacation_time\n", + " )\n", + "\n", + " # Add leave policy tool\n", + " def get_leave_policy_info(policy_type, employee_id=None, state=None):\n", + " return f\"Leave policy information for {policy_type}\"\n", + "\n", + " registry.add_tool(\n", + " name='get_leave_policy_info',\n", + " description='Retrieve information about leave policies and eligibility',\n", + " properties={\n", + " \"policy_type\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"type of leave policy to query\",\n", + " \"enum\": [\"vacation\", \"sick\", \"fmla\", \"std\", \"ltd\", \"parental\", \"bereavement\", \"all\"]\n", + " },\n", + " \"employee_id\": {\n", + " \"type\": \"integer\",\n", + " \"description\": \"employee id to check eligibility (optional)\"\n", + " },\n", + " \"state\": {\n", + " \"type\": \"string\",\n", + " \"description\": \"state code for state-specific policies\"\n", + " }\n", + " },\n", + " required=[\"policy_type\"],\n", + " function=get_leave_policy_info\n", + " )\n", + "\n", + " return registry" + ] + }, + { + "cell_type": "markdown", + "id": "0bad8b6e", + "metadata": {}, + "source": [ + "Now that we've established our shared utilities, we'll implement two different approaches to prompt caching:\n", + "\n", + "1. 
**Part 1: Native Converse API** - Using Bedrock's built-in tool configuration\n", + "2. **Part 2: Framework-Agnostic** - Embedding tool definitions in prompts\n", + "\n", + "Each approach demonstrates different integration patterns while achieving similar caching benefits. Let's start with the native Converse API approach." + ] + }, + { + "cell_type": "markdown", + "id": "0c6883d5", + "metadata": {}, + "source": [ + "# Part 1: Prompt Caching with Converse API\n", + "\n", + "With our Bedrock integration in place, we can now implement prompt caching using Amazon Bedrock's Converse API. This approach leverages the native caching capabilities of the API, making it ideal for applications that directly interact with Bedrock." + ] + }, + { + "cell_type": "markdown", + "id": "bdbe1698", + "metadata": {}, + "source": [ + "#### Verify the tool configurations\n", + "Let's test the tools and see what the final tool specification looks like." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6316a83f", + "metadata": {}, + "outputs": [], + "source": [ + "hr_tools = create_hr_tools()\n", + "print(\"Tools without caching:\")\n", + "print(json.dumps(hr_tools.get_tools(), indent=4))\n", + "\n", + "# Now enable caching and see the difference\n", + "hr_tools.set_caching(True)\n", + "print(\"\\nTools with caching enabled:\")\n", + "print(json.dumps(hr_tools.get_tools(), indent=4))" + ] + }, + { + "cell_type": "markdown", + "id": "af1aaf53", + "metadata": {}, + "source": [ + "## System Prompt for Converse API Approach\n", + "\n", + "The system prompt for the Converse API approach focuses on providing the HR assistant's personality and guidelines. 
It doesn't need to include tool definitions directly because these are provided separately through the Converse API's `toolConfig` parameter.\n", + "\n", + "Key characteristics:\n", + "- Defines the assistant's role and responsibilities\n", + "- Sets interaction guidelines and boundaries\n", + "- Includes cache points for efficient token usage\n", + "- Relies on Bedrock's native tool handling" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a44f9bf5", + "metadata": {}, + "outputs": [], + "source": [ + "def create_converse_api_system_prompt(caching_activated=False):\n", + " \"\"\"Create system prompt for Converse API approach (Part 1)\"\"\"\n", + "\n", + " # Get the base system prompt\n", + " system_prompt = create_base_system_prompt(caching_activated)\n", + "\n", + " # Add Converse API specific instructions\n", + " system_prompt.append({\n", + " \"text\": \"\"\"\n", + " You have access to several tools that can help you assist employees with their HR inquiries.\n", + " Use these tools when you need specific information about leave balances, policies, or to make reservations.\n", + " The system will automatically process your tool requests through the Converse API.\n", + " \"\"\"\n", + " })\n", + "\n", + " # Add cache point for the additional instructions if caching is activated\n", + " if caching_activated:\n", + " system_prompt.append({\n", + " \"cachePoint\": {\n", + " \"type\": \"default\"\n", + " }\n", + " })\n", + "\n", + " return system_prompt" + ] + }, + { + "cell_type": "markdown", + "id": "39ed9bd8", + "metadata": {}, + "source": [ + "## Conversation Handler\n", + "The `ConverseAPIManager` extends our base class to implement conversation handling using Amazon Bedrock's native Converse API tool configuration. This approach leverages the built-in capabilities of the API for managing tool interactions.\n", + "\n", + "### Key Features:\n", + "\n", + "1. 
**Native Tool Integration**: Uses Bedrock's built-in tool configuration format, allowing the model to directly invoke tools through the API.\n", + "\n", + "2. **Structured Tool Results**: Formats tool results using the `toolResult` structure expected by the Converse API.\n", + "\n", + "3. **Efficient Caching**: Places cache points strategically around tool definitions in the API request structure.\n", + "\n", + "This implementation represents the most direct way to leverage Amazon Bedrock's prompt caching capabilities, making it ideal for applications that interact directly with Bedrock without intermediate frameworks." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "02fdae32", + "metadata": {}, + "outputs": [], + "source": [ + "class ConverseAPIManager(BaseConversationManager):\n", + " \"\"\"\n", + " Conversation manager using native Bedrock Converse API tool configuration.\n", + " \"\"\"\n", + " def _create_tool_result_message(\n", + " self,\n", + " tool_use_id: str,\n", + " content: str,\n", + " status: Optional[str] = None\n", + " ) -> Dict[str, Any]:\n", + " \"\"\"\n", + " Creates a properly formatted tool result message for Converse API.\n", + "\n", + " Args:\n", + " tool_use_id (str): ID of the tool use from the model's response\n", + " content (str): Result content from the tool execution\n", + " status (str, optional): Status of the tool execution (e.g., \"error\")\n", + "\n", + " Returns:\n", + " Dict[str, Any]: A properly formatted message with toolResult structure\n", + " that can be sent back to the model\n", + " \"\"\"\n", + " message = {\n", + " \"role\": \"user\",\n", + " \"content\": [{\n", + " \"toolResult\": {\n", + " \"toolUseId\": tool_use_id,\n", + " \"content\": [{\"text\": content}]\n", + " }\n", + " }]\n", + " }\n", + "\n", + " if status:\n", + " message[\"content\"][0][\"toolResult\"][\"status\"] = status\n", + "\n", + " return message\n", + "\n", + " def _process_tool_usage(self, response_message):\n", + " 
\"\"\"\n", + " Process tool usage from Converse API response format.\n", + "\n", + " Returns:\n", + " tuple: (should_continue, tool_response_message)\n", + " \"\"\"\n", + " # Check for tool usage in last content item\n", + " last_content = response_message[\"content\"][-1]\n", + " if 'toolUse' not in last_content:\n", + " return False, None\n", + "\n", + " # Handle tool execution\n", + " tool_use_block = last_content['toolUse']\n", + " tool_use_id = tool_use_block['toolUseId']\n", + "\n", + " try:\n", + " tool_response = self.handle_tool_response(tool_use_block)\n", + " return True, self._create_tool_result_message(tool_use_id, tool_response)\n", + " except Exception as e:\n", + " return False, self._create_tool_result_message(\n", + " tool_use_id,\n", + " repr(e),\n", + " status=\"error\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "3314d7c1", + "metadata": {}, + "source": [ + "## Example Usage without Prompt Caching\n", + "Let's try out our HR agent with a vacation-related query without prompt caching to establish a baseline." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1455a938", + "metadata": {}, + "outputs": [], + "source": [ + "# Set up without caching\n", + "CACHING_ACTIVATED = False\n", + "\n", + "# Initialize the system prompt\n", + "system_prompt = create_converse_api_system_prompt(caching_activated=CACHING_ACTIVATED)\n", + "\n", + "# Initialize registry\n", + "tool_registry = create_hr_tools()\n", + "tool_registry.set_caching(CACHING_ACTIVATED)\n", + "\n", + "# Create conversation handler\n", + "handler = ConverseAPIManager( tool_definitions=tool_registry.get_tools(), tool_function_mappings=tool_registry.get_tool_function_mapping(), system_prompt=system_prompt)\n", + "\n", + "# Run a conversation\n", + "messages, usage_stats = handler.run_conversation(\"How many vacation days do I have? 
My employee ID is 123\")\n", + "\n", + "print(\"\\n===================== ANSWER =====================\")\n", + "print(messages[-1]['content'][0][\"text\"])\n", + "print(\"==================================================\")\n", + "\n", + "# Display performance metrics\n", + "analyze_performance(usage_stats)" + ] + }, + { + "cell_type": "markdown", + "id": "f2354adb", + "metadata": {}, + "source": [ + "### Understanding the Results\n", + "This example demonstrates the baseline performance without prompt caching enabled. Key observations:\n", + "\n", + "#### Performance Metrics\n", + "1. **Token Usage**\n", + " - Each interaction processes all input tokens\n", + " - Total token consumption is high due to repeated processing of tool definitions and system prompt\n", + " - No cache hits (0%) as caching is disabled\n", + "\n", + "2. **Latency**\n", + " - No latency improvements between first and subsequent calls\n", + "\n", + "3. **Cache Statistics**\n", + " - Cache read/write tokens: 0 (expected with caching disabled)\n", + " - Cache hit ratio: 0% (all prompts fully processed)\n", + "\n", + "These metrics serve as our baseline for comparing against the cached implementation in the next section." + ] + }, + { + "cell_type": "markdown", + "id": "bedcd9cc", + "metadata": {}, + "source": [ + "## Example Usage with Prompt Caching\n", + "Now let's run the same query with prompt caching enabled to see the performance improvements." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "549206b2", + "metadata": {}, + "outputs": [], + "source": [ + "# Set up with caching\n", + "CACHING_ACTIVATED = True\n", + "\n", + "# Initialize the system prompt with caching\n", + "system_prompt = create_converse_api_system_prompt(caching_activated=CACHING_ACTIVATED)\n", + "\n", + "# Initialize registry with caching\n", + "tool_registry = create_hr_tools()\n", + "tool_registry.set_caching(CACHING_ACTIVATED)\n", + "\n", + "# Create conversation handler\n", + "handler = ConverseAPIManager( tool_definitions=tool_registry.get_tools(), tool_function_mappings=tool_registry.get_tool_function_mapping(), system_prompt=system_prompt)\n", + "\n", + "# Run a conversation\n", + "messages, usage_stats = handler.run_conversation(\"How many vacation days do I have? My employee ID is 123\")\n", + "\n", + "print(\"\\n===================== ANSWER =====================\")\n", + "print(messages[-1]['content'][0][\"text\"])\n", + "print(\"==================================================\")\n", + "\n", + "# Display performance metrics\n", + "analyze_performance(usage_stats)" + ] + }, + { + "cell_type": "markdown", + "id": "28dabf1b", + "metadata": {}, + "source": [ + "### Understanding Cached Performance\n", + "This example demonstrates the performance improvements achieved with prompt caching enabled. Let's analyze the key differences:\n", + "\n", + "#### Performance Improvements\n", + "1. **Token Processing Efficiency**\n", + " - Input tokens reduced dramatically compared to the non-cached version\n", + " - Significant cache hit ratio shows effective reuse of cached prompts\n", + " - Each interaction benefits from cached tokens\n", + "\n", + "2. 
**Key Benefits**\n", + " - Reduced token consumption for input processing\n", + " - Consistent cache hit ratios\n", + " - Lower costs due to reduced token processing\n", + " - Similar response quality despite reduced token processing\n", + "\n", + "The results demonstrate how prompt caching significantly reduces token processing while maintaining the same quality of responses. This efficiency is particularly valuable for production deployments where cost and performance optimization are crucial." + ] + }, + { + "cell_type": "markdown", + "id": "33f319a2", + "metadata": {}, + "source": [ + "# Part 2: Framework-Agnostic Prompt Caching with Tool Definitions in Prompts\n", + "\n", + "While the Bedrock Converse API provides excellent built-in support for tool definitions and caching, many organizations use open-source frameworks like LangChain, LlamaIndex, or custom solutions. In this section, we'll explore how to implement prompt caching in a framework-agnostic way that works with any LLM framework.\n", + "\n", + "The key difference in this approach is that we'll:\n", + "1. Include tool definitions directly in the system prompt as structured text\n", + "2. Add cache points strategically around these definitions\n", + "3. Parse tool invocations from the LLM's text output\n", + "4. Execute tools and return results in the conversation flow\n", + "\n", + "This approach offers greater flexibility and compatibility with existing agent implementations while still leveraging the performance benefits of prompt caching." + ] + }, + { + "cell_type": "markdown", + "id": "3b443514", + "metadata": {}, + "source": [ + "## Tool Definitions in Prompts\n", + "\n", + "Instead of using the Bedrock Converse API's tool configuration, we'll include tool definitions directly in the system prompt. 
\n", + "\n", + "Below, we'll create a structured text representation of our HR tools that can be included in the system prompt:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "db258db8", + "metadata": {}, + "outputs": [], + "source": [ + "hr_tools_as_string = hr_tools.get_tools_as_json_string()\n", + "print(hr_tools_as_string)" + ] + }, + { + "cell_type": "markdown", + "id": "a97bc945", + "metadata": {}, + "source": [ + "## Framework-Agnostic Approach with Tool Definitions in Prompts\n", + "\n", + "In this approach, we include tool definitions directly in the system prompt as structured text rather than using the Bedrock Converse API's native tool configuration. This makes our implementation compatible with any LLM framework or direct API calls.\n", + "\n", + "### Key Advantages:\n", + "1. **Universal Compatibility**: Works with any LLM framework that supports system prompts\n", + "2. **Simplified Integration**: Easier to integrate with existing agent implementations\n", + "3. **Consistent Caching Benefits**: Achieves similar token and latency savings\n", + "\n", + "### System Prompt Characteristics:\n", + "- Includes all the same HR assistant guidelines as the Converse API approach\n", + "- Embeds tool definitions directly in the prompt as structured JSON\n", + "- Provides explicit instructions for tool invocation format using `` tags\n", + "- Places cache points strategically around large static sections for optimal caching\n", + "\n", + "The LLM will parse these tool definitions and use them to guide its responses, while the caching mechanism ensures we don't repeatedly process the same static text. This approach is particularly valuable for organizations using multiple LLM frameworks or integrating with existing systems." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9e95b75b", + "metadata": {}, + "outputs": [], + "source": [ + "def create_framework_agnostic_system_prompt(tools_json_string, caching_activated=False):\n", + " \"\"\"Create system prompt for Framework-Agnostic approach\"\"\"\n", + "\n", + " # Get the base system prompt\n", + " system_prompt = create_base_system_prompt(caching_activated)\n", + "\n", + " # Add tool definitions and framework-agnostic instructions\n", + " system_prompt.append({\n", + " \"text\": f\"\"\"\n", + " {tools_json_string}\n", + " \n", + " When you determine that you need to use a tool, you MUST format your response using the following JSON structure:\n", + "\n", + " \n", + " {{\n", + " \"name\": \"tool_name\",\n", + " \"input\": {{\n", + " \"param1\": \"value1\",\n", + " \"param2\": \"value2\"\n", + " }}\n", + " }}\n", + " \n", + "\n", + " After sending this format, the application will:\n", + " 1. Parse your tool call\n", + " 2. Execute the tool with the provided parameters\n", + " 3. Return control to you with the tool's output\n", + " \n", + " \"\"\"\n", + " })\n", + "\n", + " # Add cache point for tools if caching is activated\n", + " if caching_activated:\n", + " system_prompt.append({\n", + " \"cachePoint\": {\n", + " \"type\": \"default\"\n", + " }\n", + " })\n", + "\n", + " return system_prompt" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3f6c7926", + "metadata": {}, + "outputs": [], + "source": [ + "CACHING_ACTIVATED = True\n", + "create_framework_agnostic_system_prompt(hr_tools_as_string, CACHING_ACTIVATED)" + ] + }, + { + "cell_type": "markdown", + "id": "bf26963c", + "metadata": {}, + "source": [ + "## Conversation Handler\n", + "\n", + "The `FrameworkAgnosticManager` extends our base class to implement a framework-agnostic approach to conversation handling. 
Instead of relying on Bedrock's native tool configuration, this implementation embeds tool definitions directly in the system prompt as structured text.\n", + "\n", + "### Key Features:\n", + "\n", + "1. **Universal Compatibility**: Works with any LLM framework that supports system prompts, not just the Bedrock Converse API.\n", + "\n", + "2. **Text-Based Tool Invocation**: Parses tool calls from the model's text output using regex pattern matching.\n", + "\n", + "3. **Flexible Integration**: Can be adapted to work with open-source frameworks like LangChain, LlamaIndex, or custom solutions.\n", + "\n", + "4. **Prompt-Based Caching**: Demonstrates how to implement prompt caching even when using text-based tool definitions.\n", + "\n", + "This approach offers greater flexibility and compatibility with existing agent implementations while still leveraging the performance benefits of prompt caching. It's particularly valuable for organizations that use multiple LLM frameworks or need to integrate with existing systems." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21fe1522", + "metadata": {}, + "outputs": [], + "source": [ + "class FrameworkAgnosticManager(BaseConversationManager):\n", + " \"\"\"\n", + " Conversation manager using tool definitions in prompts for framework compatibility.\n", + " \"\"\"\n", + " def _create_tool_result_message(self, content):\n", + " \"\"\"\n", + " Creates a properly formatted tool result message for text-based responses.\n", + " \"\"\"\n", + " return {\n", + " \"role\": \"user\",\n", + " \"content\": [{\n", + " \"text\": content\n", + " }]\n", + " }\n", + "\n", + " def extract_tool_call(self, response_text: Dict[str, str]) -> Optional[Dict[str, Any]]:\n", + " \"\"\"\n", + " Extract tool call JSON from the model's response text.\n", + "\n", + " This method uses regex pattern matching to find and parse tool invocations\n", + " that are formatted as JSON within tags in the model's response.\n", + "\n", + " Args:\n", + " response_text (Dict[str, str]): Content item from the model's response\n", + " containing the 'text' key\n", + "\n", + " Returns:\n", + " Optional[Dict[str, Any]]: Parsed tool call with 'name' and 'input' keys if found,\n", + " None if no tool call is present or parsing fails\n", + " \"\"\"\n", + " tool_call_pattern = r\"\\s*(\\{.*?\\})\\s*\"\n", + " text_content = response_text['text']\n", + "\n", + " match = re.search(tool_call_pattern, text_content, re.DOTALL)\n", + " if match:\n", + " try:\n", + " tool_call_json = match.group(1)\n", + " return json.loads(tool_call_json)\n", + " except json.JSONDecodeError:\n", + " print(\"Failed to parse tool call JSON\")\n", + " return None\n", + " return None\n", + "\n", + " def _process_tool_usage(self, response_message):\n", + " \"\"\"\n", + " Process tool usage from text-based response format.\n", + "\n", + " Returns:\n", + " tuple: (should_continue, tool_response_message)\n", + " \"\"\"\n", + " # Check for tool usage in last content item\n", + " last_content = 
response_message[\"content\"][-1]\n", + "\n", + " # Check if the response contains a tool call\n", + " tool_call = self.extract_tool_call(last_content)\n", + " if not tool_call:\n", + " return False, None\n", + "\n", + " try:\n", + " tool_response = self.handle_tool_response(tool_call)\n", + " return True, self._create_tool_result_message(tool_response)\n", + " except Exception as e:\n", + " return False, self._create_tool_result_message(repr(e))" + ] + }, + { + "cell_type": "markdown", + "id": "c0de698d", + "metadata": {}, + "source": [ + "To quantify the benefits of prompt caching, we'll analyze key performance metrics from both cached and non-cached implementations. This analysis will help us understand:\n", + "\n", + "1. How much token consumption is reduced\n", + "2. The impact on response latency\n", + "3. The effectiveness of our caching strategy through cache hit ratios\n", + "\n", + "Let's start by establishing a baseline with our non-cached implementation.\n", + "\n", + "## Example Usage without Prompt Caching\n", + "Let's try out our HR agent with a vacation-related query without prompt caching to establish a baseline." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a4578c58", + "metadata": {}, + "outputs": [], + "source": [ + "# Set up without caching\n", + "CACHING_ACTIVATED = False\n", + "\n", + "# Initialize the system prompt\n", + "hr_tools_as_string = hr_tools.get_tools_as_json_string()\n", + "system_prompt = create_framework_agnostic_system_prompt(\n", + " tools_json_string=hr_tools_as_string,\n", + " caching_activated=CACHING_ACTIVATED\n", + ")\n", + "\n", + "# Create conversation handler\n", + "hr_tool_function_mapping = hr_tools.get_tool_function_mapping()\n", + "handler = FrameworkAgnosticManager(tool_function_mappings=hr_tool_function_mapping, system_prompt=system_prompt)\n", + "\n", + "# Run a conversation\n", + "messages, usage_stats = handler.run_conversation(\"How many vacation days do I have? 
My employee ID is 123\")\n", + "\n", + "print(\"\\n===================== ANSWER =====================\")\n", + "print(messages[-1]['content'][0][\"text\"])\n", + "print(\"==================================================\")\n", + "\n", + "# Display performance metrics\n", + "analyze_performance(usage_stats)" + ] + }, + { + "cell_type": "markdown", + "id": "9ff27530", + "metadata": {}, + "source": [ + "## Example Usage with Prompt Caching\n", + "Now let's run the same query with prompt caching enabled to see the performance improvements." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "803f36cb", + "metadata": {}, + "outputs": [], + "source": [ + "# Set up with caching\n", + "CACHING_ACTIVATED = True\n", + "\n", + "# Initialize the system prompt\n", + "hr_tools_as_string = hr_tools.get_tools_as_json_string()\n", + "system_prompt = create_framework_agnostic_system_prompt(\n", + " tools_json_string=hr_tools_as_string,\n", + " caching_activated=CACHING_ACTIVATED\n", + ")\n", + "\n", + "# Create conversation handler\n", + "hr_tool_function_mapping = hr_tools.get_tool_function_mapping()\n", + "handler = FrameworkAgnosticManager(tool_function_mappings=hr_tool_function_mapping, system_prompt=system_prompt)\n", + "\n", + "# Run a conversation\n", + "messages, usage_stats = handler.run_conversation(\"How many vacation days do I have? My employee ID is 123\")\n", + "\n", + "print(\"\\n===================== ANSWER =====================\")\n", + "print(messages[-1]['content'][0][\"text\"])\n", + "print(\"==================================================\")\n", + "\n", + "# Display performance metrics\n", + "analyze_performance(usage_stats)" + ] + }, + { + "cell_type": "markdown", + "id": "conclusion", + "metadata": {}, + "source": [ + "# Conclusion\n", + "\n", + "In this notebook, we've explored two approaches to implementing prompt caching with Amazon Bedrock:\n", + "\n", + "1. 
**Direct Bedrock Converse API Integration**: Using cache points directly in the API calls for system prompts and tool definitions\n", + "2. **Framework-Agnostic Integration**: Embedding tool definitions in the system prompt, with cache points after them, so the same approach works with open-source frameworks like LangChain or LlamaIndex\n", + "\n", + "## Key Takeaways\n", + "\n", + "- **Performance Improvements**: Prompt caching can significantly reduce token usage and latency\n", + "- **Cost Savings**: Fewer tokens processed means lower costs at scale\n", + "- **Flexibility**: Caching can be integrated with both direct API calls and open source frameworks\n", + "- **Monitoring**: Tracking cache hit ratios and latency helps optimize performance\n", + "\n", + "## Best Practices\n", + "\n", + "1. **Identify Static Components**: Look for parts of your prompts that don't change between requests\n", + "2. **Strategic Cache Points**: Place cache points after large static sections like system prompts and tool definitions\n", + "3. **Version Management**: Include version markers before cache points to invalidate caches when tools or prompts change\n", + "4. **Performance Monitoring**: Track cache hit ratios and latency to ensure caching is effective\n", + "\n", + "By implementing these techniques, you can build more efficient, cost-effective AI agent workflows that scale better in production environments." 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.2" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 72517cf526c5070f37246a15c1c7681590443407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Esbj=C3=B6rner?= Date: Thu, 19 Jun 2025 17:50:58 +0200 Subject: [PATCH 2/2] removed unused imports, fixed log searlisation security concern --- .../create_agent_with_prompt_caching.ipynb | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/agents-and-function-calling/function-calling/function_calling_with_converse/create_agent_with_prompt_caching.ipynb b/agents-and-function-calling/function-calling/function_calling_with_converse/create_agent_with_prompt_caching.ipynb index 35f733c15..714cde0be 100644 --- a/agents-and-function-calling/function-calling/function_calling_with_converse/create_agent_with_prompt_caching.ipynb +++ b/agents-and-function-calling/function-calling/function_calling_with_converse/create_agent_with_prompt_caching.ipynb @@ -55,10 +55,7 @@ "metadata": {}, "outputs": [], "source": [ - "!python3 -m pip install --upgrade -q botocore\n", - "!python3 -m pip install --upgrade -q boto3\n", - "!python3 -m pip install --upgrade -q awscli\n", - "!python3 -m pip install --upgrade -q langchain" + "!python3 -m pip install --upgrade --quiet boto3" ] }, { @@ -341,7 +338,7 @@ " while loop_count < self.max_loops:\n", " try:\n", " # Get response from Bedrock\n", - " self.logger.debug(f\"Sending message list to Bedrock: {message_list}\")\n", + " self.logger.debug(f\"Sending message list to Bedrock: {json.dumps(message_list)}\")\n", " response = call_bedrock(\n", " message_list,\n", " self.tool_definitions,\n", @@ -363,7 +360,7 @@ " if 
tool_response:\n", " message_list.append(tool_response)\n", "\n", - " self.logger.info(f\"message_list: {message_list}\")\n", + " self.logger.info(f\"message_list: {json.dumps(message_list)}\")\n", " loop_count += 1\n", "\n", " except Exception as e:\n",