Create spans manually for LLM calls and custom operations
import zeroeval as ze
import openai
client = openai.OpenAI()
@ze.span(name="chat_completion", kind="llm")
def generate_response(messages: list) -> str:
"""Create an LLM span with automatic input/output capture"""
response = client.chat.completions.create(
model="gpt-4",
messages=messages,
temperature=0.7
)
# The SDK automatically captures function arguments as input
# and return values as output
return response.choices[0].message.content
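With the SDK initialized in your application, calling the decorated function is enough to record the span; a minimal usage sketch (the question text is illustrative):
answer = generate_response([
    {"role": "user", "content": "What is the capital of France?"}
])
print(answer)  # the same text is captured as the span's output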
import zeroeval as ze
import openai
import time
import json
@ze.span(name="chat_completion_advanced", kind="llm")
def generate_with_metrics(messages: list, **kwargs):
"""Create a comprehensive LLM span with all metrics"""
# Get the current span to add attributes
span = ze.get_current_span()
# Track timing
start_time = time.time()
first_token_time = None
# Prepare the request
model = kwargs.get("model", "gpt-4")
temperature = kwargs.get("temperature", 0.7)
max_tokens = kwargs.get("max_tokens", None)
# Set pre-request attributes
span.set_attributes({
"llm.model": model,
"llm.provider": "openai",
"llm.temperature": temperature,
"llm.max_tokens": max_tokens,
"llm.streaming": kwargs.get("stream", False)
})
# Store input messages in the expected format
span.set_io(input_data=json.dumps([
{"role": msg["role"], "content": msg["content"]}
for msg in messages
]))
try:
client = openai.OpenAI()
# Handle streaming responses
if kwargs.get("stream", False):
stream = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
stream=True
)
full_response = ""
tokens = 0
for chunk in stream:
if chunk.choices[0].delta.content:
if first_token_time is None:
first_token_time = time.time()
ttft_ms = (first_token_time - start_time) * 1000
span.set_attributes({"llm.ttft_ms": ttft_ms})
full_response += chunk.choices[0].delta.content
tokens += 1
# Calculate throughput
total_time = time.time() - start_time
span.set_attributes({
"llm.output_tokens": tokens,
"llm.throughput_tokens_per_sec": tokens / total_time if total_time > 0 else 0,
"llm.duration_ms": total_time * 1000
})
span.set_io(output_data=full_response)
return full_response
else:
# Non-streaming response
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
# Capture all response metadata
span.set_attributes({
"llm.input_tokens": response.usage.prompt_tokens,
"llm.output_tokens": response.usage.completion_tokens,
"llm.total_tokens": response.usage.total_tokens,
"llm.finish_reason": response.choices[0].finish_reason,
"llm.system_fingerprint": response.system_fingerprint,
"llm.response_id": response.id,
"llm.duration_ms": (time.time() - start_time) * 1000
})
content = response.choices[0].message.content
span.set_io(output_data=content)
return content
except Exception as e:
# Capture error details
span.set_status("error")
span.set_attributes({
"error.type": type(e).__name__,
"error.message": str(e)
})
raise
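A usage sketch for the function above (illustrative prompts and parameters; assumes the SDK is initialized and `OPENAI_API_KEY` is set):
# Non-streaming call: token counts and finish_reason come from response.usage
summary = generate_with_metrics(
    [{"role": "user", "content": "Summarize the plot of Hamlet in two sentences."}],
    model="gpt-4",
    temperature=0.3,
    max_tokens=200,
)

# Streaming call: the span additionally records llm.ttft_ms and throughput
story = generate_with_metrics(
    [{"role": "user", "content": "Write a haiku about tracing."}],
    stream=True,
)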
If you're calling the OpenAI API directly over HTTP (with `requests`, `httpx`, or similar), you'll want to capture all the metrics that the automatic integration would provide:
import requests
import json
import time
import uuid
from datetime import datetime, timezone
class OpenAITracer:
    def __init__(self, api_key: str, zeroeval_api_key: str):
self.openai_api_key = api_key
self.zeroeval_api_key = zeroeval_api_key
self.zeroeval_url = "https://api.zeroeval.com/api/v1/spans"
def chat_completion_with_tracing(self, messages: list, model: str = "gpt-4o", **kwargs):
"""Make OpenAI API call with full ZeroEval instrumentation"""
# Generate span identifiers
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
# Track timing
start_time = time.time()
# Prepare OpenAI request
openai_payload = {
"model": model,
"messages": messages,
**kwargs # temperature, max_tokens, etc.
}
# Add stream_options for token usage in streaming calls
is_streaming = kwargs.get("stream", False)
if is_streaming and "stream_options" not in kwargs:
openai_payload["stream_options"] = {"include_usage": True}
try:
# Make the OpenAI API call
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {self.openai_api_key}",
"Content-Type": "application/json"
},
json=openai_payload,
stream=is_streaming
)
response.raise_for_status()
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
if is_streaming:
# Handle streaming response
full_response = ""
input_tokens = 0
output_tokens = 0
finish_reason = None
response_id = None
system_fingerprint = None
first_token_time = None
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data_str = line[6:]
if data_str == '[DONE]':
break
try:
data = json.loads(data_str)
# Capture first token timing
if data.get('choices') and data['choices'][0].get('delta', {}).get('content'):
if first_token_time is None:
first_token_time = time.time()
full_response += data['choices'][0]['delta']['content']
# Capture final metadata
if 'usage' in data:
input_tokens = data['usage']['prompt_tokens']
output_tokens = data['usage']['completion_tokens']
if data.get('choices') and data['choices'][0].get('finish_reason'):
finish_reason = data['choices'][0]['finish_reason']
if 'id' in data:
response_id = data['id']
if 'system_fingerprint' in data:
system_fingerprint = data['system_fingerprint']
except json.JSONDecodeError:
continue
# Send ZeroEval span for streaming
self._send_span(
span_id=span_id,
trace_id=trace_id,
model=model,
messages=messages,
response_text=full_response,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
start_time=start_time,
finish_reason=finish_reason,
response_id=response_id,
system_fingerprint=system_fingerprint,
streaming=True,
first_token_time=first_token_time,
**kwargs
)
return full_response
else:
# Handle non-streaming response
response_data = response.json()
# Extract response details
content = response_data['choices'][0]['message']['content']
usage = response_data.get('usage', {})
# Send ZeroEval span
self._send_span(
span_id=span_id,
trace_id=trace_id,
model=model,
messages=messages,
response_text=content,
input_tokens=usage.get('prompt_tokens', 0),
output_tokens=usage.get('completion_tokens', 0),
duration_ms=duration_ms,
start_time=start_time,
finish_reason=response_data['choices'][0].get('finish_reason'),
response_id=response_data.get('id'),
system_fingerprint=response_data.get('system_fingerprint'),
streaming=False,
**kwargs
)
return content
except Exception as e:
# Send error span
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
self._send_error_span(
span_id=span_id,
trace_id=trace_id,
model=model,
messages=messages,
duration_ms=duration_ms,
start_time=start_time,
error=e,
**kwargs
)
raise
def _send_span(self, span_id: str, trace_id: str, model: str, messages: list,
response_text: str, input_tokens: int, output_tokens: int,
duration_ms: float, start_time: float, finish_reason: str = None,
response_id: str = None, system_fingerprint: str = None,
streaming: bool = False, first_token_time: float = None, **kwargs):
"""Send successful span to ZeroEval"""
# Calculate throughput metrics
throughput = output_tokens / (duration_ms / 1000) if duration_ms > 0 else 0
ttft_ms = None
if streaming and first_token_time:
ttft_ms = (first_token_time - start_time) * 1000
# Prepare span attributes following ZeroEval's expected format
attributes = {
# Core LLM attributes (these are used for cost calculation)
"provider": "openai", # Key for cost calculation
"model": model, # Key for cost calculation
"inputTokens": input_tokens, # Key for cost calculation
"outputTokens": output_tokens, # Key for cost calculation
# OpenAI-specific attributes
"temperature": kwargs.get("temperature"),
"max_tokens": kwargs.get("max_tokens"),
"top_p": kwargs.get("top_p"),
"frequency_penalty": kwargs.get("frequency_penalty"),
"presence_penalty": kwargs.get("presence_penalty"),
"streaming": streaming,
"finish_reason": finish_reason,
"response_id": response_id,
"system_fingerprint": system_fingerprint,
# Performance metrics
"throughput": throughput,
"duration_ms": duration_ms,
}
if ttft_ms:
attributes["ttft_ms"] = ttft_ms
# Clean up None values
attributes = {k: v for k, v in attributes.items() if v is not None}
# Format messages for good conversation display
formatted_messages = self._format_messages_for_display(messages)
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": f"{model}_completion",
"kind": "llm", # Critical: must be "llm" for cost calculation
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
"status": "ok",
"attributes": attributes,
"input_data": json.dumps(formatted_messages),
"output_data": response_text,
"tags": {
"provider": "openai",
"model": model,
"streaming": str(streaming).lower()
}
}
# Send to ZeroEval
response = requests.post(
self.zeroeval_url,
headers={
"Authorization": f"Bearer {self.zeroeval_api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
if response.status_code != 200:
print(f"Warning: Failed to send span to ZeroEval: {response.text}")
def _send_error_span(self, span_id: str, trace_id: str, model: str,
messages: list, duration_ms: float, start_time: float,
error: Exception, **kwargs):
"""Send error span to ZeroEval"""
attributes = {
"provider": "openai",
"model": model,
"temperature": kwargs.get("temperature"),
"max_tokens": kwargs.get("max_tokens"),
"streaming": kwargs.get("stream", False),
"error_type": type(error).__name__,
"error_message": str(error),
"duration_ms": duration_ms,
}
# Clean up None values
attributes = {k: v for k, v in attributes.items() if v is not None}
formatted_messages = self._format_messages_for_display(messages)
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": f"{model}_completion",
"kind": "llm",
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
"status": "error",
"attributes": attributes,
"input_data": json.dumps(formatted_messages),
"output_data": "",
"error_message": str(error),
"tags": {
"provider": "openai",
"model": model,
"error": "true"
}
}
requests.post(
self.zeroeval_url,
headers={
"Authorization": f"Bearer {self.zeroeval_api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
def _format_messages_for_display(self, messages: list) -> list:
"""Format messages for optimal display in ZeroEval UI"""
formatted = []
for msg in messages:
# Handle both dict and object formats
if hasattr(msg, 'role'):
role = msg.role
content = msg.content
else:
role = msg.get('role', 'user')
content = msg.get('content', '')
# Handle multimodal content
if isinstance(content, list):
# Extract text parts for display
text_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
text_parts.append(part['text'])
elif isinstance(part, str):
text_parts.append(part)
content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'
formatted.append({
"role": role,
"content": content
})
return formatted
# Usage example
tracer = OpenAITracer(
api_key="your-openai-api-key",
zeroeval_api_key="your-zeroeval-api-key"
)
# Non-streaming call
response = tracer.chat_completion_with_tracing([
{"role": "user", "content": "What is the capital of France?"}
], model="gpt-4o", temperature=0.7)
# Streaming call
response = tracer.chat_completion_with_tracing([
{"role": "user", "content": "Write a short story"}
], model="gpt-4o", stream=True, temperature=0.9)
The Gemini API uses `contents` instead of `messages`, along with different parameter names. Here's how to instrument Gemini API calls:
import requests
import json
import time
import uuid
from datetime import datetime, timezone
class GeminiTracer:
    def __init__(self, api_key: str, zeroeval_api_key: str):
self.gemini_api_key = api_key
self.zeroeval_api_key = zeroeval_api_key
self.zeroeval_url = "https://api.zeroeval.com/api/v1/spans"
def generate_content_with_tracing(self, messages: list, model: str = "gemini-1.5-flash", **kwargs):
"""Make Gemini API call with full ZeroEval instrumentation"""
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
start_time = time.time()
# Convert OpenAI-style messages to Gemini contents format
contents, system_instruction = self._convert_messages_to_contents(messages)
# Prepare Gemini request payload
gemini_payload = {
"contents": contents
}
# Add generation config
generation_config = {}
if kwargs.get("temperature") is not None:
generation_config["temperature"] = kwargs["temperature"]
if kwargs.get("max_tokens"):
generation_config["maxOutputTokens"] = kwargs["max_tokens"]
if kwargs.get("top_p") is not None:
generation_config["topP"] = kwargs["top_p"]
if kwargs.get("top_k") is not None:
generation_config["topK"] = kwargs["top_k"]
if kwargs.get("stop"):
stop = kwargs["stop"]
generation_config["stopSequences"] = stop if isinstance(stop, list) else [stop]
if generation_config:
gemini_payload["generationConfig"] = generation_config
# Add system instruction if present
if system_instruction:
gemini_payload["systemInstruction"] = {"parts": [{"text": system_instruction}]}
# Add tools if provided
if kwargs.get("tools"):
gemini_payload["tools"] = kwargs["tools"]
if kwargs.get("tool_choice"):
gemini_payload["toolConfig"] = {
"functionCallingConfig": {"mode": kwargs["tool_choice"]}
}
# Choose endpoint based on streaming
is_streaming = kwargs.get("stream", False)
endpoint = "streamGenerateContent" if is_streaming else "generateContent"
url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:{endpoint}"
try:
response = requests.post(
url,
headers={
"x-goog-api-key": self.gemini_api_key,
"Content-Type": "application/json"
},
json=gemini_payload,
stream=is_streaming
)
response.raise_for_status()
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
if is_streaming:
# Handle streaming response
full_response = ""
input_tokens = 0
output_tokens = 0
finish_reason = None
model_version = None
first_token_time = None
for line in response.iter_lines():
if line:
try:
# Gemini streaming sends JSON objects separated by newlines
data = json.loads(line.decode('utf-8'))
if 'candidates' in data and data['candidates']:
candidate = data['candidates'][0]
# Extract content
if 'content' in candidate and 'parts' in candidate['content']:
for part in candidate['content']['parts']:
if 'text' in part:
if first_token_time is None:
first_token_time = time.time()
full_response += part['text']
# Extract finish reason
if 'finishReason' in candidate:
finish_reason = candidate['finishReason']
# Extract usage metadata (usually in final chunk)
if 'usageMetadata' in data:
usage = data['usageMetadata']
input_tokens = usage.get('promptTokenCount', 0)
output_tokens = usage.get('candidatesTokenCount', 0)
# Extract model version
if 'modelVersion' in data:
model_version = data['modelVersion']
except json.JSONDecodeError:
continue
self._send_span(
span_id=span_id, trace_id=trace_id, model=model,
original_messages=messages, response_text=full_response,
input_tokens=input_tokens, output_tokens=output_tokens,
duration_ms=duration_ms, start_time=start_time,
finish_reason=finish_reason, model_version=model_version,
streaming=True, first_token_time=first_token_time,
**kwargs
)
return full_response
else:
# Handle non-streaming response
response_data = response.json()
# Extract response content
content = ""
if 'candidates' in response_data and response_data['candidates']:
candidate = response_data['candidates'][0]
if 'content' in candidate and 'parts' in candidate['content']:
content_parts = []
for part in candidate['content']['parts']:
if 'text' in part:
content_parts.append(part['text'])
content = ''.join(content_parts)
# Extract usage
usage = response_data.get('usageMetadata', {})
input_tokens = usage.get('promptTokenCount', 0)
output_tokens = usage.get('candidatesTokenCount', 0)
# Extract other metadata
finish_reason = None
if 'candidates' in response_data and response_data['candidates']:
finish_reason = response_data['candidates'][0].get('finishReason')
model_version = response_data.get('modelVersion')
self._send_span(
span_id=span_id, trace_id=trace_id, model=model,
original_messages=messages, response_text=content,
input_tokens=input_tokens, output_tokens=output_tokens,
duration_ms=duration_ms, start_time=start_time,
finish_reason=finish_reason, model_version=model_version,
streaming=False, **kwargs
)
return content
except Exception as e:
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
self._send_error_span(
span_id=span_id, trace_id=trace_id, model=model,
original_messages=messages, duration_ms=duration_ms,
start_time=start_time, error=e, **kwargs
)
raise
def _convert_messages_to_contents(self, messages: list) -> tuple:
"""Convert OpenAI-style messages to Gemini contents format"""
contents = []
system_instruction = None
for msg in messages:
role = msg.get('role', 'user') if isinstance(msg, dict) else msg.role
content = msg.get('content', '') if isinstance(msg, dict) else msg.content
if role == 'system':
# Collect system instructions
if system_instruction:
system_instruction += f"\n{content}"
else:
system_instruction = content
continue
# Convert content to parts
if isinstance(content, list):
# Handle multimodal content
parts = []
for item in content:
if isinstance(item, dict) and item.get('type') == 'text':
parts.append({"text": item['text']})
# Add support for images, etc. if needed
else:
parts = [{"text": str(content)}]
# Convert role
gemini_role = "user" if role == "user" else "model"
contents.append({"role": gemini_role, "parts": parts})
return contents, system_instruction
def _send_span(self, span_id: str, trace_id: str, model: str,
original_messages: list, response_text: str,
input_tokens: int, output_tokens: int, duration_ms: float,
start_time: float, finish_reason: str = None,
model_version: str = None, streaming: bool = False,
first_token_time: float = None, **kwargs):
"""Send successful span to ZeroEval"""
# Calculate performance metrics
throughput = output_tokens / (duration_ms / 1000) if duration_ms > 0 else 0
ttft_ms = None
if streaming and first_token_time:
ttft_ms = (first_token_time - start_time) * 1000
# Prepare attributes following ZeroEval's expected format
attributes = {
# Core attributes for cost calculation (use provider naming)
"provider": "gemini", # Key for cost calculation
"model": model, # Key for cost calculation
"inputTokens": input_tokens, # Key for cost calculation
"outputTokens": output_tokens, # Key for cost calculation
# Gemini-specific attributes
"temperature": kwargs.get("temperature"),
"max_tokens": kwargs.get("max_tokens"), # maxOutputTokens
"top_p": kwargs.get("top_p"),
"top_k": kwargs.get("top_k"),
"stop_sequences": kwargs.get("stop"),
"streaming": streaming,
"finish_reason": finish_reason,
"model_version": model_version,
# Performance metrics
"throughput": throughput,
"duration_ms": duration_ms,
}
if ttft_ms:
attributes["ttft_ms"] = ttft_ms
# Include tool information if present
if kwargs.get("tools"):
attributes["tools_count"] = len(kwargs["tools"])
attributes["tool_choice"] = kwargs.get("tool_choice")
# Clean up None values
attributes = {k: v for k, v in attributes.items() if v is not None}
# Format original messages for display (convert back to OpenAI format for consistency)
formatted_messages = self._format_messages_for_display(original_messages)
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": f"{model}_completion",
"kind": "llm", # Critical: must be "llm" for cost calculation
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
"status": "ok",
"attributes": attributes,
"input_data": json.dumps(formatted_messages),
"output_data": response_text,
"tags": {
"provider": "gemini",
"model": model,
"streaming": str(streaming).lower()
}
}
# Send to ZeroEval
response = requests.post(
self.zeroeval_url,
headers={
"Authorization": f"Bearer {self.zeroeval_api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
if response.status_code != 200:
print(f"Warning: Failed to send span to ZeroEval: {response.text}")
def _send_error_span(self, span_id: str, trace_id: str, model: str,
original_messages: list, duration_ms: float,
start_time: float, error: Exception, **kwargs):
"""Send error span to ZeroEval"""
attributes = {
"provider": "gemini",
"model": model,
"temperature": kwargs.get("temperature"),
"max_tokens": kwargs.get("max_tokens"),
"streaming": kwargs.get("stream", False),
"error_type": type(error).__name__,
"error_message": str(error),
"duration_ms": duration_ms,
}
# Clean up None values
attributes = {k: v for k, v in attributes.items() if v is not None}
formatted_messages = self._format_messages_for_display(original_messages)
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": f"{model}_completion",
"kind": "llm",
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
"status": "error",
"attributes": attributes,
"input_data": json.dumps(formatted_messages),
"output_data": "",
"error_message": str(error),
"tags": {
"provider": "gemini",
"model": model,
"error": "true"
}
}
requests.post(
self.zeroeval_url,
headers={
"Authorization": f"Bearer {self.zeroeval_api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
def _format_messages_for_display(self, messages: list) -> list:
"""Format messages for optimal display in ZeroEval UI"""
formatted = []
for msg in messages:
if hasattr(msg, 'role'):
role = msg.role
content = msg.content
else:
role = msg.get('role', 'user')
content = msg.get('content', '')
# Handle multimodal content
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
text_parts.append(part['text'])
elif isinstance(part, str):
text_parts.append(part)
content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'
formatted.append({
"role": role,
"content": content
})
return formatted
# Usage example
tracer = GeminiTracer(
api_key="your-gemini-api-key",
zeroeval_api_key="your-zeroeval-api-key"
)
# Non-streaming call
response = tracer.generate_content_with_tracing([
{"role": "user", "content": "What is the capital of France?"}
], model="gemini-1.5-flash", temperature=0.7)
# Streaming call
response = tracer.generate_content_with_tracing([
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Write a short story"}
], model="gemini-1.5-flash", stream=True, temperature=0.9)
| Attribute | Required | Description | Example Values |
|---|---|---|---|
| `provider` | ✅ | Provider identifier for pricing lookup | `"openai"`, `"gemini"`, `"anthropic"` |
| `model` | ✅ | Model identifier for pricing lookup | `"gpt-4o"`, `"gemini-1.5-flash"` |
| `inputTokens` | ✅ | Number of input tokens consumed | `150` |
| `outputTokens` | ✅ | Number of output tokens generated | `75` |
| `kind` | ✅ | Must be set to `"llm"` | `"llm"` |
Cost is calculated automatically on the server: ZeroEval looks up per-token pricing in the `provider_models` table using `provider` and `model`, computes `(inputTokens × inputPrice + outputTokens × outputPrice) / 1,000,000`, and stores the result in the span's `cost` field (see the sketch below). Pricing is available for models including:

- OpenAI: `gpt-4o`, `gpt-4o-mini`, `gpt-4-turbo`, `gpt-3.5-turbo`
- Gemini: `gemini-1.5-flash`, `gemini-1.5-pro`, `gemini-1.0-pro`
- Anthropic: `claude-3-5-sonnet`, `claude-3-haiku`, `claude-3-opus`

If a model isn't in the pricing table, the cost will be `0` and you'll see a warning in the logs. Contact support to add pricing for new models.
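As a point of reference, the server-side formula is equivalent to the sketch below (the helper function and the prices are illustrative, not actual ZeroEval pricing):
def estimate_cost(input_tokens: int, output_tokens: int,
                  input_price: float, output_price: float) -> float:
    """Prices are per one million tokens, matching the formula above."""
    return (input_tokens * input_price + output_tokens * output_price) / 1_000_000

# 150 input and 75 output tokens at placeholder prices of $2.50 / $10.00 per 1M tokens
print(estimate_cost(150, 75, 2.50, 10.00))  # 0.001125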
def format_messages_for_zeroeval(messages: list) -> list:
"""Format messages for optimal display in ZeroEval UI"""
formatted = []
for msg in messages:
# Handle both dict and object formats
if hasattr(msg, 'role'):
role = msg.role
content = msg.content
else:
role = msg.get('role', 'user')
content = msg.get('content', '')
# Standardize role names
if role in ['assistant', 'bot', 'ai']:
role = 'assistant'
elif role in ['human', 'user']:
role = 'user'
elif role == 'system':
role = 'system'
# Handle multimodal content - extract text for display
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict):
if part.get('type') == 'text':
text_parts.append(part['text'])
elif part.get('type') == 'image_url':
text_parts.append(f"[Image: {part.get('image_url', {}).get('url', 'Unknown')}]")
elif isinstance(part, str):
text_parts.append(part)
# Join text parts with newlines for readability
content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'
# Ensure content is a string
if not isinstance(content, str):
content = str(content)
# Trim excessive whitespace but preserve meaningful formatting
content = content.strip()
formatted.append({
"role": role,
"content": content
})
return formatted
# Usage in span creation
span_data = {
"input_data": json.dumps(format_messages_for_zeroeval(original_messages)),
"output_data": response_text.strip(), # Clean response text too
# ... other fields
}
"user"
, "assistant"
, and "system"
consistentlybot
vs assistant
)import zeroeval as ze
@ze.span(name="rag_pipeline", kind="generic")
def answer_with_context(question: str) -> str:
# Retrieval step
with ze.span(name="retrieve_context", kind="vector_store") as retrieval_span:
context = vector_db.search(question, k=5)
retrieval_span.set_attributes({
"vector_store.query": question,
"vector_store.k": 5,
"vector_store.results": len(context)
})
# LLM generation step
with ze.span(name="generate_answer", kind="llm") as llm_span:
messages = [
{"role": "system", "content": f"Context: {context}"},
{"role": "user", "content": question}
]
response = generate_response(messages)
llm_span.set_attributes({
"llm.model": "gpt-4",
"llm.context_length": len(str(context))
})
return response
Authenticate with an `Authorization: Bearer YOUR_API_KEY` header and POST to `/api/v1/spans` with your span data:
curl -X POST https://api.zeroeval.com/api/v1/spans \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '[{
"id": "550e8400-e29b-41d4-a716-446655440000",
"trace_id": "550e8400-e29b-41d4-a716-446655440001",
"name": "chat_completion",
"kind": "llm",
"started_at": "2024-01-15T10:30:00Z",
"ended_at": "2024-01-15T10:30:02Z",
"status": "ok",
"attributes": {
"llm.model": "gpt-4",
"llm.provider": "openai",
"llm.temperature": 0.7,
"llm.input_tokens": 150,
"llm.output_tokens": 230,
"llm.total_tokens": 380
},
"input_data": "[{\"role\": \"user\", \"content\": \"What is the capital of France?\"}]",
"output_data": "The capital of France is Paris."
}]'
import requests
import json
from datetime import datetime, timezone
import uuid
import time
class ZeroEvalClient:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.zeroeval.com/api/v1"
self.session_id = str(uuid.uuid4())
def create_llm_span(
self,
messages: list,
response: dict,
model: str = "gpt-4",
trace_id: str = None,
parent_span_id: str = None,
start_time: float = None,
end_time: float = None
):
"""Create a comprehensive LLM span with all metadata"""
if not trace_id:
trace_id = str(uuid.uuid4())
if not start_time:
start_time = time.time()
if not end_time:
end_time = time.time()
span_id = str(uuid.uuid4())
# Calculate duration
duration_ms = (end_time - start_time) * 1000
# Prepare comprehensive span data
span_data = {
"id": span_id,
"trace_id": trace_id,
"parent_span_id": parent_span_id,
"name": f"{model}_completion",
"kind": "llm",
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(end_time, timezone.utc).isoformat(),
"duration_ms": duration_ms,
"status": "ok",
# Session context
"session": {
"id": self.session_id,
"name": "API Client Session"
},
# Core attributes
"attributes": {
"llm.model": model,
"llm.provider": "openai",
"llm.temperature": 0.7,
"llm.max_tokens": 1000,
"llm.streaming": False,
# Token metrics
"llm.input_tokens": response.get("usage", {}).get("prompt_tokens"),
"llm.output_tokens": response.get("usage", {}).get("completion_tokens"),
"llm.total_tokens": response.get("usage", {}).get("total_tokens"),
# Performance metrics
"llm.duration_ms": duration_ms,
"llm.throughput_tokens_per_sec": (
response.get("usage", {}).get("completion_tokens", 0) /
(duration_ms / 1000) if duration_ms > 0 else 0
),
# Response metadata
"llm.finish_reason": response.get("choices", [{}])[0].get("finish_reason"),
"llm.response_id": response.get("id"),
"llm.system_fingerprint": response.get("system_fingerprint")
},
# Tags for filtering
"tags": {
"environment": "production",
"version": "1.0.0",
"user_id": "user_123"
},
# Input/Output
"input_data": json.dumps(messages),
"output_data": response.get("choices", [{}])[0].get("message", {}).get("content", ""),
# Cost calculation (optional - will be calculated server-side if not provided)
"cost": self.calculate_cost(
model,
response.get("usage", {}).get("prompt_tokens", 0),
response.get("usage", {}).get("completion_tokens", 0)
)
}
# Send the span
response = requests.post(
f"{self.base_url}/spans",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
if response.status_code != 200:
raise Exception(f"Failed to send span: {response.text}")
return span_id
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate cost based on model and token usage"""
# Example pricing (adjust based on actual pricing)
pricing = {
"gpt-4": {"input": 0.03 / 1000, "output": 0.06 / 1000},
"gpt-3.5-turbo": {"input": 0.001 / 1000, "output": 0.002 / 1000}
}
if model in pricing:
input_cost = input_tokens * pricing[model]["input"]
output_cost = output_tokens * pricing[model]["output"]
return input_cost + output_cost
return 0.0
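A usage sketch for the client above (the response dict is a hand-written example in the OpenAI response format; in practice you would pass the provider's actual response):
client = ZeroEvalClient(api_key="your-zeroeval-api-key")

start = time.time()
# ... call your LLM provider here and keep the raw response dict ...
openai_response = {
    "id": "chatcmpl-example",
    "choices": [{"message": {"content": "Paris."}, "finish_reason": "stop"}],
    "usage": {"prompt_tokens": 12, "completion_tokens": 3, "total_tokens": 15},
}
end = time.time()

span_id = client.create_llm_span(
    messages=[{"role": "user", "content": "What is the capital of France?"}],
    response=openai_response,
    model="gpt-4",
    start_time=start,
    end_time=end,
)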
| Field | Type | Description |
|---|---|---|
| `trace_id` | string (UUID) | Unique identifier for the trace |
| `name` | string | Descriptive name for the span |
| `started_at` | ISO 8601 datetime | When the span started |
| Field | Type | Description |
|---|---|---|
| `id` | string (UUID) | Unique span identifier (auto-generated if not provided) |
| `kind` | string | Set to `"llm"` for LLM spans |
| `ended_at` | ISO 8601 datetime | When the span completed |
| `status` | string | `"ok"`, `"error"`, or `"unset"` |
| `input_data` | string | JSON string of input messages |
| `output_data` | string | Generated text response |
| `duration_ms` | number | Total duration in milliseconds |
| `cost` | number | Calculated cost (auto-calculated if not provided) |
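Putting the required and common fields together, a minimal valid payload looks like this (illustrative IDs and timestamps):
import requests

minimal_span = {
    "trace_id": "550e8400-e29b-41d4-a716-446655440001",  # required
    "name": "chat_completion",                            # required
    "started_at": "2024-01-15T10:30:00Z",                 # required
    "kind": "llm",
    "ended_at": "2024-01-15T10:30:02Z",
    "status": "ok",
}

requests.post(
    "https://api.zeroeval.com/api/v1/spans",
    headers={"Authorization": "Bearer YOUR_API_KEY", "Content-Type": "application/json"},
    json=[minimal_span],
)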
LLM-specific metadata goes in the `attributes` field:
| Attribute | Type | Description |
|---|---|---|
| `llm.model` | string | Model identifier (e.g., `"gpt-4"`, `"claude-3"`) |
| `llm.provider` | string | Provider name (e.g., `"openai"`, `"anthropic"`) |
| `llm.temperature` | number | Temperature parameter |
| `llm.max_tokens` | number | Maximum tokens limit |
| `llm.input_tokens` | number | Number of input tokens |
| `llm.output_tokens` | number | Number of output tokens |
| `llm.total_tokens` | number | Total tokens used |
| `llm.streaming` | boolean | Whether the response was streamed |
| `llm.ttft_ms` | number | Time to first token (streaming only) |
| `llm.throughput_tokens_per_sec` | number | Token generation rate |
| `llm.finish_reason` | string | Why generation stopped |
| `llm.response_id` | string | Provider's response ID |
| `llm.system_fingerprint` | string | Model version identifier |
| Field | Type | Description |
|---|---|---|
| `parent_span_id` | string (UUID) | Parent span for nested operations |
| `session` | object | Session context with `id` and optional `name` |
| `tags` | object | Key-value pairs for filtering |
| `signals` | object | Custom signals for alerting |
| `error_message` | string | Error description if status is `"error"` |
| `error_stack` | string | Stack trace for debugging |
- `kind` field: use `"llm"` for LLM spans to enable specialized features like embeddings and cost tracking.
- Span naming: use a descriptive pattern such as `{model}_completion` or `{provider}_{operation}`.
- Set `parent_span_id` to create hierarchical traces for complex workflows (see the sketch below).
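For example, over the REST API a hierarchy is expressed by reusing the same `trace_id` and pointing the child's `parent_span_id` at the parent span (illustrative names and timestamps):
import uuid

trace_id = str(uuid.uuid4())
parent_id = str(uuid.uuid4())

parent_span = {
    "id": parent_id,
    "trace_id": trace_id,
    "name": "rag_pipeline",
    "kind": "generic",
    "started_at": "2024-01-15T10:30:00Z",
    "ended_at": "2024-01-15T10:30:03Z",
}

child_span = {
    "id": str(uuid.uuid4()),
    "trace_id": trace_id,          # same trace as the parent
    "parent_span_id": parent_id,   # nests this span under rag_pipeline
    "name": "gpt-4_completion",
    "kind": "llm",
    "started_at": "2024-01-15T10:30:01Z",
    "ended_at": "2024-01-15T10:30:02Z",
}

# Both spans can be sent in a single request, e.g. json=[parent_span, child_span]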
import zeroeval as ze
import time
import json
@ze.span(name="rag_query", kind="generic")
def rag_pipeline(user_query: str) -> dict:
trace_id = ze.get_current_trace()
# Step 1: Query embedding
with ze.span(name="embed_query", kind="llm") as embed_span:
start = time.time()
embedding = create_embedding(user_query)
embed_span.set_attributes({
"llm.model": "text-embedding-3-small",
"llm.provider": "openai",
"llm.input_tokens": len(user_query.split()),
"llm.duration_ms": (time.time() - start) * 1000
})
# Step 2: Vector search
with ze.span(name="vector_search", kind="vector_store") as search_span:
results = vector_db.similarity_search(embedding, k=5)
search_span.set_attributes({
"vector_store.index": "knowledge_base",
"vector_store.k": 5,
"vector_store.results_count": len(results)
})
# Step 3: Rerank results
with ze.span(name="rerank_results", kind="llm") as rerank_span:
reranked = rerank_documents(user_query, results)
rerank_span.set_attributes({
"llm.model": "rerank-english-v2.0",
"llm.provider": "cohere",
"rerank.input_documents": len(results),
"rerank.output_documents": len(reranked)
})
# Step 4: Generate response
with ze.span(name="generate_response", kind="llm") as gen_span:
context = "\n".join([doc.content for doc in reranked[:3]])
messages = [
{"role": "system", "content": f"Use this context to answer: {context}"},
{"role": "user", "content": user_query}
]
response = generate_with_metrics(messages, model="gpt-4")
gen_span.set_attributes({
"llm.context_documents": 3,
"llm.context_length": len(context)
})
return {
"answer": response,
"sources": [doc.metadata for doc in reranked[:3]],
"trace_id": trace_id
}
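A usage sketch (assumes `vector_db`, `create_embedding`, `rerank_documents`, and `generate_with_metrics` are defined in your application, as in the snippets above):
result = rag_pipeline("How do I calculate cost for LLM spans?")
print(result["answer"])
print(result["sources"])    # metadata of the top-ranked documents
print(result["trace_id"])   # use this to find the trace in ZeroEval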