From 182a3ac61f8ab1bfe7448c0767145aa612ed966f Mon Sep 17 00:00:00 2001 From: Anil Kumar Date: Sun, 9 Feb 2025 19:50:09 -0800 Subject: [PATCH] feat: Integrate OpenTelemetry tracing with OpenAI, Gemini, and Azure OpenAI support --- examples/document_processing.py | 81 +++++ examples/openai_client.py | 602 +++++++++++++++----------------- examples/strawberry.py | 82 +++++ javelin_sdk/client.py | 226 +++++++++++- javelin_sdk/tracing_setup.py | 61 ++++ pyproject.toml | 9 + 6 files changed, 716 insertions(+), 345 deletions(-) create mode 100644 examples/document_processing.py create mode 100644 examples/strawberry.py create mode 100644 javelin_sdk/tracing_setup.py diff --git a/examples/document_processing.py b/examples/document_processing.py new file mode 100644 index 0000000..232b2da --- /dev/null +++ b/examples/document_processing.py @@ -0,0 +1,81 @@ +import json +import os +import base64 +import requests +import asyncio +from openai import OpenAI, AsyncOpenAI, AzureOpenAI +from javelin_sdk import JavelinClient, JavelinConfig +from pydantic import BaseModel + +# Environment Variables +openai_api_key = os.getenv("OPENAI_API_KEY") +javelin_api_key = os.getenv('JAVELIN_API_KEY') +gemini_api_key = os.getenv("GEMINI_API_KEY") + +# Initialize Javelin Client +config = JavelinConfig( + base_url="https://api-dev.javelin.live", + # base_url="http://localhost:8000", + javelin_api_key=javelin_api_key, +) +client = JavelinClient(config) + +# Initialize Javelin Client +def initialize_javelin_client(): + javelin_api_key = os.getenv('JAVELIN_API_KEY') + config = JavelinConfig( + base_url="https://api-dev.javelin.live", + javelin_api_key=javelin_api_key, + ) + return JavelinClient(config) + +# Create Gemini client +def create_gemini_client(): + gemini_api_key = os.getenv("GEMINI_API_KEY") + return OpenAI( + api_key=gemini_api_key, + base_url="https://generativelanguage.googleapis.com/v1beta/openai/" + ) + +# Register Gemini client with Javelin +def register_gemini(client, 
openai_client): + client.register_gemini(openai_client, route_name="openai") + +# Gemini Chat Completions +def gemini_chat_completions(openai_client): + # Read the PDF file in binary mode (Download from https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/data/10k/lyft_2021.pdf) + with open('lyft_2021.pdf', 'rb') as pdf_file: + file_data = base64.b64encode(pdf_file.read()).decode('utf-8') + + response = openai_client.chat.completions.create( + model="gemini-2.0-flash-001", + messages=[ + { + "role": "user", + "content": [ + { + "type": "text", + "text": "What's the net income for 2021?" + }, + { + "type": "file", + "data": file_data, # Base64-encoded data + "mimeType": "application/pdf" + } + ] + } + ] + ) + print(response.model_dump_json(indent=2)) + +def main_sync(): + client = initialize_javelin_client() + openai_client = create_gemini_client() + register_gemini(client, openai_client) + gemini_chat_completions(openai_client) + +def main(): + main_sync() # Run synchronous calls + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/openai_client.py b/examples/openai_client.py index 53ee9b4..179639e 100644 --- a/examples/openai_client.py +++ b/examples/openai_client.py @@ -1,110 +1,91 @@ import json import os -import sys +import base64 +import requests import asyncio -from openai import OpenAI -from openai import AsyncOpenAI -from openai import AzureOpenAI -import dotenv -from javelin_sdk import ( - JavelinClient, - JavelinConfig, -) +from openai import OpenAI, AsyncOpenAI, AzureOpenAI +from javelin_sdk import JavelinClient, JavelinConfig +from pydantic import BaseModel -# Create OpenAI client +# Environment Variables openai_api_key = os.getenv("OPENAI_API_KEY") -openai_client = OpenAI(api_key=openai_api_key) +javelin_api_key = os.getenv('JAVELIN_API_KEY') +gemini_api_key = os.getenv("GEMINI_API_KEY") # Initialize Javelin Client -javelin_api_key = os.getenv('JAVELIN_API_KEY') config = JavelinConfig( 
base_url="https://api-dev.javelin.live", # base_url="http://localhost:8000", javelin_api_key=javelin_api_key, ) client = JavelinClient(config) -client.register_openai(openai_client, route_name="openai") - -# Call OpenAI endpoints -print("OpenAI: 1 - Chat completions") - -chat_completions = openai_client.chat.completions.create( - model="gpt-3.5-turbo", - messages=[{"role": "user", "content": "What is machine learning?"}], -) -print(chat_completions.model_dump_json(indent=2)) - -print("OpenAI: 2 - Completions") - -completions = openai_client.completions.create( - model="gpt-3.5-turbo-instruct", - prompt="What is machine learning?", - max_tokens=7, - temperature=0 -) -print(completions.model_dump_json(indent=2)) - -print("OpenAI: 3 - Embeddings") - -embeddings = openai_client.embeddings.create( - model="text-embedding-ada-002", - input="The food was delicious and the waiter...", - encoding_format="float" -) -print(embeddings.model_dump_json(indent=2)) - -print("OpenAI: 4 - Streaming") -stream = openai_client.chat.completions.create( - messages=[ - { - "role": "user", - "content": "Say this is a test", - } - ], - model="gpt-4o", - stream=True, -) -for chunk in stream: - print(chunk.choices[0].delta.content or "", end="") - -# Prints two blank lines -print("\n\n") - -print("AsyncOpenAI: 5 - Chat completions") - -# Create AsyncOpenAI client -openai_async_client = AsyncOpenAI( - api_key=os.environ.get("OPENAI_API_KEY"), # This is the default and can be omitted -) - -javelin_api_key = os.getenv('JAVELIN_API_KEY') -config = JavelinConfig( - base_url="https://api-dev.javelin.live", - # base_url="http://localhost:8000", - javelin_api_key=javelin_api_key, -) -client = JavelinClient(config) -client.register_openai(openai_async_client, route_name="openai") +# Initialize Javelin Client +def initialize_javelin_client(): + javelin_api_key = os.getenv('JAVELIN_API_KEY') + config = JavelinConfig( + base_url="https://api-dev.javelin.live", + javelin_api_key=javelin_api_key, + ) + 
return JavelinClient(config) + +def register_openai_client(): + openai_client = OpenAI(api_key=openai_api_key) + client.register_openai(openai_client, route_name="openai") + return openai_client + +def openai_chat_completions(): + openai_client = register_openai_client() + response = openai_client.chat.completions.create( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "What is machine learning?"}], + ) + print(response.model_dump_json(indent=2)) + +def openai_completions(): + openai_client = register_openai_client() + response = openai_client.completions.create( + model="gpt-3.5-turbo-instruct", + prompt="What is machine learning?", + max_tokens=7, + temperature=0 + ) + print(response.model_dump_json(indent=2)) + +def openai_embeddings(): + openai_client = register_openai_client() + response = openai_client.embeddings.create( + model="text-embedding-ada-002", + input="The food was delicious and the waiter...", + encoding_format="float" + ) + print(response.model_dump_json(indent=2)) -async def main() -> None: - chat_completion = await openai_async_client.chat.completions.create( - messages=[ - { - "role": "user", - "content": "Say this is a test", - } - ], +def openai_streaming_chat(): + openai_client = register_openai_client() + stream = openai_client.chat.completions.create( model="gpt-4o", + messages=[{"role": "user", "content": "Say this is a test"}], + stream=True, ) - print(chat_completion.model_dump_json(indent=2)) + for chunk in stream: + print(chunk.choices[0].delta.content or "", end="") -asyncio.run(main()) +def register_async_openai_client(): + openai_async_client = AsyncOpenAI(api_key=openai_api_key) + client.register_openai(openai_async_client, route_name="openai") + return openai_async_client -''' -print("AsyncOpenAI: 6 - Streaming") +async def async_openai_chat_completions(): + openai_async_client = register_async_openai_client() + response = await openai_async_client.chat.completions.create( + model="gpt-4o", + messages=[{"role": 
"user", "content": "Say this is a test"}], + ) + print(response.model_dump_json(indent=2)) -async def main(): +async def async_openai_streaming_chat(): + openai_async_client = register_async_openai_client() stream = await openai_async_client.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": "Say this is a test"}], @@ -113,265 +94,234 @@ async def main(): async for chunk in stream: print(chunk.choices[0].delta.content or "", end="") -asyncio.run(main()) -''' - -# Prints two blank lines -print("\n\n") - -# Gemini APIs -gemini_api_key = os.getenv("GEMINI_API_KEY") - -# Create OpenAI client -openai_client = OpenAI( - api_key=gemini_api_key, - base_url="https://generativelanguage.googleapis.com/v1beta/openai/" -) +# Create Gemini client +def create_gemini_client(): + gemini_api_key = os.getenv("GEMINI_API_KEY") + return OpenAI( + api_key=gemini_api_key, + base_url="https://generativelanguage.googleapis.com/v1beta/openai/" + ) -# Initialize Javelin Client -config = JavelinConfig( - base_url="https://api-dev.javelin.live", - # base_url="http://localhost:8000", - javelin_api_key=javelin_api_key, -) -client = JavelinClient(config) -client.register_gemini(openai_client, route_name="openai") +# Register Gemini client with Javelin +def register_gemini(client, openai_client): + client.register_gemini(openai_client, route_name="openai") + +# Function to download and encode the image +def encode_image_from_url(image_url): + response = requests.get(image_url) + if response.status_code == 200: + return base64.b64encode(response.content).decode('utf-8') + else: + raise Exception(f"Failed to download image: {response.status_code}") + +# Gemini Chat Completions +def gemini_chat_completions(openai_client): + response = openai_client.chat.completions.create( + model="gemini-1.5-flash", + n=1, + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Explain to me how AI works"} + ] + ) + 
print(response.model_dump_json(indent=2)) -print("Gemini: 1 - Chat completions") +# Gemini Streaming Chat Completions +def gemini_streaming_chat(openai_client): + response = openai_client.chat.completions.create( + model="gemini-1.5-flash", + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Hello!"} + ], + stream=True + ) + for chunk in response: + print(chunk.choices[0].delta) -response = openai_client.chat.completions.create( - model="gemini-1.5-flash", - n=1, - messages=[ - {"role": "system", "content": "You are a helpful assistant."}, +# Gemini Function Calling +def gemini_function_calling(openai_client): + tools = [ { - "role": "user", - "content": "Explain to me how AI works" + "type": "function", + "function": { + "name": "get_weather", + "description": "Get the weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "The city and state, e.g. Chicago, IL"}, + "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, + }, + "required": ["location"], + }, + } } ] -) - -# print(response.choices[0].message) -print(response.model_dump_json(indent=2)) - -print("Gemini: 2 - Streaming") - -response = openai_client.chat.completions.create( - model="gemini-1.5-flash", - messages=[ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "Hello!"} - ], - stream=True -) - -for chunk in response: - print(chunk.choices[0].delta) - -print("Gemini: 3 - Function calling") - -tools = [ - { - "type": "function", - "function": { - "name": "get_weather", - "description": "Get the weather in a given location", - "parameters": { - "type": "object", - "properties": { - "location": { - "type": "string", - "description": "The city and state, e.g. 
Chicago, IL", - }, - "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, - }, - "required": ["location"], - }, - } - } -] - -messages = [{"role": "user", "content": "What's the weather like in Chicago today?"}] -response = openai_client.chat.completions.create( - model="gemini-1.5-flash", - messages=messages, - tools=tools, - tool_choice="auto" -) - -print(response.model_dump_json(indent=2)) - -''' -print ("Gemini: 4 - Image understanding") - -import base64 - -# Function to encode the image -def encode_image(image_path): - with open(image_path, "rb") as image_file: - return base64.b64encode(image_file.read()).decode('utf-8') - -# Getting the base64 string -base64_image = encode_image("Path/to/agi/image.jpeg") - -response = client.chat.completions.create( - model="gemini-1.5-flash", - messages=[ - { - "role": "user", - "content": [ - { - "type": "text", - "text": "What is in this image?", - }, - { - "type": "image_url", - "image_url": { - "url": f"data:image/jpeg;base64,{base64_image}" - }, - }, - ], - } - ], -) - -print(response.model_dump_json(indent=2)) -''' - -print("Gemini: 5 - Structured output") - -from pydantic import BaseModel - -class CalendarEvent(BaseModel): - name: str - date: str - participants: list[str] - -completion = openai_client.beta.chat.completions.parse( - model="gemini-1.5-flash", - messages=[ - {"role": "system", "content": "Extract the event information."}, - {"role": "user", "content": "John and Susan are going to an AI conference on Friday."}, - ], - response_format=CalendarEvent, -) - -print(completion.model_dump_json(indent=2)) - -print("Gemini: 6 - Embeddings") - -response = openai_client.embeddings.create( - input="Your text string goes here", - model="text-embedding-004" -) - -print(response.model_dump_json(indent=2)) - -# Prints two blank lines -print("\n\n") - -''' -print("Azure OpenAI: 1 - Chat completions") - -# Create AzureOpenAI client -# gets the API Key from environment variable AZURE_OPENAI_API_KEY -openai_client 
= AzureOpenAI( - # https://learn.microsoft.com/azure/ai-services/openai/reference#rest-api-versioning - api_version="2023-07-01-preview", - # https://learn.microsoft.com/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource - azure_endpoint="https://javelinpreview.openai.azure.com", -) -# Initialize Javelin Client -config = JavelinConfig( - # base_url="https://api-dev.javelin.live", - base_url="http://localhost:8000", - javelin_api_key=javelin_api_key, -) -client = JavelinClient(config) -client.register_azureopenai(openai_client, route_name="openai") + messages = [{"role": "user", "content": "What's the weather like in Chicago today?"}] + response = openai_client.chat.completions.create( + model="gemini-1.5-flash", + messages=messages, + tools=tools, + tool_choice="auto" + ) + print(response.model_dump_json(indent=2)) -completion = openai_client.chat.completions.create( - model="gpt-4o-mini", # e.g. gpt-35-instant - messages=[ - { - "role": "user", - "content": "How do I output all files in a directory using Python?", - }, - ], -) -print(completion.to_json()) -''' +# Gemini Image Understanding +def gemini_image_understanding(openai_client): + image_url = "https://storage.googleapis.com/cloud-samples-data/generative-ai/image/scones.jpg" + base64_image = encode_image_from_url(image_url) -''' -# print("DeepSeek: 1 - Chat completions") + response = openai_client.chat.completions.create( + model="gemini-1.5-flash", + messages=[ + {"role": "user", "content": [ + {"type": "text", "text": "What is in this image?"}, + {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}, + ]} + ] + ) + print(response.model_dump_json(indent=2)) -deepseek_api_key = os.getenv("DEEPSEEK_API_KEY") +# Gemini Structured Output +def gemini_structured_output(openai_client): + class CalendarEvent(BaseModel): + name: str + date: str + participants: list[str] -# Create OpenAI client -openai_client = OpenAI(api_key=deepseek_api_key, 
base_url="https://api.deepseek.com") + completion = openai_client.beta.chat.completions.parse( + model="gemini-1.5-flash", + messages=[ + {"role": "system", "content": "Extract the event information."}, + {"role": "user", "content": "John and Susan are going to an AI conference on Friday."} + ], + response_format=CalendarEvent, + ) + print(completion.model_dump_json(indent=2)) -# Initialize Javelin Client -config = JavelinConfig( - # base_url="https://api-dev.javelin.live", - base_url="http://localhost:8000", - javelin_api_key=javelin_api_key, -) +# Gemini Embeddings +def gemini_embeddings(openai_client): + response = openai_client.embeddings.create( + input="Your text string goes here", + model="text-embedding-004" + ) + print(response.model_dump_json(indent=2)) + +# Create Azure OpenAI client +def create_azureopenai_client(): + azure_api_key = os.getenv("AZURE_OPENAI_API_KEY") + return AzureOpenAI( + api_version="2023-07-01-preview", + azure_endpoint="https://javelinpreview.openai.azure.com" + ) -# client = JavelinClient(config) -# client.register_deepseek(openai_client, route_name="openai") +# Register Azure OpenAI client with Javelin +def register_azureopenai(client, openai_client): + client.register_azureopenai(openai_client, route_name="openai") -response = openai_client.chat.completions.create( - model="deepseek-chat", - messages=[ - {"role": "system", "content": "You are a helpful assistant"}, - {"role": "user", "content": "Hello"}, - ], - stream=False -) +# Azure OpenAI Scenario +def azure_openai_chat_completions(openai_client): + response = openai_client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": "How do I output all files in a directory using Python?"}] + ) + print(response.model_dump_json(indent=2)) + +# Create DeepSeek client +def create_deepseek_client(): + deepseek_api_key = os.getenv("DEEPSEEK_API_KEY") + return OpenAI( + api_key=deepseek_api_key, + base_url="https://api.deepseek.com" + ) 
-print(response.to_json()) +# Register DeepSeek client with Javelin +def register_deepseek(client, openai_client): + client.register_deepseek(openai_client, route_name="openai") -print("DeepSeek: 2 - Reasoning Model") +# DeepSeek Chat Completions +def deepseek_chat_completions(openai_client): + response = openai_client.chat.completions.create( + model="deepseek-chat", + messages=[ + {"role": "system", "content": "You are a helpful assistant"}, + {"role": "user", "content": "Hello"} + ], + stream=False + ) + print(response.model_dump_json(indent=2)) -deepseek_api_key = os.getenv("DEEPSEEK_API_KEY") -openai_client = OpenAI(api_key=deepseek_api_key, base_url="https://api.deepseek.com") +# DeepSeek Reasoning Model +def deepseek_reasoning_model(): + deepseek_api_key = os.getenv("DEEPSEEK_API_KEY") + openai_client = OpenAI(api_key=deepseek_api_key, base_url="https://api.deepseek.com") -# Round 1 -messages = [{"role": "user", "content": "9.11 and 9.8, which is greater?"}] -response = openai_client.chat.completions.create( - model="deepseek-reasoner", - messages=messages -) -print(response.to_json()) + # Round 1 + messages = [{"role": "user", "content": "9.11 and 9.8, which is greater?"}] + response = openai_client.chat.completions.create(model="deepseek-reasoner", messages=messages) + print(response.to_json()) -reasoning_content = response.choices[0].message.reasoning_content -content = response.choices[0].message.content + content = response.choices[0].message.content -# Round 2 -messages.append({'role': 'assistant', 'content': content}) -messages.append({'role': 'user', 'content': "How many Rs are there in the word 'strawberry'?"}) -response = openai_client.chat.completions.create( - model="deepseek-reasoner", - messages=messages -) + # Round 2 + messages.append({"role": "assistant", "content": content}) + messages.append({"role": "user", "content": "How many Rs are there in the word 'strawberry'?"}) + response = 
openai_client.chat.completions.create(model="deepseek-reasoner", messages=messages) -print(response.to_json()) + print(response.to_json()) -''' +# Mistral Chat Completions +def mistral_chat_completions(): + mistral_api_key = os.getenv("MISTRAL_API_KEY") + openai_client = OpenAI(api_key=mistral_api_key, base_url="https://api.mistral.ai/v1") -''' -# Create OpenAI client -mistral_api_key = os.getenv("MISTRAL_API_KEY") -openai_client = OpenAI(api_key=mistral_api_key, base_url="https://api.mistral.ai/v1") -chat_response = openai_client.chat.completions.create( - model="mistral-large-latest", - messages = [ - { - "role": "user", - "content": "What is the best French cheese?", - }, - ] -) -print(chat_response.to_json()) -''' + chat_response = openai_client.chat.completions.create( + model="mistral-large-latest", + messages=[{"role": "user", "content": "What is the best French cheese?"}] + ) + print(chat_response.to_json()) + +def main_sync(): + openai_chat_completions() + openai_completions() + openai_embeddings() + openai_streaming_chat() + + client = initialize_javelin_client() + openai_client = create_gemini_client() + register_gemini(client, openai_client) + gemini_chat_completions(openai_client) + gemini_streaming_chat(openai_client) + gemini_function_calling(openai_client) + gemini_image_understanding(openai_client) + gemini_structured_output(openai_client) + gemini_embeddings(openai_client) + + client = initialize_javelin_client() + openai_client = create_azureopenai_client() + register_azureopenai(client, openai_client) + azure_openai_chat_completions(openai_client) + + client = initialize_javelin_client() + openai_client = create_deepseek_client() + register_deepseek(client, openai_client) + deepseek_chat_completions(openai_client) + + # deepseek_reasoning_model() + + mistral_chat_completions() + +async def main_async(): + await async_openai_chat_completions() + print("\n") + await async_openai_streaming_chat() + print("\n") + +def main(): + main_sync() # Run 
synchronous calls + # asyncio.run(main_async()) # Run asynchronous calls within a single event loop + +if __name__ == "__main__": + main() diff --git a/examples/strawberry.py b/examples/strawberry.py new file mode 100644 index 0000000..afa6e10 --- /dev/null +++ b/examples/strawberry.py @@ -0,0 +1,82 @@ + +import json +import os +import base64 +import requests +import asyncio +from openai import OpenAI, AsyncOpenAI, AzureOpenAI +from javelin_sdk import JavelinClient, JavelinConfig +from pydantic import BaseModel + +# Environment Variables +openai_api_key = os.getenv("OPENAI_API_KEY") +javelin_api_key = os.getenv('JAVELIN_API_KEY') +gemini_api_key = os.getenv("GEMINI_API_KEY") + +# Initialize Javelin Client +config = JavelinConfig( + base_url="https://api-dev.javelin.live", + # base_url="http://localhost:8000", + javelin_api_key=javelin_api_key, +) +client = JavelinClient(config) + +def register_openai_client(): + openai_client = OpenAI(api_key=openai_api_key) + client.register_openai(openai_client, route_name="openai") + return openai_client + +def openai_chat_completions(): + openai_client = register_openai_client() + response = openai_client.chat.completions.create( + model="o1-mini", + messages=[{"role": "user", "content": "How many Rs are there in the word 'strawberry', 'retriever', 'mulberry', 'refrigerator'?"}], + ) + print(response.model_dump_json(indent=2)) + +# Initialize Javelin Client +def initialize_javelin_client(): + javelin_api_key = os.getenv('JAVELIN_API_KEY') + config = JavelinConfig( + base_url="https://api-dev.javelin.live", + javelin_api_key=javelin_api_key, + ) + return JavelinClient(config) + +# Create Gemini client +def create_gemini_client(): + gemini_api_key = os.getenv("GEMINI_API_KEY") + return OpenAI( + api_key=gemini_api_key, + base_url="https://generativelanguage.googleapis.com/v1beta/openai/" + ) + +# Register Gemini client with Javelin +def register_gemini(client, openai_client): + client.register_gemini(openai_client, 
route_name="openai") + +# Gemini Chat Completions +def gemini_chat_completions(openai_client): + response = openai_client.chat.completions.create( + model="gemini-2.0-pro-exp", + n=1, + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "How many Rs are there in the word 'strawberry', 'retriever', 'mulberry', 'refrigerator'?"} + ] + ) + print(response.model_dump_json(indent=2)) + +def main_sync(): + openai_chat_completions() + + client = initialize_javelin_client() + openai_client = create_gemini_client() + register_gemini(client, openai_client) + gemini_chat_completions(openai_client) + +def main(): + main_sync() # Run synchronous calls + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/javelin_sdk/client.py b/javelin_sdk/client.py index 026a73b..b2aaef6 100644 --- a/javelin_sdk/client.py +++ b/javelin_sdk/client.py @@ -1,3 +1,4 @@ +import json import functools from typing import Any, Coroutine, Dict, Optional, Union from urllib.parse import urljoin, urlparse, urlunparse, unquote @@ -13,7 +14,11 @@ from javelin_sdk.services.template_service import TemplateService from javelin_sdk.services.trace_service import TraceService -# from openai import OpenAI, AsyncOpenAI +from javelin_sdk.tracing_setup import configure_span_exporter +import inspect +from opentelemetry.trace import SpanKind +from opentelemetry.semconv._incubating.attributes import gen_ai_attributes +import openai API_BASEURL = "https://api-dev.javelin.live" API_BASE_PATH = "/v1" @@ -26,13 +31,36 @@ class JavelinClient: PROFILE_ARN_PATTERN = re.compile(r'/model/arn:aws:bedrock:[^:]+:\d+:application-inference-profile/[^/]+') MODEL_ARN_PATTERN = re.compile(r'/model/arn:aws:bedrock:[^:]+::foundation-model/[^/]+') + # Mapping provider_name to well-known gen_ai.system values + GEN_AI_SYSTEM_MAPPING = { + "openai": "openai", + "azureopenai": "az.ai.openai", + "bedrock": "aws.bedrock", + "gemini": "gemini", + "deepseek": 
"deepseek", + "cohere": "cohere", + "mistral_ai": "mistral_ai", + "anthropic": "anthropic", + "vertex_ai": "vertex_ai", + "perplexity": "perplexity", + "groq": "groq", + "ibm": "ibm.watsonx.ai", + "xai": "xai" + } + + # Mapping method names to well-known operation names + GEN_AI_OPERATION_MAPPING = { + "chat.completions.create": "chat", + "completions.create": "text_completion", + "embeddings.create": "embeddings" + } def __init__(self, config: JavelinConfig) -> None: self.config = config self.base_url = urljoin(config.base_url, config.api_version or "/v1") - + self._headers = { - "x-api-key": config.javelin_api_key, + "x-api-key": config.javelin_api_key } if config.llm_api_key: self._headers["Authorization"] = f"Bearer {config.llm_api_key}" @@ -58,6 +86,13 @@ def __init__(self, config: JavelinConfig) -> None: self.chat = Chat(self) self.completions = Completions(self) + self.tracer = configure_span_exporter() + + self.patched_clients = set() # Track already patched clients + self.patched_methods = set() # Track already patched methods + + self.original_methods = {} + @property def client(self): if self._client is None: @@ -96,6 +131,19 @@ def close(self): if self._client: self._client.close() + @staticmethod + def set_span_attribute_if_not_none(span, key, value): + """Helper function to set span attributes only if the value is not None.""" + if value is not None: + span.set_attribute(key, value) + + @staticmethod + def add_event_with_attributes(span, event_name, attributes): + """Helper function to add events only with non-None attributes.""" + filtered_attributes = {k: v for k, v in attributes.items() if v is not None} + if filtered_attributes: # Add event only if there are valid attributes + span.add_event(name=event_name, attributes=filtered_attributes) + def register_provider(self, openai_client: Any, provider_name: str, @@ -108,6 +156,12 @@ def register_provider(self, - openai_client._custom_headers to include self._headers """ + client_id = id(openai_client) 
+ if client_id in self.patched_clients: + return openai_client # Skip if already patched + + self.patched_clients.add(client_id) # Mark as patched + # Store the OpenAI base URL if self.openai_base_url is None: self.openai_base_url = openai_client.base_url @@ -125,29 +179,163 @@ def register_provider(self, openai_client._custom_headers["x-javelin-provider"] = base_url_str openai_client._custom_headers["x-javelin-route"] = route_name - # Store references to the original methods - original_methods = { - "chat_completions_create": openai_client.chat.completions.create, - "completions_create": openai_client.completions.create, - "embeddings_create": openai_client.embeddings.create, - } + # Store the original methods only if not already stored + if provider_name not in self.original_methods: + self.original_methods[provider_name] = { + "chat_completions_create": openai_client.chat.completions.create, + "completions_create": openai_client.completions.create, + "embeddings_create": openai_client.embeddings.create, + } # Patch methods with tracing and header updates - def create_patched_method(original_method): - def patched_method(*args, **kwargs): - model = kwargs.get('model') - if model and hasattr(openai_client, "_custom_headers"): - openai_client._custom_headers['x-javelin-model'] = model + def create_patched_method(method_name, original_method): + # Check if the original method is asynchronous + if inspect.iscoroutinefunction(original_method): + # Async Patched Method + async def patched_method(*args, **kwargs): + return await _execute_with_tracing(original_method, method_name, args, kwargs) + else: + # Sync Patched Method + def patched_method(*args, **kwargs): + return _execute_with_tracing(original_method, method_name, args, kwargs) + + return patched_method + + def _execute_with_tracing(original_method, method_name, args, kwargs): + model = kwargs.get('model') + + if model and hasattr(openai_client, "_custom_headers"): + 
openai_client._custom_headers['x-javelin-model'] = model + # Use well-known operation names, fallback to method_name if not mapped + operation_name = self.GEN_AI_OPERATION_MAPPING.get(method_name, method_name) + system_name = self.GEN_AI_SYSTEM_MAPPING.get(provider_name, provider_name) # Fallback if provider is custom + span_name = f"{operation_name} {model}" + + async def _async_execution(span): + response = await original_method(*args, **kwargs) + _capture_response_details(span, response, kwargs, system_name) + return response + + def _sync_execution(span): response = original_method(*args, **kwargs) + _capture_response_details(span, response, kwargs, system_name) return response - return patched_method + # Only create spans if tracing is enabled + if self.tracer: + with self.tracer.start_as_current_span(span_name, kind=SpanKind.CLIENT) as span: + span.set_attribute(gen_ai_attributes.GEN_AI_SYSTEM, system_name) + span.set_attribute(gen_ai_attributes.GEN_AI_OPERATION_NAME, operation_name) + span.set_attribute(gen_ai_attributes.GEN_AI_REQUEST_MODEL, model) + + # Request attributes + JavelinClient.set_span_attribute_if_not_none(span, gen_ai_attributes.GEN_AI_REQUEST_MAX_TOKENS, kwargs.get('max_completion_tokens')) + JavelinClient.set_span_attribute_if_not_none(span, gen_ai_attributes.GEN_AI_REQUEST_PRESENCE_PENALTY, kwargs.get('presence_penalty')) + JavelinClient.set_span_attribute_if_not_none(span, gen_ai_attributes.GEN_AI_REQUEST_FREQUENCY_PENALTY, kwargs.get('frequency_penalty')) + JavelinClient.set_span_attribute_if_not_none(span, gen_ai_attributes.GEN_AI_REQUEST_STOP_SEQUENCES, json.dumps(kwargs.get('stop', [])) if kwargs.get('stop') else None) + JavelinClient.set_span_attribute_if_not_none(span, gen_ai_attributes.GEN_AI_REQUEST_TEMPERATURE, kwargs.get('temperature')) + JavelinClient.set_span_attribute_if_not_none(span, gen_ai_attributes.GEN_AI_REQUEST_TOP_K, kwargs.get('top_k')) + JavelinClient.set_span_attribute_if_not_none(span, 
# Helper to capture response details
def _capture_response_details(span, response, kwargs, system_name):
    """Record response metadata, prompt messages and choices on ``span``.

    When ``response`` offers a callable ``to_dict()``, the returned mapping
    is used to set the response model / id / service tier / system
    fingerprint attributes, the finish reasons (JSON-encoded list), the
    prompt and completion token counts, and to emit one event for the first
    system message, one for the first user message and one per returned
    choice.  Any other response is stored as ``str(response)`` under the
    ``javelin.response.body`` attribute.

    Args:
        span: Span-like object.  Only ``set_attribute`` is called on it
            directly; everything else goes through the ``JavelinClient``
            helpers.
        response: Object returned by the patched provider method.
        kwargs: Keyword arguments of the original call; ``messages`` is read
            to locate the first system and the first user message.
        system_name: Value stored as ``gen_ai.system`` on every event.
    """
    # Probe for the method that is actually called below.  The previous
    # check looked for ``to_json`` and then called ``to_dict``, which raised
    # AttributeError for objects that only provide the former.
    if not callable(getattr(response, "to_dict", None)):
        span.set_attribute("javelin.response.body", str(response))
        return

    response_data = response.to_dict()
    set_attr = JavelinClient.set_span_attribute_if_not_none
    add_event = JavelinClient.add_event_with_attributes

    # Basic response attributes
    set_attr(span, gen_ai_attributes.GEN_AI_RESPONSE_MODEL,
             response_data.get("model"))
    set_attr(span, gen_ai_attributes.GEN_AI_RESPONSE_ID,
             response_data.get("id"))
    set_attr(span, gen_ai_attributes.GEN_AI_OPENAI_REQUEST_SERVICE_TIER,
             response_data.get("service_tier"))
    set_attr(span, gen_ai_attributes.GEN_AI_OPENAI_RESPONSE_SYSTEM_FINGERPRINT,
             response_data.get("system_fingerprint"))

    # ``or`` (rather than a .get default) also covers keys that are present
    # but explicitly set to None.
    choices = response_data.get("choices") or []
    usage = response_data.get("usage") or {}
    messages = kwargs.get("messages") or []

    # Finish reasons for choices
    finish_reasons = [
        choice.get("finish_reason")
        for choice in choices
        if choice.get("finish_reason")
    ]
    set_attr(
        span,
        gen_ai_attributes.GEN_AI_RESPONSE_FINISH_REASONS,
        json.dumps(finish_reasons) if finish_reasons else None,
    )

    # Token usage
    set_attr(span, gen_ai_attributes.GEN_AI_USAGE_INPUT_TOKENS,
             usage.get("prompt_tokens"))
    set_attr(span, gen_ai_attributes.GEN_AI_USAGE_OUTPUT_TOKENS,
             usage.get("completion_tokens"))

    # System message event first, then user message event; each carries the
    # content of the first message with that role (None when absent).
    # NOTE(review): multimodal ``content`` (a list of parts) is forwarded
    # as-is; confirm add_event_with_attributes can handle non-scalar values.
    for role, event_name in (
        ("system", "gen_ai.system.message"),
        ("user", "gen_ai.user.message"),
    ):
        content = next(
            (msg.get("content") for msg in messages if msg.get("role") == role),
            None,
        )
        add_event(span, event_name,
                  {"gen_ai.system": system_name, "content": content})

    # Choice events
    for index, choice in enumerate(choices):
        # Flatten the nested message into the choice on a copy, so the
        # response mapping itself is left untouched.  Message fields win
        # over same-named choice fields, as before.
        flattened = {k: v for k, v in choice.items() if k != "message"}
        flattened.update(choice.get("message") or {})

        choice_attributes = {"gen_ai.system": system_name, "index": index}
        for key, value in flattened.items():
            if value is None:
                continue  # drop None values instead of storing them
            if isinstance(value, (dict, list)):
                value = json.dumps(value)
            choice_attributes[key] = value

        add_event(span, "gen_ai.choice", choice_attributes)


def get_nested_attr(obj, attr_path):
    """Resolve a dotted attribute path such as ``"chat.completions.create"``.

    Args:
        obj: Object to start from.
        attr_path: Attribute names separated by ``"."``.

    Returns:
        The attribute reached after following every name in order.

    Raises:
        AttributeError: If any name along the path does not exist.
    """
    current = obj
    for name in attr_path.split("."):
        current = getattr(current, name)
    return current
def parse_headers(header_str: str) -> dict:
    """Parse ``'Authorization=Bearer xyz,Custom-Header=value'`` into a dict.

    Pairs are separated by ``","`` and split on the first ``"="``, so values
    may themselves contain ``"="``.  Keys and values are stripped of
    surrounding whitespace.  Pairs without ``"="`` or with an empty key are
    ignored, because an empty header name cannot be sent.

    Args:
        header_str: Raw header string.  ``None`` or an empty string is
            accepted (the environment variable it is read from may be unset)
            and yields an empty dict.

    Returns:
        Mapping of header name to header value; later duplicates win.
    """
    headers = {}
    if not header_str:
        return headers

    for pair in header_str.split(","):
        key, separator, value = pair.partition("=")
        key = key.strip()
        if not separator or not key:
            continue  # malformed pair: no '=' or nothing before it
        headers[key] = value.strip()
    return headers


def configure_span_exporter(api_key: str = None):
    """Attach an OTLP span exporter to the global tracer provider.

    Headers are taken from ``TRACES_HEADERS`` (parsed with
    ``parse_headers``); when ``api_key`` is given it is sent as
    ``x-api-key``, replacing any value of that name from the environment.

    Args:
        api_key: Optional API key to add to the exporter headers.

    Returns:
        The module-level ``tracer``, or ``None`` when ``TRACES_ENDPOINT`` is
        not set, in which case tracing stays disabled and nothing is
        registered.
    """
    # Tracing is opt-in: without an endpoint there is nowhere to export to.
    if not TRACES_ENDPOINT:
        return None

    otlp_headers = parse_headers(TRACES_HEADERS)
    if api_key:
        otlp_headers["x-api-key"] = api_key

    # NOTE(review): every call adds one more processor to the global
    # provider; if this is invoked once per client instance, spans may be
    # exported multiple times. Confirm against callers.
    exporter = OTLPSpanExporter(endpoint=TRACES_ENDPOINT, headers=otlp_headers)
    trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))

    return tracer