# Introduction
Source: https://docs.zeroeval.com/autotune/introduction
Run evaluations on models and prompts to find the best variants for your agents
Autotune takes a different approach from the traditional evals experience. Instead of setting up complex eval pipelines, it ingests your production traces and lets you replay them with different models and generate optimized prompts based on your feedback.
Some of the key features include:
* **Content-based versioning**: Each unique prompt content gets its own version via SHA-256 hashing
* **Variable templating**: Use `{{variable}}` syntax for dynamic content
* **Automatic tracking**: All interactions are traced for analysis
* **One-click model deployments**: Models update instantly without code changes
## How it works
1. Replace hardcoded prompts with `ze.prompt()` calls.
2. Each time you modify your prompt content, a new version is automatically created and tracked.
3. ZeroEval automatically tracks all LLM interactions and their outcomes.
4. Use the UI to run experiments, vote on outputs, and identify the best prompt/model combinations.
5. Winning configurations are automatically deployed to your application without code changes.
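A minimal sketch of steps 1 and 3 (assuming your ZeroEval API key is configured so `ze.init()` can pick it up):
```python
import zeroeval as ze
from openai import OpenAI

ze.init()
client = OpenAI()

# Step 1: the prompt is now versioned and tracked by content
prompt = ze.prompt(name="assistant", content="You are a helpful assistant")

# Step 3: the LLM call is traced automatically, so you can replay and vote on it later
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "system", "content": prompt}],
)
```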
# Reference
Source: https://docs.zeroeval.com/autotune/reference
Parameters and configuration for ze.prompt
`ze.prompt` creates or fetches versioned prompts from the Prompt Library and returns decorated content for downstream LLM calls.
## Parameters
| Parameter | Type | Required | Default | Description |
| ----------- | -------------- | -------- | ------- | ----------------------------------------------------------------------------------------- |
| `name` | string | yes | — | Task name associated with the prompt in the library |
| `content` | string | no | `None` | Raw prompt content to ensure/create a version by content |
| `from_` | string | no | `None` | Either `"latest"` or a 64‑char lowercase SHA‑256 content hash to fetch a specific version |
| `from` | string (alias) | no | `None` | Alias for `from_` (keyword‑only) |
| `variables` | dict | no | `None` | Template variables to render `{{variable}}` tokens in content |
Notes:
* Exactly one of `content` or `from_/from` must be provided.
* `from="latest"` fetches the latest version bound to the task; otherwise `from_` must be a 64‑char hex SHA‑256 hash.
## Behavior
* **content provided**: Computes a normalized SHA‑256 hash, ensures a prompt version exists for `name`, and returns decorated content.
* **from="latest"**: Fetches the latest version for `name` and returns decorated content.
* **from=\<sha256 hash\>**: Fetches the version with that content hash for `name` and returns decorated content.
Decoration adds a compact metadata header used by integrations:
* `task`, `prompt_slug`, `prompt_version`, `prompt_version_id`, `variables`, and (when created by content) `content_hash`.
OpenAI integration: when `prompt_version_id` is present, the SDK will automatically patch the `model` parameter to the model bound to that prompt version.
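As a minimal sketch of consuming the decorated content (assuming `ze.init()` has been called and the OpenAI integration is active):
```python
import zeroeval as ze
from openai import OpenAI

ze.init()
client = OpenAI()

system = ze.prompt(name="support-triage", from_="latest")

response = client.chat.completions.create(
    model="gpt-4o",  # patched to the model bound to the fetched prompt version
    messages=[{"role": "system", "content": system}],
)
```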
## Return Value
* `str`: Decorated prompt content ready to pass into LLM clients.
## Errors
| Error | When |
| --------------------- | ------------------------------------------------------------------------------------------------ |
| `ValueError` | Both `content` and `from_` provided, or neither; invalid `from_` (not `"latest"` or 64‑char hex) |
| `PromptRequestError` | `from_="latest"` but no versions exist for `name` |
| `PromptNotFoundError` | `from_` is a hash that does not exist for `name` |
## Examples
```python
import zeroeval as ze
# Create/ensure a version by content
system = ze.prompt(
name="support-triage",
content="You are a helpful assistant for {{product}}.",
variables={"product": "Acme"},
)
# Fetch the latest version for this task
system = ze.prompt(name="support-triage", from_="latest")
# Fetch a specific version by content hash
system = ze.prompt(name="support-triage", from_="c6a7...deadbeef...0123")
```
# Setup
Source: https://docs.zeroeval.com/autotune/setup
Getting started with autotune
ZeroEval's autotune feature allows you to continuously improve your prompts and automatically deploy the best-performing models. The setup is simple and powerful.
## Getting started (\<5 mins)
Replace hardcoded prompts with `ze.prompt()` and include the name of the specific part of your agent that you want to tune.
```python
# Before
prompt = "You are a helpful assistant"
# After - with autotune
import zeroeval as ze

prompt = ze.prompt(
name="assistant",
content="You are a helpful assistant"
)
```
That's it! You'll start seeing production traces in your dashboard for this specific task at [`ZeroEval › Tuning › [task_name]`](https://app.zeroeval.com).
## Pushing models to production
Once you see a model that performs well, you can send it to production with a single click from the dashboard.
The model you specify in code then gets replaced automatically whenever you use the prompt returned by `ze.prompt()`, as shown below.
```python
# You write this
response = client.chat.completions.create(
model="gpt-4", # ← Gets replaced!
messages=[{"role": "system", "content": prompt}]
)
```
## Example
Here's autotune in action for a simple customer support bot:
```python
import zeroeval as ze
from openai import OpenAI
ze.init()
client = OpenAI()
# Define your prompt with version tracking
system_prompt = ze.prompt(
name="support-bot",
content="""You are a customer support agent for {{company}}.
Be helpful, concise, and professional.""",
variables={"company": "TechCorp"}
)
# Use it normally - model gets patched automatically
response = client.chat.completions.create(
model="gpt-4", # This might run claude-3-sonnet in production!
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "I need help with my order"}
]
)
```
## Understanding Prompt Versions
Every time you change your prompt content, a new version is created:
```python
# Version 1 - Initial prompt
prompt_v1 = ze.prompt(
name="customer-support",
content="You are a helpful assistant."
)
# Version 2 - Updated prompt (automatically creates new version)
prompt_v2 = ze.prompt(
name="customer-support",
content="You are a helpful customer support assistant." # Changed!
)
# Fetch the latest version for this task
latest_prompt = ze.prompt(
    name="customer-support",
    from_="latest"  # Always get the latest tuned version
)
# Or fetch a specific version by its content hash
specific_prompt = ze.prompt(
    name="customer-support",
    from_="a1b2c3d4..."  # 64-character SHA-256 hash
)
```
# Models
Source: https://docs.zeroeval.com/autotune/tuning/models
Evaluate your agent's performance across multiple models
ZeroEval lets you evaluate real production traces of specific agent tasks across different models and rank them over time. This helps you pick the best model for each part of your agent.
# Prompts
Source: https://docs.zeroeval.com/autotune/tuning/prompts
Use feedback on production traces to generate and validate better prompts
ZeroEval derives prompt optimization suggestions directly from feedback on your production traces. By capturing preferences and correctness signals, we provide concrete prompt edits you can test and use for your agents.
## Prompt optimizations from feedback
Once you've given enough feedback on the incoming traffic for a task, you can generate prompt optimizations from that feedback by clicking the "Optimize Prompt" button in the task's "Suggestions" tab.
After generating a new prompt, you can test it with different models and see how it performs against the feedback you've already given.
# Introduction
Source: https://docs.zeroeval.com/calibrated-judges/introduction
Continuously evaluate your production traffic with judges that learn over time
Calibrated LLM judges are AI evaluators that watch your traces, sessions, or spans and score behavior according to criteria you define. They get better over time the more you refine and correct their evaluations.
## When to use
Use a calibrated judge when you want consistent, scalable evaluation of:
* Hallucinations, safety/policy violations
* Response quality (helpfulness, tone, structure)
* Latency, cost, and error patterns tied to behaviors
# Setup
Source: https://docs.zeroeval.com/calibrated-judges/setup
Create and calibrate an AI judge in minutes
## Creating a judge (\<5 mins)
1. Go to [Monitoring → Judges → New Judge](https://app.zeroeval.com/monitoring/signal-automations).
2. Specify the behavior that you want to track in your production traffic.
3. Tweak the prompt of the judge until it matches what you are looking for!
That's it! Historical and future traces will be scored automatically and shown in the dashboard.
## Calibrating your judge
For each evaluated item, you can mark the judge's verdict as correct or incorrect. This feedback is stored automatically and used to improve the judge over time.
# A/B Tests
Source: https://docs.zeroeval.com/evaluations/ab-tests
Run weighted A/B tests on models, prompts, or any variants in your code.
## Overview
`ze.choose()` enables A/B testing by making weighted random selections between different variants (models, prompts, parameters, etc.) and automatically tracking which variant was chosen for each execution.
**Key features:**
* Weighted random selection between variants
* Automatic tracking of choices within spans, traces, or sessions
* Consistency caching — same entity always gets the same variant
* Built-in validation of weights and variant keys
## Basic Usage
```python
import zeroeval as ze
ze.init()
# Must be called within a span, trace, or session context
with ze.span("my_operation"):
# Choose between two models with 70/30 split
model = ze.choose(
"model_selection",
variants={"fast": "gpt-4o-mini", "powerful": "gpt-4o"},
weights={"fast": 0.7, "powerful": 0.3}
)
# Use the selected model
# model will be either "gpt-4o-mini" (70% chance) or "gpt-4o" (30% chance)
```
## Parameters
| Parameter | Type | Required | Description |
| ---------- | ------------------ | -------- | ------------------------------------------------------------------------------ |
| `name` | `str` | Yes | Name of the A/B test (e.g., "model\_selection", "prompt\_variant") |
| `variants` | `Dict[str, Any]` | Yes | Dictionary mapping variant keys to their values |
| `weights` | `Dict[str, float]` | Yes | Dictionary mapping variant keys to selection probabilities (must sum to \~1.0) |
## Returns
Returns the **value** from the selected variant (not the key).
## Complete Example
```python
import zeroeval as ze
import openai
ze.init()
client = openai.OpenAI()
with ze.span("model_ab_test", tags={"feature": "model_comparison"}):
# A/B test between two models
selected_model = ze.choose(
"model_selection",
variants={
"mini": "gpt-4o-mini",
"full": "gpt-4o"
},
weights={
"mini": 0.7, # 70% traffic
"full": 0.3 # 30% traffic
}
)
# The selected model is automatically tracked
response = client.chat.completions.create(
model=selected_model,
messages=[{"role": "user", "content": "Hello!"}]
)
```
## Important Notes
* **Context Required**: Must be called within an active `ze.span()`, trace, or session
* **Consistency**: Same entity (span/trace/session) always receives the same variant (see the sketch below)
* **Weight Validation**: Weights should sum to 1.0 (warns if not within 0.95-1.05)
* **Key Matching**: Variant keys and weight keys must match exactly
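A short sketch of the consistency guarantee: within one span, repeating the same test returns the same variant, so downstream logic sees a single consistent choice.
```python
import zeroeval as ze

ze.init()

variants = {"fast": "gpt-4o-mini", "powerful": "gpt-4o"}
weights = {"fast": 0.5, "powerful": 0.5}

with ze.span("consistency_demo"):
    first = ze.choose("model_selection", variants=variants, weights=weights)
    second = ze.choose("model_selection", variants=variants, weights=weights)
    assert first == second  # same entity always gets the same variant
```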
# Datasets
Source: https://docs.zeroeval.com/evaluations/datasets
Create, version, and manage datasets with the ZeroEval Python SDK.
## Why datasets?
**Datasets** are named, versioned collections of rows. Each row is just a Python `dict`. Use them to store test cases for your model and share them across experiments.
## Quick Start
```python
import zeroeval as ze
ze.init() # pick up API key from `zeroeval setup`
# Create from data
capitals = ze.Dataset(
"Capitals", # name as first argument
data=[
{"input": "Colombia", "output": "Bogotá"},
{"input": "Peru", "output": "Lima"},
],
description="Country → capital mapping"
)
capitals.push() # 🚀 creates version 1 in your workspace
capitals = ze.Dataset.pull("Capitals") # later, fetch it back
# Access rows with dot notation or dictionary syntax
print(capitals[0]) # DotDict: supports both access methods
print(capitals[0].input) # "Colombia" (dot notation)
print(capitals[0]["input"]) # "Colombia" (dict syntax)
```
## Creating Datasets
### From Data
```python
# Simple creation
dataset = ze.Dataset("my_dataset", data=[
{"question": "What is 2+2?", "answer": "4"},
{"question": "What is 3+3?", "answer": "6"}
])
# With description
dataset = ze.Dataset(
"math_questions",
data=data_list,
description="Basic arithmetic questions"
)
```
### From CSV Files
Load datasets directly from CSV files:
```python
# Load from CSV - name will be the filename
dataset = ze.Dataset("/path/to/my_data.csv")
# Load with custom description
dataset = ze.Dataset(
"/path/to/survey_data.csv",
description="Customer satisfaction survey results"
)
print(f"Loaded {len(dataset)} rows from CSV")
print(f"Columns: {dataset.columns}")
```
## Row Access & Manipulation
### Accessing Rows
```python
# Single row access (returns DotDict)
first_row = dataset[0]
last_row = dataset[-1]
# Dot notation access
question = first_row.question
answer = first_row.answer
# Dictionary access
question = first_row["question"]
answer = first_row["answer"]
# Iteration
for row in dataset:
print(f"Q: {row.question}, A: {row.answer}")
```
### Slicing & Subsetting
```python
len(dataset) # number of rows
dataset.columns # ['question', 'answer']
# Standard list slicing
first_5 = dataset[:5] # New Dataset with first 5 rows
last_10 = dataset[-10:] # New Dataset with last 10 rows
middle = dataset[10:20] # Rows 10-19
every_other = dataset[::2] # Every other row
# Sliced datasets preserve metadata
print(first_5.name) # "math_questions_slice"
```
### Adding & Modifying Rows
```python
# Add single or multiple rows
dataset.add_rows([
{"question": "What is 5+5?", "answer": "10"},
{"question": "What is 7+3?", "answer": "10"}
])
# Update existing row
dataset.update_row(0, {"question": "What is 1+1?", "answer": "2"})
# Or use indexing
dataset[0] = {"question": "What is 1+1?", "answer": "2"}
# Delete rows
dataset.delete_row(2) # Delete row at index 2
del dataset[1] # Alternative syntax
```
## Multimodal Data
Add images, audio, video, and URLs to any cell:
```python
medical = ze.Dataset("medical_cases", [
{"patient_id": "P001", "symptoms": "chest pain"}
])
# Add different media types
medical.add_image(0, "xray", "scans/patient001_chest.jpg")
medical.add_audio(0, "heartbeat", "audio/patient001_heart.wav")
medical.add_video(0, "exam_footage", "videos/patient001_exam.mp4")
medical.add_media_url(0, "external_report",
"https://example.com/report.pdf",
media_type="image")
medical.push()
# Access media in your tasks
@ze.task(outputs=["diagnosis"])
def diagnose(row):
# row.xray will contain the base64-encoded image
# row.heartbeat will contain the base64-encoded audio
return {"diagnosis": analyze_media(row.xray, row.heartbeat)}
```
**Supported formats:**
* **Images**: `.jpg`, `.jpeg`, `.png`, `.gif`, `.webp`
* **Audio**: `.mp3`, `.wav`, `.ogg`, `.m4a`
* **Video**: `.mp4`, `.webm`, `.mov`
* **URLs**: Any external media link
## Dataset Properties
```python
# Basic info
print(dataset.name) # "medical_cases"
print(dataset.description) # "Medical diagnostic cases"
print(len(dataset)) # 150
# Column information
print(dataset.columns) # ['patient_id', 'symptoms', 'xray', ...]
# Version info (after pushing)
print(dataset.version_number) # 1
# String representations
print(dataset) # Dataset('medical_cases', 150 records)
```
## Versioning & Persistence
### Push & Pull
```python
# Push creates new versions automatically
dataset.push() # Version 1
dataset.add_rows([new_data])
dataset.push() # Version 2 (automatic)
# Pull latest version
latest = ze.Dataset.pull("medical_cases")
# Pull specific version
v1 = ze.Dataset.pull("medical_cases", version_number=1)
v2 = ze.Dataset.pull("medical_cases", version_number=2)
print(f"V1 has {len(v1)} rows")
print(f"V2 has {len(v2)} rows")
```
### Version Properties
```python
# After pulling a dataset
dataset = ze.Dataset.pull("my_dataset")
print(dataset.version_number) # Version number (1, 2, 3, etc.)
print(dataset.name) # Dataset name
```
## Running Experiments
Datasets can run tasks directly (see [Experiments](/evaluations/experiments) for details):
```python
@ze.task(outputs=["prediction"])
def classify(row):
return {"prediction": model.predict(row.text)}
# Run task on dataset
run = dataset.run(classify)
run.eval([accuracy_evaluator])
# Multiple runs for stability testing
all_runs = run.repeat(5)
```
## Method Chaining
```python
# Many operations support chaining
result = (ze.Dataset("test", data=initial_data)
.add_rows(more_data)
.push() # Returns self
.run(my_task)
.eval([my_evaluator]))
```
## Tips
* **Start small**: Test with `dataset[:10]` before running on full datasets
* **Use CSV loading**: Fastest way to get started with existing data
* **Dot notation**: Makes row access more readable than `row["field"]`
* **Version everything**: Each push creates immutable versions for reproducibility
* **Multimodal**: Add media after creating the basic dataset structure
* **Error handling**: Wrap file operations and data validation in try/except blocks (see the sketch below)
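For the error-handling tip, a small sketch of wrapping a CSV load in try/except (the path is hypothetical, and the exact exception raised for a bad file depends on the SDK):
```python
import zeroeval as ze

try:
    dataset = ze.Dataset("/path/to/my_data.csv")  # hypothetical path
except Exception as exc:  # narrow this to the exception your SDK version raises
    print(f"Could not load dataset: {exc}")
    dataset = ze.Dataset("fallback", data=[{"input": "example", "output": "example"}])
```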
# Experiments
Source: https://docs.zeroeval.com/evaluations/experiments
Run tasks and evaluations on your datasets using decorators – simple, clean, and powerful.
## TL;DR
1. Pull (or create) a dataset
2. Write a **task** using `@ze.task` – any function that processes a row
3. Write **evaluations** using `@ze.evaluation` – functions that score your results
4. Run: `dataset.run(task).eval([evaluations])`
## Quick Start
```python
import zeroeval as ze
ze.init()
dataset = ze.Dataset.pull("spam_emails")
@ze.task(outputs=["prediction", "confidence"])
def detect_spam(row):
# Your ML model here
return {"prediction": 1, "confidence": 0.85}
@ze.evaluation(mode="row", outputs=["correct"])
def accuracy_per_row(row):
return {"correct": int(row["prediction"] == row["label"])}
# Run task and evaluate
run = dataset.run(detect_spam)
run.eval([accuracy_per_row])
```
## Task Functions
Tasks are your core ML functions decorated with `@ze.task`:
```python
@ze.task(outputs=["prediction", "confidence"])
def spam_detector(row):
"""Simple spam detection"""
text = row["text"]
spam_words = ["FREE", "WIN", "URGENT"]
score = sum(1 for word in spam_words if word in text.upper())
confidence = min(0.95, score / 3 + 0.2)
prediction = 1 if confidence > 0.5 else 0
return {
"prediction": prediction,
"confidence": confidence
}
```
**Key points:**
* Must return a dictionary with all declared `outputs`
* Receives a single row from your dataset
* Can access row fields using dot notation: `row.text` or `row["text"]`
## Evaluations
Evaluations score your task results. Three types available:
### Row Evaluations
Score each individual result:
```python
@ze.evaluation(mode="row", outputs=["correct", "error"])
def binary_accuracy(row):
"""Check if prediction matches label"""
correct = int(row["prediction"] == row["label"])
error = abs(row["prediction"] - row["label"])
return {"correct": correct, "error": error}
```
### Column Evaluations
Compute aggregate metrics across the entire dataset:
```python
@ze.evaluation(mode="column", outputs=["accuracy", "precision", "recall", "f1"])
def classification_metrics(dataset):
"""Calculate aggregate classification metrics"""
tp = sum(row.get("true_positive", 0) for row in dataset)
fp = sum(row.get("false_positive", 0) for row in dataset)
tn = sum(row.get("true_negative", 0) for row in dataset)
fn = sum(row.get("false_negative", 0) for row in dataset)
accuracy = (tp + tn) / (tp + fp + tn + fn)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}
```
### Run Evaluations
Compare results across multiple runs:
```python
@ze.evaluation(mode="run", outputs=["consistency", "avg_accuracy"])
def cross_run_analysis(runs):
"""Analyze consistency across multiple runs"""
accuracies = [run.metrics.get("accuracy", 0) for run in runs]
avg_accuracy = sum(accuracies) / len(accuracies)
consistency = 1 - (max(accuracies) - min(accuracies))
return {"consistency": consistency, "avg_accuracy": avg_accuracy}
```
## Column & Run Metrics
For simpler aggregate calculations, use dedicated metric decorators:
```python
@ze.column_metric(outputs=["accuracy"])
def dataset_accuracy(dataset):
"""Simple accuracy calculation"""
correct = sum(row.get("correct", 0) for row in dataset)
total = len(dataset)
return {"accuracy": correct / total}
@ze.run_metric(outputs=["stability"])
def run_stability(runs):
"""Measure stability across runs"""
import numpy as np
scores = [run.metrics.get("accuracy", 0) for run in runs]
return {"stability": 1 - np.std(scores)}
```
Apply them separately:
```python
run.column_metrics([dataset_accuracy])
run.run_metrics([run_stability], all_runs)
```
## Complete Example
Here's a full spam detection experiment:
```python
import zeroeval as ze
ze.init()
# Create or pull dataset
dataset = ze.Dataset.pull("spam_detection")
@ze.task(outputs=["prediction", "confidence"])
def detect_spam(row):
text = row["text"]
spam_indicators = ["FREE", "WIN", "URGENT", "Click here", "Act now"]
score = sum(1 for word in spam_indicators if word.upper() in text.upper())
confidence = min(0.95, max(0.05, score / 3 + 0.3))
prediction = 1 if confidence > 0.5 else 0
return {"prediction": prediction, "confidence": confidence}
@ze.evaluation(mode="row", outputs=["correct", "true_positive", "false_positive"])
def binary_metrics(row):
pred = row["prediction"]
label = row["label"]
return {
"correct": int(pred == label),
"true_positive": int(pred == 1 and label == 1),
"false_positive": int(pred == 1 and label == 0)
}
@ze.column_metric(outputs=["accuracy", "precision"])
def aggregate_metrics(dataset):
tp = sum(row.get("true_positive", 0) for row in dataset)
fp = sum(row.get("false_positive", 0) for row in dataset)
correct = sum(row.get("correct", 0) for row in dataset)
accuracy = correct / len(dataset)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
return {"accuracy": accuracy, "precision": precision}
# Run experiment
run = dataset.run(detect_spam)
run.eval([binary_metrics])
run.column_metrics([aggregate_metrics])
# Multiple runs for stability analysis
all_runs = run.repeat(3) # Creates 3 total runs
print(f"Final accuracy: {run.metrics.get('accuracy', 0):.2%}")
```
## Multiple Runs
Test stability by running multiple times:
```python
# Run the same task 5 times
run1 = dataset.run(detect_spam)
all_runs = run1.repeat(5) # Creates 5 total runs
# Apply run-level metrics to analyze consistency
@ze.run_metric(outputs=["mean_accuracy", "std_accuracy"])
def accuracy_stats(runs):
import numpy as np
accuracies = [r.metrics.get("accuracy", 0) for r in runs]
return {
"mean_accuracy": np.mean(accuracies),
"std_accuracy": np.std(accuracies)
}
all_runs[0].run_metrics([accuracy_stats], all_runs)
```
## Subsets & Debugging
Test on smaller subsets while developing:
```python
# Run on first 10 rows only
small_dataset = dataset[:10]
small_run = small_dataset.run(detect_spam)
small_run.eval([binary_metrics])
# Or create a custom subset
subset_indices = [0, 5, 10, 15, 20]
subset_dataset = ze.Dataset(
name="test_subset",
data=[dataset[i] for i in subset_indices]
)
test_run = subset_dataset.run(detect_spam)
```
## Automatic Tracing
Tasks are automatically traced with spans for observability:
```python
@ze.task(outputs=["result"])
def my_task(row):
# This will be automatically traced as span "task:my_task"
result = call_my_model(row["input"])
return {"result": result}
# Any nested spans (decorators or context managers) inside your task are also captured
from zeroeval.observability import span
@ze.task(outputs=["answer"])
def complex_task(row):
with span(name="preprocessing"):
cleaned = preprocess(row["text"])
with span(name="model_call"):
prediction = model.predict(cleaned)
return {"answer": prediction}
```
# Prompt Library
Source: https://docs.zeroeval.com/evaluations/prompt-management
Fetch versioned prompts by slug with tags, variables, fallback, and task association.
Use `ze.get_prompt` to fetch team-managed prompts by `slug`, pinned by `version` or movable `tag`. You can template variables, specify fallbacks, and associate results with a tuning task for observability.
## Quick start
```python
import zeroeval as ze
ze.init() # ensure ZEROEVAL_API_KEY is set
# Fetch by slug using environment-resolved tag ("production" in prod, otherwise "latest")
p = ze.get_prompt("support-triage")
print(p.content)
```
## API
```python
p = ze.get_prompt(
"support-triage",
version=None, # default: None (int | None) - if set, overrides tag
tag=None, # default: None (str | None) → SDK default tag ("production" in prod else "latest")
fallback=None, # default: None (str | None) - local content if fetch fails / 404
variables=None, # default: None (dict | None) - {name: value} for {{name}} templating
task_name=None, # default: None (str | None) - associate content with a tuning task
render=True, # default: True (bool) - render template with variables
missing="error", # default: "error" ("error" | "leave") - behavior for missing variables
use_cache=True, # default: True (bool) - in-process TTL cache for server results
timeout=None, # default: None (float | None) → uses client default timeout (10.0s)
)
# Returned object
# p.content -> str (prompt text, possibly rendered/decorated)
# p.version -> int | None
# p.version_id -> str | None
# p.tag -> str | None
# p.is_latest -> bool
# p.metadata -> dict
# p.source -> "server" | "fallback"
```
### Version vs tag
* If `version` is provided, it is used and `tag` is ignored.
* If `version` is omitted, the effective tag is `tag` or a default resolved from the environment.
### Default tag resolution
* Explicit env override: `ZEROEVAL_PROMPT_TAG`.
* Else, if `ZEROEVAL_ENV == "production"` → `production`; otherwise → `latest`.
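For instance, a quick sketch of how an explicit tag or a pinned version interacts with the default (the `staging` tag is just an example of a tag you might have created):
```python
import zeroeval as ze

ze.init()

# An explicit tag overrides the environment-resolved default
p = ze.get_prompt("support-triage", tag="staging")

# A pinned version wins over any tag
p = ze.get_prompt("support-triage", version=3, tag="staging")
```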
### Variables and templating
* Use double braces in prompt content: `{{variable_name}}`.
* If `variables` is provided and `render=True`, placeholders are replaced with their values.
* `missing="error"` raises if a placeholder has no value; `missing="leave"` leaves it intact.
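A short sketch of the two `missing` behaviors, assuming the prompt content contains `{{customer}}` plus a second placeholder `{{plan}}` (hypothetical) that is not supplied:
```python
import zeroeval as ze

ze.init()

# missing="leave" keeps unresolved placeholders like {{plan}} intact in p.content
p = ze.get_prompt("support-triage", variables={"customer": "Acme"}, missing="leave")

# missing="error" raises if any placeholder has no value
try:
    ze.get_prompt("support-triage", variables={"customer": "Acme"}, missing="error")
except Exception as exc:  # the concrete exception type depends on the SDK
    print(f"Missing template variable: {exc}")
```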
### Task association (autotune-friendly)
Pass `task_name` to decorate `p.content` so downstream LLM calls are traced to that task. The decoration includes prompt identifiers so spans link to the exact version.
```python
p = ze.get_prompt(
"support-triage",
variables={"customer": "Acme"},
task_name="support-triage",
)
messages = [
{"role": "system", "content": p.content},
{"role": "user", "content": "My issue ..."},
]
```
### Fallback content
Provide `fallback` to ensure resiliency if the prompt is missing or the network fails. The returned `Prompt.source` will be `"fallback"` in that case.
```python
p = ze.get_prompt("support-triage", fallback="You are a helpful assistant.")
```
### Namespace helper
`ze.prompts.get("slug", **kwargs)` is a thin wrapper around `ze.get_prompt`.
## Examples
* Fetch latest (non-prod) or production (prod):
```python
p = ze.get_prompt("support-triage")
```
* Pin to a specific version:
```python
ze.get_prompt("support-triage", version=3)
```
* Render variables:
```python
p = ze.get_prompt(
"events-create",
variables={"event_name": "ZeroEval Launch"},
)
print(p.content)
```
* Associate with a task for autotune:
```python
p = ze.get_prompt("support-triage", task_name="support-triage")
```
# Introduction
Source: https://docs.zeroeval.com/llm-gateway/introduction
A unified interface to seamlessly access and switch between various Large Language Models from different providers.
The LLM Gateway is a unified API that lets you access multiple Large Language Models through a single endpoint. Switch between models from different providers with just a parameter change.
## Getting Started
### 1. Get Your API Key
Create an API key from your [Settings → API Keys](https://app.zeroeval.com/settings?section=api-keys) page.
### 2. Use the API
Replace your OpenAI base URL with ZeroEval's gateway and use model names directly:
```python Python
from openai import OpenAI
# Initialize the client with ZeroEval API
client = OpenAI(
api_key="YOUR_API_KEY",
base_url="https://api.zeroeval.com/v1",
)
# Make a completion request
response = client.chat.completions.create(
model="gpt-4o", # Just the model name, no provider prefix
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello, how are you?"},
],
)
print(response.choices[0].message.content)
```
```typescript TypeScript
import OpenAI from 'openai';
// Initialize the client with ZeroEval API
const client = new OpenAI({
apiKey: 'YOUR_API_KEY',
baseURL: 'https://api.zeroeval.com/v1',
});
// Make a completion request
async function generateResponse() {
const response = await client.chat.completions.create({
model: 'gpt-4o', // Just the model name, no provider prefix
messages: [
{ role: 'system', content: 'You are a helpful assistant.' },
{ role: 'user', content: 'Hello, how are you?' },
],
});
console.log(response.choices[0].message.content);
}
generateResponse();
```
```bash cURL
curl -X POST "https://api.zeroeval.com/v1/chat/completions" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer YOUR_API_KEY" \
-d '{
"model": "gpt-4o",
"messages": [
{
"role": "system",
"content": "You are a helpful assistant."
},
{
"role": "user",
"content": "Hello, how are you?"
}
]
}'
```
## Available Models
Get a list of available models:
```bash
curl -X GET "https://api.zeroeval.com/v1/models" \
-H "Authorization: Bearer YOUR_API_KEY"
```
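The same endpoint can also be queried with the OpenAI Python client pointed at the gateway; a minimal sketch:
```python
from openai import OpenAI

client = OpenAI(
    api_key="YOUR_API_KEY",
    base_url="https://api.zeroeval.com/v1",
)

# GET /v1/models through the OpenAI-compatible client
for model in client.models.list():
    print(model.id)
```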
# Manual Instrumentation
Source: https://docs.zeroeval.com/tracing/manual-instrumentation
Create spans manually for LLM calls and custom operations
This guide covers how to manually instrument your code to create spans, particularly for LLM operations. You'll learn how to use both the SDK and direct API calls to send trace data to ZeroEval.
## SDK Manual Instrumentation
### Basic LLM Span with SDK
The simplest way to create an LLM span is using the SDK's span decorator or context manager:
```python Python (Decorator)
import zeroeval as ze
import openai
client = openai.OpenAI()
@ze.span(name="chat_completion", kind="llm")
def generate_response(messages: list) -> str:
"""Create an LLM span with automatic input/output capture"""
response = client.chat.completions.create(
model="gpt-4",
messages=messages,
temperature=0.7
)
# The SDK automatically captures function arguments as input
# and return values as output
return response.choices[0].message.content
```
```python Python (Context Manager)
import zeroeval as ze
import openai
client = openai.OpenAI()
def generate_response(messages: list) -> str:
"""Create an LLM span with manual control"""
with ze.span(name="chat_completion", kind="llm") as span:
# Set input data
span.set_io(input_data=str(messages))
# Make the API call
response = client.chat.completions.create(
model="gpt-4",
messages=messages,
temperature=0.7
)
# Set output data
span.set_io(output_data=response.choices[0].message.content)
# Add LLM-specific attributes
span.set_attributes({
"llm.model": "gpt-4",
"llm.provider": "openai",
"llm.input_tokens": response.usage.prompt_tokens,
"llm.output_tokens": response.usage.completion_tokens,
"llm.total_tokens": response.usage.total_tokens,
"llm.temperature": 0.7
})
return response.choices[0].message.content
```
```typescript TypeScript
import * as ze from "zeroeval";
import OpenAI from "openai";
const openai = new OpenAI();
async function generateResponse(messages: any[]): Promise<string> {
return ze.withSpan(
{
name: "chat_completion",
kind: "llm",
},
async (span) => {
// Set input data
span.setInput(JSON.stringify(messages));
// Make the API call
const response = await openai.chat.completions.create({
model: "gpt-4",
messages: messages,
temperature: 0.7,
});
// Set output data
const content = response.choices[0].message.content;
span.setOutput(content);
// Add LLM-specific attributes
span.setAttributes({
"llm.model": "gpt-4",
"llm.provider": "openai",
"llm.input_tokens": response.usage?.prompt_tokens,
"llm.output_tokens": response.usage?.completion_tokens,
"llm.total_tokens": response.usage?.total_tokens,
"llm.temperature": 0.7,
});
return content;
}
);
}
```
### Advanced LLM Span with Metrics
For production use, capture comprehensive metrics for better observability:
```python Python
import zeroeval as ze
import openai
import time
import json
@ze.span(name="chat_completion_advanced", kind="llm")
def generate_with_metrics(messages: list, **kwargs):
"""Create a comprehensive LLM span with all metrics"""
# Get the current span to add attributes
span = ze.get_current_span()
# Track timing
start_time = time.time()
first_token_time = None
# Prepare the request
model = kwargs.get("model", "gpt-4")
temperature = kwargs.get("temperature", 0.7)
max_tokens = kwargs.get("max_tokens", None)
# Set pre-request attributes
span.set_attributes({
"llm.model": model,
"llm.provider": "openai",
"llm.temperature": temperature,
"llm.max_tokens": max_tokens,
"llm.streaming": kwargs.get("stream", False)
})
# Store input messages in the expected format
span.set_io(input_data=json.dumps([
{"role": msg["role"], "content": msg["content"]}
for msg in messages
]))
try:
client = openai.OpenAI()
# Handle streaming responses
if kwargs.get("stream", False):
stream = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
stream=True
)
full_response = ""
tokens = 0
for chunk in stream:
if chunk.choices[0].delta.content:
if first_token_time is None:
first_token_time = time.time()
ttft_ms = (first_token_time - start_time) * 1000
span.set_attributes({"llm.ttft_ms": ttft_ms})
full_response += chunk.choices[0].delta.content
tokens += 1
# Calculate throughput
total_time = time.time() - start_time
span.set_attributes({
"llm.output_tokens": tokens,
"llm.throughput_tokens_per_sec": tokens / total_time if total_time > 0 else 0,
"llm.duration_ms": total_time * 1000
})
span.set_io(output_data=full_response)
return full_response
else:
# Non-streaming response
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
# Capture all response metadata
span.set_attributes({
"llm.input_tokens": response.usage.prompt_tokens,
"llm.output_tokens": response.usage.completion_tokens,
"llm.total_tokens": response.usage.total_tokens,
"llm.finish_reason": response.choices[0].finish_reason,
"llm.system_fingerprint": response.system_fingerprint,
"llm.response_id": response.id,
"llm.duration_ms": (time.time() - start_time) * 1000
})
content = response.choices[0].message.content
span.set_io(output_data=content)
return content
except Exception as e:
# Capture error details
span.set_status("error")
span.set_attributes({
"error.type": type(e).__name__,
"error.message": str(e)
})
raise
```
```typescript TypeScript
import * as ze from 'zeroeval';
import OpenAI from 'openai';
async function generateWithMetrics(
messages: any[],
options: {
model?: string;
temperature?: number;
maxTokens?: number;
stream?: boolean;
} = {}
): Promise<string> {
return ze.withSpan({
name: "chat_completion_advanced",
kind: "llm"
}, async (span) => {
const startTime = Date.now();
let firstTokenTime: number | null = null;
const model = options.model || "gpt-4";
const temperature = options.temperature || 0.7;
// Set pre-request attributes
span.setAttributes({
"llm.model": model,
"llm.provider": "openai",
"llm.temperature": temperature,
"llm.max_tokens": options.maxTokens,
"llm.streaming": options.stream || false
});
// Store input
span.setInput(JSON.stringify(messages));
try {
const openai = new OpenAI();
if (options.stream) {
// Handle streaming
const stream = await openai.chat.completions.create({
model,
messages,
temperature,
max_tokens: options.maxTokens,
stream: true
});
let fullResponse = "";
let tokens = 0;
for await (const chunk of stream) {
if (chunk.choices[0]?.delta?.content) {
if (!firstTokenTime) {
firstTokenTime = Date.now();
span.setAttribute("llm.ttft_ms", firstTokenTime - startTime);
}
fullResponse += chunk.choices[0].delta.content;
tokens++;
}
}
const totalTime = (Date.now() - startTime) / 1000;
span.setAttributes({
"llm.output_tokens": tokens,
"llm.throughput_tokens_per_sec": tokens / totalTime,
"llm.duration_ms": totalTime * 1000
});
span.setOutput(fullResponse);
return fullResponse;
} else {
// Non-streaming response
const response = await openai.chat.completions.create({
model,
messages,
temperature,
max_tokens: options.maxTokens
});
span.setAttributes({
"llm.input_tokens": response.usage?.prompt_tokens,
"llm.output_tokens": response.usage?.completion_tokens,
"llm.total_tokens": response.usage?.total_tokens,
"llm.finish_reason": response.choices[0].finish_reason,
"llm.system_fingerprint": response.system_fingerprint,
"llm.response_id": response.id,
"llm.duration_ms": Date.now() - startTime
});
const content = response.choices[0].message.content || "";
span.setOutput(content);
return content;
}
} catch (error) {
span.setStatus("error");
span.setAttributes({
"error.type": error.constructor.name,
"error.message": error.message
});
throw error;
}
});
}
```
## Provider-Specific Manual Instrumentation
For users making direct API calls to OpenAI or Gemini without using the SDK's automatic instrumentation, here are comprehensive guides to properly instrument your calls with cost calculation and conversation formatting.
### OpenAI API Manual Instrumentation
When calling the OpenAI API directly (using `requests`, `httpx`, or similar), you'll want to capture all the metrics that the automatic integration would provide:
```python Python (OpenAI Direct API)
import requests
import json
import time
import uuid
from datetime import datetime, timezone
class OpenAITracer:
    def __init__(self, api_key: str, zeroeval_api_key: str):
self.openai_api_key = api_key
self.zeroeval_api_key = zeroeval_api_key
self.zeroeval_url = "https://api.zeroeval.com/api/v1/spans"
def chat_completion_with_tracing(self, messages: list, model: str = "gpt-4o", **kwargs):
"""Make OpenAI API call with full ZeroEval instrumentation"""
# Generate span identifiers
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
# Track timing
start_time = time.time()
# Prepare OpenAI request
openai_payload = {
"model": model,
"messages": messages,
**kwargs # temperature, max_tokens, etc.
}
# Add stream_options for token usage in streaming calls
is_streaming = kwargs.get("stream", False)
if is_streaming and "stream_options" not in kwargs:
openai_payload["stream_options"] = {"include_usage": True}
try:
# Make the OpenAI API call
response = requests.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {self.openai_api_key}",
"Content-Type": "application/json"
},
json=openai_payload,
stream=is_streaming
)
response.raise_for_status()
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
if is_streaming:
# Handle streaming response
full_response = ""
input_tokens = 0
output_tokens = 0
finish_reason = None
response_id = None
system_fingerprint = None
first_token_time = None
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data_str = line[6:]
if data_str == '[DONE]':
break
try:
data = json.loads(data_str)
# Capture first token timing
if data.get('choices') and data['choices'][0].get('delta', {}).get('content'):
if first_token_time is None:
first_token_time = time.time()
full_response += data['choices'][0]['delta']['content']
# Capture final metadata
if 'usage' in data:
input_tokens = data['usage']['prompt_tokens']
output_tokens = data['usage']['completion_tokens']
if data.get('choices') and data['choices'][0].get('finish_reason'):
finish_reason = data['choices'][0]['finish_reason']
if 'id' in data:
response_id = data['id']
if 'system_fingerprint' in data:
system_fingerprint = data['system_fingerprint']
except json.JSONDecodeError:
continue
# Send ZeroEval span for streaming
self._send_span(
span_id=span_id,
trace_id=trace_id,
model=model,
messages=messages,
response_text=full_response,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
start_time=start_time,
finish_reason=finish_reason,
response_id=response_id,
system_fingerprint=system_fingerprint,
streaming=True,
first_token_time=first_token_time,
**kwargs
)
return full_response
else:
# Handle non-streaming response
response_data = response.json()
# Extract response details
content = response_data['choices'][0]['message']['content']
usage = response_data.get('usage', {})
# Send ZeroEval span
self._send_span(
span_id=span_id,
trace_id=trace_id,
model=model,
messages=messages,
response_text=content,
input_tokens=usage.get('prompt_tokens', 0),
output_tokens=usage.get('completion_tokens', 0),
duration_ms=duration_ms,
start_time=start_time,
finish_reason=response_data['choices'][0].get('finish_reason'),
response_id=response_data.get('id'),
system_fingerprint=response_data.get('system_fingerprint'),
streaming=False,
**kwargs
)
return content
except Exception as e:
# Send error span
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
self._send_error_span(
span_id=span_id,
trace_id=trace_id,
model=model,
messages=messages,
duration_ms=duration_ms,
start_time=start_time,
error=e,
**kwargs
)
raise
def _send_span(self, span_id: str, trace_id: str, model: str, messages: list,
response_text: str, input_tokens: int, output_tokens: int,
duration_ms: float, start_time: float, finish_reason: str = None,
response_id: str = None, system_fingerprint: str = None,
streaming: bool = False, first_token_time: float = None, **kwargs):
"""Send successful span to ZeroEval"""
# Calculate throughput metrics
throughput = output_tokens / (duration_ms / 1000) if duration_ms > 0 else 0
ttft_ms = None
if streaming and first_token_time:
ttft_ms = (first_token_time - start_time) * 1000
# Prepare span attributes following ZeroEval's expected format
attributes = {
# Core LLM attributes (these are used for cost calculation)
"provider": "openai", # Key for cost calculation
"model": model, # Key for cost calculation
"inputTokens": input_tokens, # Key for cost calculation
"outputTokens": output_tokens, # Key for cost calculation
# OpenAI-specific attributes
"temperature": kwargs.get("temperature"),
"max_tokens": kwargs.get("max_tokens"),
"top_p": kwargs.get("top_p"),
"frequency_penalty": kwargs.get("frequency_penalty"),
"presence_penalty": kwargs.get("presence_penalty"),
"streaming": streaming,
"finish_reason": finish_reason,
"response_id": response_id,
"system_fingerprint": system_fingerprint,
# Performance metrics
"throughput": throughput,
"duration_ms": duration_ms,
}
if ttft_ms:
attributes["ttft_ms"] = ttft_ms
# Clean up None values
attributes = {k: v for k, v in attributes.items() if v is not None}
# Format messages for good conversation display
formatted_messages = self._format_messages_for_display(messages)
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": f"{model}_completion",
"kind": "llm", # Critical: must be "llm" for cost calculation
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
"status": "ok",
"attributes": attributes,
"input_data": json.dumps(formatted_messages),
"output_data": response_text,
"tags": {
"provider": "openai",
"model": model,
"streaming": str(streaming).lower()
}
}
# Send to ZeroEval
response = requests.post(
self.zeroeval_url,
headers={
"Authorization": f"Bearer {self.zeroeval_api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
if response.status_code != 200:
print(f"Warning: Failed to send span to ZeroEval: {response.text}")
def _send_error_span(self, span_id: str, trace_id: str, model: str,
messages: list, duration_ms: float, start_time: float,
error: Exception, **kwargs):
"""Send error span to ZeroEval"""
attributes = {
"provider": "openai",
"model": model,
"temperature": kwargs.get("temperature"),
"max_tokens": kwargs.get("max_tokens"),
"streaming": kwargs.get("stream", False),
"error_type": type(error).__name__,
"error_message": str(error),
"duration_ms": duration_ms,
}
# Clean up None values
attributes = {k: v for k, v in attributes.items() if v is not None}
formatted_messages = self._format_messages_for_display(messages)
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": f"{model}_completion",
"kind": "llm",
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
"status": "error",
"attributes": attributes,
"input_data": json.dumps(formatted_messages),
"output_data": "",
"error_message": str(error),
"tags": {
"provider": "openai",
"model": model,
"error": "true"
}
}
requests.post(
self.zeroeval_url,
headers={
"Authorization": f"Bearer {self.zeroeval_api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
def _format_messages_for_display(self, messages: list) -> list:
"""Format messages for optimal display in ZeroEval UI"""
formatted = []
for msg in messages:
# Handle both dict and object formats
if hasattr(msg, 'role'):
role = msg.role
content = msg.content
else:
role = msg.get('role', 'user')
content = msg.get('content', '')
# Handle multimodal content
if isinstance(content, list):
# Extract text parts for display
text_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
text_parts.append(part['text'])
elif isinstance(part, str):
text_parts.append(part)
content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'
formatted.append({
"role": role,
"content": content
})
return formatted
# Usage example
tracer = OpenAITracer(
api_key="your-openai-api-key",
zeroeval_api_key="your-zeroeval-api-key"
)
# Non-streaming call
response = tracer.chat_completion_with_tracing([
{"role": "user", "content": "What is the capital of France?"}
], model="gpt-4o", temperature=0.7)
# Streaming call
response = tracer.chat_completion_with_tracing([
{"role": "user", "content": "Write a short story"}
], model="gpt-4o", stream=True, temperature=0.9)
```
```javascript JavaScript (OpenAI Direct API)
class OpenAITracer {
constructor(openaiApiKey, zeroevalApiKey) {
this.openaiApiKey = openaiApiKey;
this.zeroevalApiKey = zeroevalApiKey;
this.zeroevalUrl = "https://api.zeroeval.com/api/v1/spans";
}
async chatCompletionWithTracing(messages, model = "gpt-4o", options = {}) {
const traceId = crypto.randomUUID();
const spanId = crypto.randomUUID();
const startTime = Date.now();
// Prepare OpenAI payload
const openaiPayload = {
model,
messages,
...options
};
// Add stream_options for token usage in streaming calls
const isStreaming = options.stream || false;
if (isStreaming && !options.stream_options) {
openaiPayload.stream_options = { include_usage: true };
}
try {
const response = await fetch("https://api.openai.com/v1/chat/completions", {
method: "POST",
headers: {
"Authorization": `Bearer ${this.openaiApiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify(openaiPayload),
});
if (!response.ok) {
throw new Error(`OpenAI API error: ${response.statusText}`);
}
const endTime = Date.now();
const durationMs = endTime - startTime;
if (isStreaming) {
// Handle streaming response
let fullResponse = "";
let inputTokens = 0;
let outputTokens = 0;
let finishReason = null;
let responseId = null;
let systemFingerprint = null;
let firstTokenTime = null;
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const dataStr = line.slice(6);
if (dataStr === '[DONE]') break;
try {
const data = JSON.parse(dataStr);
// Capture first token timing
if (data.choices?.[0]?.delta?.content) {
if (!firstTokenTime) {
firstTokenTime = Date.now();
}
fullResponse += data.choices[0].delta.content;
}
// Capture final metadata
if (data.usage) {
inputTokens = data.usage.prompt_tokens;
outputTokens = data.usage.completion_tokens;
}
if (data.choices?.[0]?.finish_reason) {
finishReason = data.choices[0].finish_reason;
}
if (data.id) responseId = data.id;
if (data.system_fingerprint) systemFingerprint = data.system_fingerprint;
} catch (e) {
// Skip invalid JSON lines
}
}
}
}
await this.sendSpan({
spanId, traceId, model, messages,
responseText: fullResponse,
inputTokens, outputTokens, durationMs,
startTime: startTime / 1000,
finishReason, responseId, systemFingerprint,
streaming: true, firstTokenTime,
...options
});
return fullResponse;
} else {
// Handle non-streaming response
const responseData = await response.json();
const content = responseData.choices[0].message.content;
const usage = responseData.usage || {};
await this.sendSpan({
spanId, traceId, model, messages,
responseText: content,
inputTokens: usage.prompt_tokens || 0,
outputTokens: usage.completion_tokens || 0,
durationMs,
startTime: startTime / 1000,
finishReason: responseData.choices[0].finish_reason,
responseId: responseData.id,
systemFingerprint: responseData.system_fingerprint,
streaming: false,
...options
});
return content;
}
} catch (error) {
const endTime = Date.now();
const durationMs = endTime - startTime;
await this.sendErrorSpan({
spanId, traceId, model, messages,
durationMs, startTime: startTime / 1000,
error, ...options
});
throw error;
}
}
async sendSpan({
spanId, traceId, model, messages, responseText,
inputTokens, outputTokens, durationMs, startTime,
finishReason, responseId, systemFingerprint,
streaming, firstTokenTime, ...options
}) {
// Calculate performance metrics
const throughput = outputTokens / (durationMs / 1000);
const ttftMs = streaming && firstTokenTime ?
(firstTokenTime - startTime * 1000) : null;
// Prepare attributes for cost calculation and display
const attributes = {
// Core attributes for cost calculation
provider: "openai",
model: model,
inputTokens: inputTokens,
outputTokens: outputTokens,
// OpenAI-specific attributes
temperature: options.temperature,
max_tokens: options.max_tokens,
top_p: options.top_p,
frequency_penalty: options.frequency_penalty,
presence_penalty: options.presence_penalty,
streaming: streaming,
finish_reason: finishReason,
response_id: responseId,
system_fingerprint: systemFingerprint,
// Performance metrics
throughput: throughput,
duration_ms: durationMs,
};
if (ttftMs) attributes.ttft_ms = ttftMs;
// Clean up undefined values
Object.keys(attributes).forEach(key => {
if (attributes[key] === undefined) delete attributes[key];
});
const spanData = {
id: spanId,
trace_id: traceId,
name: `${model}_completion`,
kind: "llm", // Critical for cost calculation
started_at: new Date(startTime * 1000).toISOString(),
ended_at: new Date((startTime * 1000) + durationMs).toISOString(),
status: "ok",
attributes: attributes,
input_data: JSON.stringify(this.formatMessagesForDisplay(messages)),
output_data: responseText,
tags: {
provider: "openai",
model: model,
streaming: streaming.toString()
}
};
try {
const response = await fetch(this.zeroevalUrl, {
method: "POST",
headers: {
"Authorization": `Bearer ${this.zeroevalApiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify([spanData])
});
if (!response.ok) {
console.warn(`Failed to send span to ZeroEval: ${response.statusText}`);
}
} catch (error) {
console.warn(`Error sending span to ZeroEval: ${error.message}`);
}
}
formatMessagesForDisplay(messages) {
return messages.map(msg => {
let content = msg.content;
// Handle multimodal content
if (Array.isArray(content)) {
const textParts = content
.filter(part => part.type === 'text')
.map(part => part.text);
content = textParts.length > 0 ? textParts.join('\n') : '[Multimodal content]';
}
return {
role: msg.role,
content: content
};
});
}
// Error handling method...
async sendErrorSpan({ spanId, traceId, model, messages, durationMs, startTime, error, ...options }) {
// Implementation similar to Python version
}
}
// Usage
const tracer = new OpenAITracer("your-openai-api-key", "your-zeroeval-api-key");
// Non-streaming
const response = await tracer.chatCompletionWithTracing([
{ role: "user", content: "What is the capital of France?" }
], "gpt-4o", { temperature: 0.7 });
// Streaming
const streamResponse = await tracer.chatCompletionWithTracing([
{ role: "user", content: "Write a short story" }
], "gpt-4o", { stream: true, temperature: 0.9 });
```
### Gemini API Manual Instrumentation
Gemini has a different API structure with `contents` instead of `messages` and different parameter names. Here's how to instrument Gemini API calls:
```python Python (Gemini Direct API)
import requests
import json
import time
import uuid
from datetime import datetime, timezone
class GeminiTracer:
    def __init__(self, api_key: str, zeroeval_api_key: str):
self.gemini_api_key = api_key
self.zeroeval_api_key = zeroeval_api_key
self.zeroeval_url = "https://api.zeroeval.com/api/v1/spans"
def generate_content_with_tracing(self, messages: list, model: str = "gemini-1.5-flash", **kwargs):
"""Make Gemini API call with full ZeroEval instrumentation"""
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
start_time = time.time()
# Convert OpenAI-style messages to Gemini contents format
contents, system_instruction = self._convert_messages_to_contents(messages)
# Prepare Gemini request payload
gemini_payload = {
"contents": contents
}
# Add generation config
generation_config = {}
if kwargs.get("temperature") is not None:
generation_config["temperature"] = kwargs["temperature"]
if kwargs.get("max_tokens"):
generation_config["maxOutputTokens"] = kwargs["max_tokens"]
if kwargs.get("top_p") is not None:
generation_config["topP"] = kwargs["top_p"]
if kwargs.get("top_k") is not None:
generation_config["topK"] = kwargs["top_k"]
if kwargs.get("stop"):
stop = kwargs["stop"]
generation_config["stopSequences"] = stop if isinstance(stop, list) else [stop]
if generation_config:
gemini_payload["generationConfig"] = generation_config
# Add system instruction if present
if system_instruction:
gemini_payload["systemInstruction"] = {"parts": [{"text": system_instruction}]}
# Add tools if provided
if kwargs.get("tools"):
gemini_payload["tools"] = kwargs["tools"]
if kwargs.get("tool_choice"):
gemini_payload["toolConfig"] = {
"functionCallingConfig": {"mode": kwargs["tool_choice"]}
}
# Choose endpoint based on streaming
is_streaming = kwargs.get("stream", False)
endpoint = "streamGenerateContent" if is_streaming else "generateContent"
url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:{endpoint}"
try:
response = requests.post(
url,
headers={
"x-goog-api-key": self.gemini_api_key,
"Content-Type": "application/json"
},
json=gemini_payload,
stream=is_streaming
)
response.raise_for_status()
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
if is_streaming:
# Handle streaming response
full_response = ""
input_tokens = 0
output_tokens = 0
finish_reason = None
model_version = None
first_token_time = None
for line in response.iter_lines():
if line:
try:
# Gemini streaming sends JSON objects separated by newlines
data = json.loads(line.decode('utf-8'))
if 'candidates' in data and data['candidates']:
candidate = data['candidates'][0]
# Extract content
if 'content' in candidate and 'parts' in candidate['content']:
for part in candidate['content']['parts']:
if 'text' in part:
if first_token_time is None:
first_token_time = time.time()
full_response += part['text']
# Extract finish reason
if 'finishReason' in candidate:
finish_reason = candidate['finishReason']
# Extract usage metadata (usually in final chunk)
if 'usageMetadata' in data:
usage = data['usageMetadata']
input_tokens = usage.get('promptTokenCount', 0)
output_tokens = usage.get('candidatesTokenCount', 0)
# Extract model version
if 'modelVersion' in data:
model_version = data['modelVersion']
except json.JSONDecodeError:
continue
self._send_span(
span_id=span_id, trace_id=trace_id, model=model,
original_messages=messages, response_text=full_response,
input_tokens=input_tokens, output_tokens=output_tokens,
duration_ms=duration_ms, start_time=start_time,
finish_reason=finish_reason, model_version=model_version,
streaming=True, first_token_time=first_token_time,
**kwargs
)
return full_response
else:
# Handle non-streaming response
response_data = response.json()
# Extract response content
content = ""
if 'candidates' in response_data and response_data['candidates']:
candidate = response_data['candidates'][0]
if 'content' in candidate and 'parts' in candidate['content']:
content_parts = []
for part in candidate['content']['parts']:
if 'text' in part:
content_parts.append(part['text'])
content = ''.join(content_parts)
# Extract usage
usage = response_data.get('usageMetadata', {})
input_tokens = usage.get('promptTokenCount', 0)
output_tokens = usage.get('candidatesTokenCount', 0)
# Extract other metadata
finish_reason = None
if 'candidates' in response_data and response_data['candidates']:
finish_reason = response_data['candidates'][0].get('finishReason')
model_version = response_data.get('modelVersion')
self._send_span(
span_id=span_id, trace_id=trace_id, model=model,
original_messages=messages, response_text=content,
input_tokens=input_tokens, output_tokens=output_tokens,
duration_ms=duration_ms, start_time=start_time,
finish_reason=finish_reason, model_version=model_version,
streaming=False, **kwargs
)
return content
except Exception as e:
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
self._send_error_span(
span_id=span_id, trace_id=trace_id, model=model,
original_messages=messages, duration_ms=duration_ms,
start_time=start_time, error=e, **kwargs
)
raise
def _convert_messages_to_contents(self, messages: list) -> tuple:
"""Convert OpenAI-style messages to Gemini contents format"""
contents = []
system_instruction = None
for msg in messages:
role = msg.get('role', 'user') if isinstance(msg, dict) else msg.role
content = msg.get('content', '') if isinstance(msg, dict) else msg.content
if role == 'system':
# Collect system instructions
if system_instruction:
system_instruction += f"\n{content}"
else:
system_instruction = content
continue
# Convert content to parts
if isinstance(content, list):
# Handle multimodal content
parts = []
for item in content:
if isinstance(item, dict) and item.get('type') == 'text':
parts.append({"text": item['text']})
# Add support for images, etc. if needed
else:
parts = [{"text": str(content)}]
# Convert role
gemini_role = "user" if role == "user" else "model"
contents.append({"role": gemini_role, "parts": parts})
return contents, system_instruction
def _send_span(self, span_id: str, trace_id: str, model: str,
original_messages: list, response_text: str,
input_tokens: int, output_tokens: int, duration_ms: float,
start_time: float, finish_reason: str = None,
model_version: str = None, streaming: bool = False,
first_token_time: float = None, **kwargs):
"""Send successful span to ZeroEval"""
# Calculate performance metrics
throughput = output_tokens / (duration_ms / 1000) if duration_ms > 0 else 0
ttft_ms = None
if streaming and first_token_time:
ttft_ms = (first_token_time - start_time) * 1000
# Prepare attributes following ZeroEval's expected format
attributes = {
# Core attributes for cost calculation (use provider naming)
"provider": "gemini", # Key for cost calculation
"model": model, # Key for cost calculation
"inputTokens": input_tokens, # Key for cost calculation
"outputTokens": output_tokens, # Key for cost calculation
# Gemini-specific attributes
"temperature": kwargs.get("temperature"),
"max_tokens": kwargs.get("max_tokens"), # maxOutputTokens
"top_p": kwargs.get("top_p"),
"top_k": kwargs.get("top_k"),
"stop_sequences": kwargs.get("stop"),
"streaming": streaming,
"finish_reason": finish_reason,
"model_version": model_version,
# Performance metrics
"throughput": throughput,
"duration_ms": duration_ms,
}
if ttft_ms:
attributes["ttft_ms"] = ttft_ms
# Include tool information if present
if kwargs.get("tools"):
attributes["tools_count"] = len(kwargs["tools"])
attributes["tool_choice"] = kwargs.get("tool_choice")
# Clean up None values
attributes = {k: v for k, v in attributes.items() if v is not None}
# Format original messages for display (convert back to OpenAI format for consistency)
formatted_messages = self._format_messages_for_display(original_messages)
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": f"{model}_completion",
"kind": "llm", # Critical: must be "llm" for cost calculation
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
"status": "ok",
"attributes": attributes,
"input_data": json.dumps(formatted_messages),
"output_data": response_text,
"tags": {
"provider": "gemini",
"model": model,
"streaming": str(streaming).lower()
}
}
# Send to ZeroEval
response = requests.post(
self.zeroeval_url,
headers={
"Authorization": f"Bearer {self.zeroeval_api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
if response.status_code != 200:
print(f"Warning: Failed to send span to ZeroEval: {response.text}")
def _send_error_span(self, span_id: str, trace_id: str, model: str,
original_messages: list, duration_ms: float,
start_time: float, error: Exception, **kwargs):
"""Send error span to ZeroEval"""
attributes = {
"provider": "gemini",
"model": model,
"temperature": kwargs.get("temperature"),
"max_tokens": kwargs.get("max_tokens"),
"streaming": kwargs.get("stream", False),
"error_type": type(error).__name__,
"error_message": str(error),
"duration_ms": duration_ms,
}
# Clean up None values
attributes = {k: v for k, v in attributes.items() if v is not None}
formatted_messages = self._format_messages_for_display(original_messages)
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": f"{model}_completion",
"kind": "llm",
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
"status": "error",
"attributes": attributes,
"input_data": json.dumps(formatted_messages),
"output_data": "",
"error_message": str(error),
"tags": {
"provider": "gemini",
"model": model,
"error": "true"
}
}
requests.post(
self.zeroeval_url,
headers={
"Authorization": f"Bearer {self.zeroeval_api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
def _format_messages_for_display(self, messages: list) -> list:
"""Format messages for optimal display in ZeroEval UI"""
formatted = []
for msg in messages:
if hasattr(msg, 'role'):
role = msg.role
content = msg.content
else:
role = msg.get('role', 'user')
content = msg.get('content', '')
# Handle multimodal content
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
text_parts.append(part['text'])
elif isinstance(part, str):
text_parts.append(part)
content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'
formatted.append({
"role": role,
"content": content
})
return formatted
# Usage example
tracer = GeminiTracer(
api_key="your-gemini-api-key",
zeroeval_api_key="your-zeroeval-api-key"
)
# Non-streaming call
response = tracer.generate_content_with_tracing([
{"role": "user", "content": "What is the capital of France?"}
], model="gemini-1.5-flash", temperature=0.7)
# Streaming call
response = tracer.generate_content_with_tracing([
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Write a short story"}
], model="gemini-1.5-flash", stream=True, temperature=0.9)
```
### Key Attributes for Cost Calculation
For accurate cost calculation, ZeroEval requires these specific attributes in your span:
| Attribute | Required | Description | Example Values |
| -------------- | -------- | -------------------------------------- | ------------------------------------- |
| `provider` | ✅ | Provider identifier for pricing lookup | `"openai"`, `"gemini"`, `"anthropic"` |
| `model` | ✅ | Model identifier for pricing lookup | `"gpt-4o"`, `"gemini-1.5-flash"` |
| `inputTokens` | ✅ | Number of input tokens consumed | `150` |
| `outputTokens` | ✅ | Number of output tokens generated | `75` |
| `kind` | ✅ | Must be set to `"llm"` | `"llm"` |
**Cost Calculation Process:**
1. ZeroEval looks up pricing in the `provider_models` table using `provider` and `model`
2. Calculates: `(inputTokens × inputPrice + outputTokens × outputPrice) / 1,000,000`
3. Stores the result in the span's `cost` field
4. Cost is displayed in cents, automatically converted to dollars in the UI
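As a rough sketch of step 2, with purely hypothetical prices (the real values come from the `provider_models` table and may use different units):
```python
# Hypothetical per-million-token prices, for illustration only; the actual
# values are looked up server-side from the provider_models table.
input_price = 0.15    # price per 1M input tokens (illustrative)
output_price = 0.60   # price per 1M output tokens (illustrative)

input_tokens, output_tokens = 150, 75
cost = (input_tokens * input_price + output_tokens * output_price) / 1_000_000
# The result is stored in the span's `cost` field.
```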
**Current Supported Models for Cost Calculation:**
* **OpenAI**: `gpt-4o`, `gpt-4o-mini`, `gpt-4-turbo`, `gpt-3.5-turbo`
* **Gemini**: `gemini-1.5-flash`, `gemini-1.5-pro`, `gemini-1.0-pro`
* **Anthropic**: `claude-3-5-sonnet`, `claude-3-haiku`, `claude-3-opus`
If your model isn't listed, the cost will be `0` and you'll see a warning in the logs. Contact support to add pricing for new models.
### Conversation Formatting Best Practices
To ensure your conversations display properly in the ZeroEval UI, follow these formatting guidelines:
```python Python Message Formatting
def format_messages_for_zeroeval(messages: list) -> list:
"""Format messages for optimal display in ZeroEval UI"""
formatted = []
for msg in messages:
# Handle both dict and object formats
if hasattr(msg, 'role'):
role = msg.role
content = msg.content
else:
role = msg.get('role', 'user')
content = msg.get('content', '')
# Standardize role names
if role in ['assistant', 'bot', 'ai']:
role = 'assistant'
elif role in ['human', 'user']:
role = 'user'
elif role == 'system':
role = 'system'
# Handle multimodal content - extract text for display
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict):
if part.get('type') == 'text':
text_parts.append(part['text'])
elif part.get('type') == 'image_url':
text_parts.append(f"[Image: {part.get('image_url', {}).get('url', 'Unknown')}]")
elif isinstance(part, str):
text_parts.append(part)
# Join text parts with newlines for readability
content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'
# Ensure content is a string
if not isinstance(content, str):
content = str(content)
# Trim excessive whitespace but preserve meaningful formatting
content = content.strip()
formatted.append({
"role": role,
"content": content
})
return formatted
# Usage in span creation
span_data = {
"input_data": json.dumps(format_messages_for_zeroeval(original_messages)),
"output_data": response_text.strip(), # Clean response text too
# ... other fields
}
```
```javascript JavaScript Message Formatting
function formatMessagesForZeroEval(messages) {
return messages.map((msg) => {
let role = msg.role || "user";
let content = msg.content || "";
// Standardize role names
if (["assistant", "bot", "ai"].includes(role)) {
role = "assistant";
} else if (["human", "user"].includes(role)) {
role = "user";
} else if (role === "system") {
role = "system";
}
// Handle multimodal content
if (Array.isArray(content)) {
const textParts = [];
for (const part of content) {
if (part.type === "text") {
textParts.push(part.text);
} else if (part.type === "image_url") {
textParts.push(`[Image: ${part.image_url?.url || "Unknown"}]`);
} else if (typeof part === "string") {
textParts.push(part);
}
}
content =
textParts.length > 0 ? textParts.join("\n") : "[Multimodal content]";
}
// Ensure content is a string and trim whitespace
content = String(content).trim();
return {
role: role,
content: content,
};
});
}
// Usage in span creation
const spanData = {
input_data: JSON.stringify(formatMessagesForZeroEval(originalMessages)),
output_data: responseText.trim(),
// ... other fields
};
```
**Key Formatting Rules:**
1. **Standardize Role Names**: Use `"user"`, `"assistant"`, and `"system"` consistently
2. **Handle Multimodal Content**: Extract text content and add descriptive placeholders for non-text elements
3. **Clean Whitespace**: Trim excessive whitespace while preserving intentional formatting
4. **Ensure String Types**: Convert all content to strings to avoid serialization issues
5. **Preserve Conversation Flow**: Maintain the original message order and context
**UI Display Features:**
* **Message Bubbles**: Conversations appear as chat bubbles with clear role distinction
* **Token Counts**: Hover over messages to see token usage breakdown
* **Copy Functionality**: Users can copy individual messages or entire conversations
* **Search**: Well-formatted messages are easily searchable within traces
* **Export**: Clean formatting ensures readable exports to various formats
**Common Formatting Issues to Avoid:**
* ❌ Mixed role naming (`bot` vs `assistant`)
* ❌ Nested objects in content fields
* ❌ Excessive line breaks or whitespace
* ❌ Empty or null content fields
* ❌ Non-string data types in content
**Pro Tips:**
* Keep system messages concise but informative
* Use consistent formatting across your application
* Include relevant context in message content for better debugging
* Consider truncating very long messages (>10k characters) with an ellipsis, as in the sketch below
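A minimal sketch of such a truncation helper (the 10k limit and marker text are just illustrative defaults):
```python
def truncate_message(content: str, max_chars: int = 10_000) -> str:
    """Truncate overly long message content, appending an ellipsis marker."""
    if len(content) <= max_chars:
        return content
    return content[:max_chars] + "... [truncated]"
```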
### Creating Child Spans
Create nested spans to track sub-operations within an LLM call:
```python
import zeroeval as ze
@ze.span(name="rag_pipeline", kind="generic")
def answer_with_context(question: str) -> str:
# Retrieval step
with ze.span(name="retrieve_context", kind="vector_store") as retrieval_span:
context = vector_db.search(question, k=5)
retrieval_span.set_attributes({
"vector_store.query": question,
"vector_store.k": 5,
"vector_store.results": len(context)
})
# LLM generation step
with ze.span(name="generate_answer", kind="llm") as llm_span:
messages = [
{"role": "system", "content": f"Context: {context}"},
{"role": "user", "content": question}
]
response = generate_response(messages)
llm_span.set_attributes({
"llm.model": "gpt-4",
"llm.context_length": len(str(context))
})
return response
```
## Direct API Instrumentation
If you prefer to send spans directly to the API without using an SDK, here's how to do it:
### API Authentication
First, obtain an API key from your [Settings → API Keys](https://app.zeroeval.com/settings?section=api-keys) page.
Include the API key in your request headers:
```bash
Authorization: Bearer YOUR_API_KEY
```
### Basic Span Creation
Send a POST request to `/api/v1/spans` with your span data:
```bash cURL
curl -X POST https://api.zeroeval.com/api/v1/spans \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '[{
"id": "550e8400-e29b-41d4-a716-446655440000",
"trace_id": "550e8400-e29b-41d4-a716-446655440001",
"name": "chat_completion",
"kind": "llm",
"started_at": "2024-01-15T10:30:00Z",
"ended_at": "2024-01-15T10:30:02Z",
"status": "ok",
"attributes": {
"llm.model": "gpt-4",
"llm.provider": "openai",
"llm.temperature": 0.7,
"llm.input_tokens": 150,
"llm.output_tokens": 230,
"llm.total_tokens": 380
},
"input_data": "[{\"role\": \"user\", \"content\": \"What is the capital of France?\"}]",
"output_data": "The capital of France is Paris."
}]'
```
```python Python (Requests)
import requests
import json
from datetime import datetime, timezone
import uuid
def send_llm_span(messages, response_text, model="gpt-4", tokens=None):
"""Send an LLM span directly to the ZeroEval API"""
# Generate IDs
span_id = str(uuid.uuid4())
trace_id = str(uuid.uuid4())
# Prepare the span data
span_data = {
"id": span_id,
"trace_id": trace_id,
"name": "chat_completion",
"kind": "llm",
"started_at": datetime.now(timezone.utc).isoformat(),
"ended_at": datetime.now(timezone.utc).isoformat(),
"status": "ok",
"attributes": {
"llm.model": model,
"llm.provider": "openai",
"llm.temperature": 0.7
},
"input_data": json.dumps(messages),
"output_data": response_text
}
# Add token counts if provided
if tokens:
span_data["attributes"].update({
"llm.input_tokens": tokens.get("prompt_tokens"),
"llm.output_tokens": tokens.get("completion_tokens"),
"llm.total_tokens": tokens.get("total_tokens")
})
# Send to API
response = requests.post(
"https://api.zeroeval.com/api/v1/spans",
headers={
"Authorization": f"Bearer {YOUR_API_KEY}",
"Content-Type": "application/json"
},
json=[span_data] # Note: API expects an array
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"Failed to send span: {response.text}")
```
```javascript JavaScript (Fetch)
async function sendLLMSpan(
messages,
responseText,
model = "gpt-4",
tokens = null
) {
// Generate IDs
const spanId = crypto.randomUUID();
const traceId = crypto.randomUUID();
// Prepare span data
const spanData = {
id: spanId,
trace_id: traceId,
name: "chat_completion",
kind: "llm",
started_at: new Date().toISOString(),
ended_at: new Date().toISOString(),
status: "ok",
attributes: {
"llm.model": model,
"llm.provider": "openai",
"llm.temperature": 0.7,
},
input_data: JSON.stringify(messages),
output_data: responseText,
};
// Add token counts if provided
if (tokens) {
spanData.attributes["llm.input_tokens"] = tokens.prompt_tokens;
spanData.attributes["llm.output_tokens"] = tokens.completion_tokens;
spanData.attributes["llm.total_tokens"] = tokens.total_tokens;
}
// Send to API
const response = await fetch("https://api.zeroeval.com/api/v1/spans", {
method: "POST",
headers: {
Authorization: `Bearer ${YOUR_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify([spanData]), // Note: API expects an array
});
if (!response.ok) {
throw new Error(`Failed to send span: ${await response.text()}`);
}
return await response.json();
}
```
### Complete LLM Span with Session
Create a full trace with session context:
```python
import requests
import json
from datetime import datetime, timezone
import uuid
import time
class ZeroEvalClient:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.zeroeval.com/api/v1"
self.session_id = str(uuid.uuid4())
def create_llm_span(
self,
messages: list,
response: dict,
model: str = "gpt-4",
trace_id: str = None,
parent_span_id: str = None,
start_time: float = None,
end_time: float = None
):
"""Create a comprehensive LLM span with all metadata"""
if not trace_id:
trace_id = str(uuid.uuid4())
if not start_time:
start_time = time.time()
if not end_time:
end_time = time.time()
span_id = str(uuid.uuid4())
# Calculate duration
duration_ms = (end_time - start_time) * 1000
# Prepare comprehensive span data
span_data = {
"id": span_id,
"trace_id": trace_id,
"parent_span_id": parent_span_id,
"name": f"{model}_completion",
"kind": "llm",
"started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
"ended_at": datetime.fromtimestamp(end_time, timezone.utc).isoformat(),
"duration_ms": duration_ms,
"status": "ok",
# Session context
"session": {
"id": self.session_id,
"name": "API Client Session"
},
# Core attributes
"attributes": {
"llm.model": model,
"llm.provider": "openai",
"llm.temperature": 0.7,
"llm.max_tokens": 1000,
"llm.streaming": False,
# Token metrics
"llm.input_tokens": response.get("usage", {}).get("prompt_tokens"),
"llm.output_tokens": response.get("usage", {}).get("completion_tokens"),
"llm.total_tokens": response.get("usage", {}).get("total_tokens"),
# Performance metrics
"llm.duration_ms": duration_ms,
"llm.throughput_tokens_per_sec": (
response.get("usage", {}).get("completion_tokens", 0) /
(duration_ms / 1000) if duration_ms > 0 else 0
),
# Response metadata
"llm.finish_reason": response.get("choices", [{}])[0].get("finish_reason"),
"llm.response_id": response.get("id"),
"llm.system_fingerprint": response.get("system_fingerprint")
},
# Tags for filtering
"tags": {
"environment": "production",
"version": "1.0.0",
"user_id": "user_123"
},
# Input/Output
"input_data": json.dumps(messages),
"output_data": response.get("choices", [{}])[0].get("message", {}).get("content", ""),
# Cost calculation (optional - will be calculated server-side if not provided)
"cost": self.calculate_cost(
model,
response.get("usage", {}).get("prompt_tokens", 0),
response.get("usage", {}).get("completion_tokens", 0)
)
}
# Send the span
response = requests.post(
f"{self.base_url}/spans",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json=[span_data]
)
if response.status_code != 200:
raise Exception(f"Failed to send span: {response.text}")
return span_id
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
"""Calculate cost based on model and token usage"""
# Example pricing (adjust based on actual pricing)
pricing = {
"gpt-4": {"input": 0.03 / 1000, "output": 0.06 / 1000},
"gpt-3.5-turbo": {"input": 0.001 / 1000, "output": 0.002 / 1000}
}
if model in pricing:
input_cost = input_tokens * pricing[model]["input"]
output_cost = output_tokens * pricing[model]["output"]
return input_cost + output_cost
return 0.0
```
## Span Schema Reference
### Required Fields
| Field | Type | Description |
| ------------ | ----------------- | ------------------------------- |
| `trace_id` | string (UUID) | Unique identifier for the trace |
| `name` | string | Descriptive name for the span |
| `started_at` | ISO 8601 datetime | When the span started |
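A span containing only these required fields should be accepted; here is a minimal sketch (IDs and timestamps are illustrative):
```python
import uuid
from datetime import datetime, timezone

import requests

minimal_span = {
    "trace_id": str(uuid.uuid4()),
    "name": "minimal_example",
    "started_at": datetime.now(timezone.utc).isoformat(),
}

# The spans endpoint always expects an array, even for a single span.
requests.post(
    "https://api.zeroeval.com/api/v1/spans",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    },
    json=[minimal_span],
)
```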
### Recommended Fields for LLM Spans
| Field | Type | Description |
| ------------- | ----------------- | ------------------------------------------------------- |
| `id` | string (UUID) | Unique span identifier (auto-generated if not provided) |
| `kind` | string | Set to `"llm"` for LLM spans |
| `ended_at` | ISO 8601 datetime | When the span completed |
| `status` | string | `"ok"`, `"error"`, or `"unset"` |
| `input_data` | string | JSON string of input messages |
| `output_data` | string | Generated text response |
| `duration_ms` | number | Total duration in milliseconds |
| `cost` | number | Calculated cost (auto-calculated if not provided) |
### LLM-Specific Attributes
Store these in the `attributes` field:
| Attribute | Type | Description |
| ------------------------------- | ------- | -------------------------------------------- |
| `llm.model` | string | Model identifier (e.g., "gpt-4", "claude-3") |
| `llm.provider` | string | Provider name (e.g., "openai", "anthropic") |
| `llm.temperature` | number | Temperature parameter |
| `llm.max_tokens` | number | Maximum tokens limit |
| `llm.input_tokens` | number | Number of input tokens |
| `llm.output_tokens` | number | Number of output tokens |
| `llm.total_tokens` | number | Total tokens used |
| `llm.streaming` | boolean | Whether response was streamed |
| `llm.ttft_ms` | number | Time to first token (streaming only) |
| `llm.throughput_tokens_per_sec` | number | Token generation rate |
| `llm.finish_reason` | string | Why generation stopped |
| `llm.response_id` | string | Provider's response ID |
| `llm.system_fingerprint` | string | Model version identifier |
### Optional Context Fields
| Field | Type | Description |
| ---------------- | ------------- | --------------------------------------------- |
| `parent_span_id` | string (UUID) | Parent span for nested operations |
| `session` | object | Session context with `id` and optional `name` |
| `tags` | object | Key-value pairs for filtering |
| `signals` | object | Custom signals for alerting |
| `error_message` | string | Error description if status is "error" |
| `error_stack` | string | Stack trace for debugging |
## Best Practices
1. **Always set the `kind` field**: Use `"llm"` for LLM spans to enable specialized features like embeddings and cost tracking.
2. **Include token counts**: These are essential for cost calculation and performance monitoring.
3. **Capture timing metrics**: For streaming responses, track TTFT (time to first token) and throughput.
4. **Use consistent naming**: Follow a pattern like `{model}_completion` or `{provider}_{operation}`.
5. **Add context with tags**: Use tags for environment, version, user ID, etc., to enable powerful filtering.
6. **Handle errors gracefully**: Set status to "error" and include error details in attributes.
7. **Link related spans**: Use `parent_span_id` to create hierarchical traces for complex workflows.
8. **Batch span submissions**: When sending multiple spans, include them in a single API call as an array, as in the sketch below.
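A minimal sketch of batching several spans into one request (span names and fields are illustrative):
```python
import uuid
from datetime import datetime, timezone

import requests

trace_id = str(uuid.uuid4())

# Build several spans up front, then submit them together in a single call.
spans = [
    {
        "trace_id": trace_id,
        "name": name,
        "kind": "llm",
        "started_at": datetime.now(timezone.utc).isoformat(),
        "ended_at": datetime.now(timezone.utc).isoformat(),
        "status": "ok",
    }
    for name in ("embed_query", "generate_answer")
]

requests.post(
    "https://api.zeroeval.com/api/v1/spans",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json",
    },
    json=spans,  # one request, multiple spans
)
```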
## Examples
### Multi-Step LLM Pipeline
Here's a complete example of tracking a RAG (Retrieval-Augmented Generation) pipeline:
```python
import zeroeval as ze
import time
import json
@ze.span(name="rag_query", kind="generic")
def rag_pipeline(user_query: str) -> dict:
trace_id = ze.get_current_trace()
# Step 1: Query embedding
with ze.span(name="embed_query", kind="llm") as embed_span:
start = time.time()
embedding = create_embedding(user_query)
embed_span.set_attributes({
"llm.model": "text-embedding-3-small",
"llm.provider": "openai",
"llm.input_tokens": len(user_query.split()),
"llm.duration_ms": (time.time() - start) * 1000
})
# Step 2: Vector search
with ze.span(name="vector_search", kind="vector_store") as search_span:
results = vector_db.similarity_search(embedding, k=5)
search_span.set_attributes({
"vector_store.index": "knowledge_base",
"vector_store.k": 5,
"vector_store.results_count": len(results)
})
# Step 3: Rerank results
with ze.span(name="rerank_results", kind="llm") as rerank_span:
reranked = rerank_documents(user_query, results)
rerank_span.set_attributes({
"llm.model": "rerank-english-v2.0",
"llm.provider": "cohere",
"rerank.input_documents": len(results),
"rerank.output_documents": len(reranked)
})
# Step 4: Generate response
with ze.span(name="generate_response", kind="llm") as gen_span:
context = "\n".join([doc.content for doc in reranked[:3]])
messages = [
{"role": "system", "content": f"Use this context to answer: {context}"},
{"role": "user", "content": user_query}
]
response = generate_with_metrics(messages, model="gpt-4")
gen_span.set_attributes({
"llm.context_documents": 3,
"llm.context_length": len(context)
})
return {
"answer": response,
"sources": [doc.metadata for doc in reranked[:3]],
"trace_id": trace_id
}
```
This comprehensive instrumentation provides full visibility into your LLM operations, enabling you to monitor performance, track costs, and debug issues effectively.
## Next Steps
* [Configuration reference](/tracing/reference): complete guide to environment variables, initialization parameters, and runtime configuration options.
* [SDK integrations](/tracing/sdks/python/integrations): automatic instrumentation for popular LLM libraries, which handles all of the above without manual code changes.
# OpenTelemetry
Source: https://docs.zeroeval.com/tracing/opentelemetry
Send traces to ZeroEval using the OpenTelemetry collector
ZeroEval provides native support for the OpenTelemetry Protocol (OTLP), allowing you to send traces from any OpenTelemetry-instrumented application directly to ZeroEval's API. This guide shows you how to configure the OpenTelemetry collector to export traces to ZeroEval.
## Prerequisites
* A ZeroEval API key (get one from your [workspace settings](https://app.zeroeval.com/settings/api-keys))
* OpenTelemetry collector installed ([installation guide](https://opentelemetry.io/docs/collector/getting-started/))
## Configuration
Create a collector configuration file (`otel-collector-config.yaml`):
```yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 1s
send_batch_size: 1024
# ZeroEval-specific attributes
attributes:
actions:
- key: deployment.environment
value: "production" # or staging, development, etc.
action: upsert
exporters:
otlphttp:
endpoint: https://api.zeroeval.com
headers:
Authorization: "Bearer YOUR_ZEROEVAL_API_KEY"
traces_endpoint: https://api.zeroeval.com/v1/traces
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, attributes]
exporters: [otlphttp]
```
## Docker Deployment
For containerized deployments, use this Docker Compose configuration:
```yaml
version: '3.8'
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
container_name: otel-collector
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP HTTP receiver
- "8888:8888" # Prometheus metrics
environment:
- ZEROEVAL_API_KEY=${ZEROEVAL_API_KEY}
restart: unless-stopped
```
## Environment-based Configuration
To avoid hardcoding sensitive information, use environment variables:
```yaml
exporters:
otlphttp:
endpoint: https://api.zeroeval.com
headers:
Authorization: "Bearer ${env:ZEROEVAL_API_KEY}"
traces_endpoint: https://api.zeroeval.com/v1/traces
```
Then set the environment variable:
```bash
export ZEROEVAL_API_KEY="your-api-key-here"
```
## Kubernetes Deployment
For Kubernetes environments, use this ConfigMap and Deployment:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: otel-collector-config
data:
otel-collector-config.yaml: |
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 1s
k8sattributes:
extract:
metadata:
- k8s.namespace.name
- k8s.deployment.name
- k8s.pod.name
exporters:
otlphttp:
endpoint: https://api.zeroeval.com
headers:
Authorization: "Bearer ${env:ZEROEVAL_API_KEY}"
traces_endpoint: https://api.zeroeval.com/v1/traces
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, k8sattributes]
exporters: [otlphttp]
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: otel-collector
spec:
replicas: 2
selector:
matchLabels:
app: otel-collector
template:
metadata:
labels:
app: otel-collector
spec:
containers:
- name: otel-collector
image: otel/opentelemetry-collector-contrib:latest
args: ["--config=/etc/otel-collector-config.yaml"]
env:
- name: ZEROEVAL_API_KEY
valueFrom:
secretKeyRef:
name: zeroeval-secret
key: api-key
ports:
- containerPort: 4317
name: otlp-grpc
- containerPort: 4318
name: otlp-http
volumeMounts:
- name: config
mountPath: /etc/otel-collector-config.yaml
subPath: otel-collector-config.yaml
volumes:
- name: config
configMap:
name: otel-collector-config
---
apiVersion: v1
kind: Service
metadata:
name: otel-collector
spec:
selector:
app: otel-collector
ports:
- name: otlp-grpc
port: 4317
targetPort: 4317
- name: otlp-http
port: 4318
targetPort: 4318
```
# Quickstart
Source: https://docs.zeroeval.com/tracing/quickstart
Get started with tracing and observability in ZeroEval
### Get your API key
Create an API key from your [Settings → API Keys](https://app.zeroeval.com/settings?section=api-keys) page.
### Install the SDK
Choose your SDK to begin integrating ZeroEval:
* **Python SDK**: for applications using frameworks like FastAPI, Django, or Flask
* **TypeScript SDK**: for Node.js, Next.js, and browser-based applications
# Reference
Source: https://docs.zeroeval.com/tracing/reference
Environment variables and configuration parameters for the ZeroEval tracer
Configure the ZeroEval tracer through environment variables, initialization parameters, or runtime methods.
## Environment Variables
Set before importing ZeroEval to configure default behavior.
| Variable | Type | Default | Description |
| -------------------------------- | ------- | ---------------------------- | --------------------------------------- |
| `ZEROEVAL_API_KEY` | string | `""` | API key for authentication |
| `ZEROEVAL_API_URL` | string | `"https://api.zeroeval.com"` | API endpoint URL |
| `ZEROEVAL_WORKSPACE_NAME` | string | `"Personal Workspace"` | Workspace name |
| `ZEROEVAL_SESSION_ID` | string | auto-generated | Session ID for grouping traces |
| `ZEROEVAL_SESSION_NAME` | string | `""` | Human-readable session name |
| `ZEROEVAL_SAMPLING_RATE` | float | `"1.0"` | Sampling rate (0.0-1.0) |
| `ZEROEVAL_DISABLED_INTEGRATIONS` | string | `""` | Comma-separated integrations to disable |
| `ZEROEVAL_DEBUG` | boolean | `"false"` | Enable debug logging |
**Activation:** Set environment variables before importing the SDK.
```bash
export ZEROEVAL_API_KEY="ze_1234567890abcdef"
export ZEROEVAL_SAMPLING_RATE="0.1"
export ZEROEVAL_DEBUG="true"
```
## Initialization Parameters
Configure via `ze.init()` - overrides environment variables.
| Parameter | Type | Default | Description |
| ----------------------- | --------------- | ---------------------------- | -------------------------------- |
| `api_key` | string | `None` | API key for authentication |
| `workspace_name` | string | `"Personal Workspace"` | Workspace name |
| `debug` | boolean | `False` | Enable debug logging with colors |
| `api_url` | string | `"https://api.zeroeval.com"` | API endpoint URL |
| `disabled_integrations` | list\[str] | `None` | Integrations to disable |
| `enabled_integrations` | list\[str] | `None` | Only enable these integrations |
| `setup_otlp` | boolean | `True` | Set up OpenTelemetry OTLP export |
| `service_name` | string | `"zeroeval-app"` | OTLP service name |
| `tags` | dict\[str, str] | `None` | Global tags for all spans |
| `sampling_rate` | float | `None` | Sampling rate (0.0-1.0) |
**Activation:** Pass parameters to `ze.init()`.
```python
ze.init(
api_key="ze_1234567890abcdef",
sampling_rate=0.1,
disabled_integrations=["langchain"],
debug=True
)
```
## Runtime Configuration
Configure after initialization via `ze.tracer.configure()`.
| Parameter | Type | Default | Description |
| ---------------------- | ---------------- | ------- | ------------------------------------ |
| `flush_interval` | float | `1.0` | Flush frequency in seconds |
| `max_spans` | int | `20` | Buffer size before forced flush |
| `collect_code_details` | boolean | `True` | Capture code details in spans |
| `integrations` | dict\[str, bool] | `{}` | Enable/disable specific integrations |
| `sampling_rate` | float | `None` | Sampling rate (0.0-1.0) |
**Activation:** Call `ze.tracer.configure()` anytime after initialization.
```python
ze.tracer.configure(
flush_interval=0.5,
max_spans=100,
sampling_rate=0.05,
integrations={"openai": True, "langchain": False}
)
```
## Available Integrations
| Integration | User-Friendly Name | Auto-Instruments |
| ---------------------- | ------------------ | -------------------- |
| `OpenAIIntegration` | `"openai"` | OpenAI client calls |
| `GeminiIntegration` | `"gemini"` | Google Gemini calls |
| `LangChainIntegration` | `"langchain"` | LangChain components |
| `LangGraphIntegration` | `"langgraph"` | LangGraph workflows |
| `HttpxIntegration` | `"httpx"` | HTTPX requests |
| `VocodeIntegration` | `"vocode"` | Vocode voice SDK |
**Control via:**
* Environment: `ZEROEVAL_DISABLED_INTEGRATIONS="langchain,langgraph"`
* Init: `disabled_integrations=["langchain"]` or `enabled_integrations=["openai"]`
* Runtime: `ze.tracer.configure(integrations={"langchain": False})`
## Configuration Examples
### Production Setup
```python
# High-volume production with sampling
ze.init(
api_key="your_key",
sampling_rate=0.05, # 5% sampling
debug=False,
disabled_integrations=["langchain"]
)
ze.tracer.configure(
flush_interval=0.5, # Faster flushes
max_spans=100 # Larger buffer
)
```
### Development Setup
```python
# Full tracing with debug info
ze.init(
api_key="your_key",
debug=True, # Colored logs
sampling_rate=1.0 # Capture everything
)
```
### Memory-Optimized Setup
```python
# Minimize memory usage
ze.tracer.configure(
max_spans=5, # Small buffer
collect_code_details=False, # No code capture
flush_interval=2.0 # Less frequent flushes
)
```
# Integrations
Source: https://docs.zeroeval.com/tracing/sdks/python/integrations
Automatic instrumentation for popular AI/ML frameworks
The [ZeroEval Python SDK](https://pypi.org/project/zeroeval/) automatically instruments the supported integrations, so the only thing you need to do is initialize the SDK before importing the frameworks you want to trace.
## OpenAI
```python
import zeroeval as ze
ze.init()
import openai
client = openai.OpenAI()
# This call is automatically traced
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}]
)
# Streaming is also automatically traced
stream = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Tell me a story"}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
```
## LangChain
```python
import zeroeval as ze
ze.init()
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# All components are automatically traced
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("Tell me about {topic}")
chain = prompt | model
response = chain.invoke({"topic": "AI"})
```
## LangGraph
```python
import zeroeval as ze
ze.init()
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import HumanMessage
# Define a multi-node graph
workflow = StateGraph(AgentState)
workflow.add_node("reasoning", reasoning_node)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.add_conditional_edges(
"agent",
should_continue,
{"tools": "tools", "end": END}
)
app = workflow.compile()
# Full graph execution is automatically traced
result = app.invoke({"messages": [HumanMessage(content="Help me plan a trip")]})
# Streaming is also supported
for chunk in app.stream({"messages": [HumanMessage(content="Hello")]}):
print(chunk)
```
## LiveKit
The SDK automatically creates traces for LiveKit agents, including events from the following plugins:
* Cartesia (TTS)
* Deepgram (STT)
* OpenAI (LLM)
```python
import zeroeval as ze
ze.init()
from livekit import agents
from livekit.agents import AgentSession, Agent
from livekit.plugins import openai
async def entrypoint(ctx: agents.JobContext):
await ctx.connect()
# All agent sessions are automatically traced
session = AgentSession(
llm=openai.realtime.RealtimeModel(voice="coral")
)
await session.start(
room=ctx.room,
agent=Agent(instructions="You are a helpful voice AI assistant.")
)
# Agent interactions are automatically captured
await session.generate_reply(
instructions="Greet the user and offer your assistance."
)
if __name__ == "__main__":
agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))
```
Need help? Contact us at [founders@zeroeval.com](mailto:founders@zeroeval.com) or join our [Discord](https://discord.gg/MuExkGMNVz).
# Reference
Source: https://docs.zeroeval.com/tracing/sdks/python/reference
Complete API reference for the Python SDK
## Installation
```bash
pip install zeroeval
```
## Core Functions
### `init()`
Initializes the ZeroEval SDK. Must be called before using any other SDK features.
```python
def init(
api_key: str = None,
workspace_name: str = "Personal Workspace",
debug: bool = False,
api_url: str = "https://api.zeroeval.com"
) -> None
```
**Parameters:**
* `api_key` (str, optional): Your ZeroEval API key. If not provided, uses `ZEROEVAL_API_KEY` environment variable
* `workspace_name` (str, optional): The name of your workspace. Defaults to `"Personal Workspace"`
* `debug` (bool, optional): If True, enables detailed logging for debugging. Can also be enabled by setting `ZEROEVAL_DEBUG=true` environment variable
* `api_url` (str, optional): The URL of the ZeroEval API. Defaults to `"https://api.zeroeval.com"`
**Example:**
```python
import zeroeval as ze
ze.init(
api_key="your-api-key",
workspace_name="My Workspace",
debug=True
)
```
## Decorators
### `@span`
Decorator and context manager for creating spans around code blocks.
```python
@span(
name: str,
session_id: Optional[str] = None,
session: Optional[Union[str, dict[str, str]]] = None,
attributes: Optional[dict[str, Any]] = None,
input_data: Optional[str] = None,
output_data: Optional[str] = None,
tags: Optional[dict[str, str]] = None
)
```
**Parameters:**
* `name` (str): Name of the span
* `session_id` (str, optional): **Deprecated** - Use `session` parameter instead
* `session` (Union\[str, dict], optional): Session information. Can be:
* A string containing the session ID
* A dict with `{"id": "...", "name": "..."}`
* `attributes` (dict, optional): Additional attributes to attach to the span
* `input_data` (str, optional): Manual input data override
* `output_data` (str, optional): Manual output data override
* `tags` (dict, optional): Tags to attach to the span
**Usage as Decorator:**
```python
import zeroeval as ze
@ze.span(name="calculate_sum")
def add_numbers(a: int, b: int) -> int:
return a + b # Parameters and return value automatically captured
# With manual I/O
@ze.span(name="process_data", input_data="manual input", output_data="manual output")
def process():
# Process logic here
pass
# With session
@ze.span(name="user_action", session={"id": "123", "name": "John's Session"})
def user_action():
pass
```
**Usage as Context Manager:**
```python
import zeroeval as ze
with ze.span(name="data_processing") as current_span:
result = process_data()
current_span.set_io(input_data="input", output_data=str(result))
```
### `@experiment`
Decorator that attaches dataset and model information to a function.
```python
@experiment(
dataset: Optional[Dataset] = None,
model: Optional[str] = None
)
```
**Parameters:**
* `dataset` (Dataset, optional): Dataset to use for the experiment
* `model` (str, optional): Model identifier
**Example:**
```python
import zeroeval as ze
dataset = ze.Dataset.pull("my-dataset")
@ze.experiment(dataset=dataset, model="gpt-4")
def my_experiment():
# Experiment logic
pass
```
## Classes
### `Dataset`
A class to represent a named collection of dictionary records.
#### Constructor
```python
Dataset(
name: str,
data: list[dict[str, Any]],
description: Optional[str] = None
)
```
**Parameters:**
* `name` (str): The name of the dataset
* `data` (list\[dict]): A list of dictionaries containing the data
* `description` (str, optional): A description of the dataset
**Example:**
```python
dataset = Dataset(
name="Capitals",
description="Country to capital mapping",
data=[
{"input": "France", "output": "Paris"},
{"input": "Germany", "output": "Berlin"}
]
)
```
#### Methods
##### `push()`
Push the dataset to the backend, creating a new version if it already exists.
```python
def push(self, create_new_version: bool = False) -> Dataset
```
**Parameters:**
* `self`: The Dataset instance
* `create_new_version` (bool, optional): For backward compatibility. This parameter is no longer needed as new versions are automatically created when a dataset name already exists. Defaults to False
**Returns:** Returns self for method chaining
##### `pull()`
Static method to pull a dataset from the backend.
```python
@classmethod
def pull(
cls,
dataset_name: str,
version_number: Optional[int] = None
) -> Dataset
```
**Parameters:**
* `cls`: The Dataset class itself (automatically provided when using `@classmethod`)
* `dataset_name` (str): The name of the dataset to pull from the backend
* `version_number` (int, optional): Specific version number to pull. If not provided, pulls the latest version
**Returns:** A new Dataset instance populated with data from the backend
##### `add_rows()`
Add new rows to the dataset.
```python
def add_rows(self, new_rows: list[dict[str, Any]]) -> None
```
**Parameters:**
* `self`: The Dataset instance
* `new_rows` (list\[dict]): A list of dictionaries representing the rows to add
##### `add_image()`
Add an image to a specific row.
```python
def add_image(
self,
row_index: int,
column_name: str,
image_path: str
) -> None
```
**Parameters:**
* `self`: The Dataset instance
* `row_index` (int): Index of the row to update (0-based)
* `column_name` (str): Name of the column to add the image to
* `image_path` (str): Path to the image file to add
##### `add_audio()`
Add audio to a specific row.
```python
def add_audio(
self,
row_index: int,
column_name: str,
audio_path: str
) -> None
```
**Parameters:**
* `self`: The Dataset instance
* `row_index` (int): Index of the row to update (0-based)
* `column_name` (str): Name of the column to add the audio to
* `audio_path` (str): Path to the audio file to add
##### `add_media_url()`
Add a media URL to a specific row.
```python
def add_media_url(
self,
row_index: int,
column_name: str,
media_url: str,
media_type: str = "image"
) -> None
```
**Parameters:**
* `self`: The Dataset instance
* `row_index` (int): Index of the row to update (0-based)
* `column_name` (str): Name of the column to add the media URL to
* `media_url` (str): URL pointing to the media file
* `media_type` (str, optional): Type of media - "image", "audio", or "video". Defaults to "image"
#### Properties
* `name` (str): The name of the dataset
* `description` (str): The description of the dataset
* `columns` (list\[str]): List of all unique column names
* `data` (list\[dict]): List of the data portion for each row
* `backend_id` (str): The ID in the backend (after pushing)
* `version_id` (str): The version ID in the backend
* `version_number` (int): The version number in the backend
#### Example
```python
import zeroeval as ze
# Create a dataset
dataset = ze.Dataset(
name="Capitals",
description="Country to capital mapping",
data=[
{"input": "France", "output": "Paris"},
{"input": "Germany", "output": "Berlin"}
]
)
# Push to backend
dataset.push()
# Pull from backend
dataset = ze.Dataset.pull("Capitals", version_number=1)
# Add rows
dataset.add_rows([{"input": "Italy", "output": "Rome"}])
# Add multimodal data
dataset.add_image(0, "flag", "flags/france.png")
dataset.add_audio(0, "anthem", "anthems/france.mp3")
dataset.add_media_url(0, "video_url", "https://example.com/video.mp4", "video")
```
### `Experiment`
Represents an experiment that runs a task on a dataset with optional evaluators.
#### Constructor
```python
Experiment(
dataset: Dataset,
task: Callable[[Any], Any],
evaluators: Optional[list[Callable[[Any, Any], Any]]] = None,
name: Optional[str] = None,
description: Optional[str] = None
)
```
**Parameters:**
* `dataset` (Dataset): The dataset to run the experiment on
* `task` (Callable): Function that processes each row and returns output
* `evaluators` (list\[Callable], optional): List of evaluator functions that take (row, output) and return evaluation result
* `name` (str, optional): Name of the experiment. Defaults to task function name
* `description` (str, optional): Description of the experiment. Defaults to task function's docstring
**Example:**
```python
import zeroeval as ze
ze.init()
# Pull dataset
dataset = ze.Dataset.pull("Capitals")
# Define task
def capitalize_task(row):
return row["input"].upper()
# Define evaluator
def exact_match(row, output):
return row["output"].upper() == output
# Create and run experiment
exp = ze.Experiment(
dataset=dataset,
task=capitalize_task,
evaluators=[exact_match],
name="Capital Uppercase Test"
)
results = exp.run()
# Or run task and evaluators separately
results = exp.run_task()
exp.run_evaluators([exact_match], results)
```
#### Methods
##### `run()`
Run the complete experiment (task + evaluators).
```python
def run(
self,
subset: Optional[list[dict]] = None
) -> list[ExperimentResult]
```
**Parameters:**
* `self`: The Experiment instance
* `subset` (list\[dict], optional): Subset of dataset rows to run the experiment on. If None, runs on entire dataset
**Returns:** List of experiment results for each row
##### `run_task()`
Run only the task without evaluators.
```python
def run_task(
self,
subset: Optional[list[dict]] = None,
raise_on_error: bool = False
) -> list[ExperimentResult]
```
**Parameters:**
* `self`: The Experiment instance
* `subset` (list\[dict], optional): Subset of dataset rows to run the task on. If None, runs on entire dataset
* `raise_on_error` (bool, optional): If True, raises exceptions encountered during task execution. If False, captures errors. Defaults to False
**Returns:** List of experiment results for each row
##### `run_evaluators()`
Run evaluators on existing results.
```python
def run_evaluators(
self,
evaluators: Optional[list[Callable[[Any, Any], Any]]] = None,
results: Optional[list[ExperimentResult]] = None
) -> list[ExperimentResult]
```
**Parameters:**
* `self`: The Experiment instance
* `evaluators` (list\[Callable], optional): List of evaluator functions to run. If None, uses evaluators from the Experiment instance
* `results` (list\[ExperimentResult], optional): List of results to evaluate. If None, uses results from the Experiment instance
**Returns:** The evaluated results
### `Span`
Represents a span in the tracing system. Usually created via the `@span` decorator.
#### Methods
##### `set_io()`
Set input and output data for the span.
```python
def set_io(
self,
input_data: Optional[str] = None,
output_data: Optional[str] = None
) -> None
```
**Parameters:**
* `self`: The Span instance
* `input_data` (str, optional): Input data to attach to the span. Will be converted to string if not already
* `output_data` (str, optional): Output data to attach to the span. Will be converted to string if not already
##### `set_tags()`
Set tags on the span.
```python
def set_tags(self, tags: dict[str, str]) -> None
```
**Parameters:**
* `self`: The Span instance
* `tags` (dict\[str, str]): Dictionary of tags to set on the span
##### `set_attributes()`
Set attributes on the span.
```python
def set_attributes(self, attributes: dict[str, Any]) -> None
```
**Parameters:**
* `self`: The Span instance
* `attributes` (dict\[str, Any]): Dictionary of attributes to set on the span
##### `set_error()`
Set error information for the span.
```python
def set_error(
self,
code: str,
message: str,
stack: Optional[str] = None
) -> None
```
**Parameters:**
* `self`: The Span instance
* `code` (str): Error code or exception class name
* `message` (str): Error message
* `stack` (str, optional): Stack trace information
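A short sketch combining these `Span` methods inside a context-managed span (`summarize` and `document` are placeholders for your own code):
```python
import zeroeval as ze

with ze.span(name="summarize_document") as current_span:
    try:
        summary = summarize(document)  # placeholder for your own logic
        current_span.set_io(input_data=document, output_data=summary)
        current_span.set_attributes({"document_chars": len(document)})
        current_span.set_tags({"pipeline": "summarization"})
    except Exception as exc:
        current_span.set_error(code=type(exc).__name__, message=str(exc))
        raise
```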
## Context Functions
### `get_current_span()`
Returns the currently active span, if any.
```python
def get_current_span() -> Optional[Span]
```
**Returns:** The currently active Span instance, or None if no span is active
### `get_current_trace()`
Returns the current trace ID.
```python
def get_current_trace() -> Optional[str]
```
**Returns:** The current trace ID, or None if no trace is active
### `get_current_session()`
Returns the current session ID.
```python
def get_current_session() -> Optional[str]
```
**Returns:** The current session ID, or None if no session is active
### `set_tag()`
Sets tags on a span, trace, or session.
```python
def set_tag(
target: Union[Span, str],
tags: dict[str, str]
) -> None
```
**Parameters:**
* `target`: The target to set tags on
* `Span`: Sets tags on the specific span
* `str`: Sets tags on the trace (if valid trace ID) or session (if valid session ID)
* `tags` (dict\[str, str]): Dictionary of tags to set
**Example:**
```python
import zeroeval as ze
# Set tags on current span
current_span = ze.get_current_span()
if current_span:
ze.set_tag(current_span, {"user_id": "12345", "environment": "production"})
# Set tags on trace
trace_id = ze.get_current_trace()
if trace_id:
ze.set_tag(trace_id, {"version": "1.5"})
```
### `set_signal()`
Send a signal to a span, trace, or session.
```python
def set_signal(
target: Union[Span, str],
signals: dict[str, Union[str, bool, int, float]]
) -> bool
```
**Parameters:**
* `target`: The entity to attach signals to
* `Span`: Sends signals to the specific span
* `str`: Sends signals to the trace (if active trace ID) or session
* `signals` (dict): Dictionary of signal names to values
**Returns:** True if signals were sent successfully, False otherwise
**Example:**
```python
import zeroeval as ze
# Send signals to current span
current_span = ze.get_current_span()
if current_span:
ze.set_signal(current_span, {
"accuracy": 0.95,
"is_successful": True,
"error_count": 0
})
# Send signals to trace
trace_id = ze.get_current_trace()
if trace_id:
ze.set_signal(trace_id, {"model_score": 0.85})
```
## CLI Commands
The ZeroEval SDK includes a CLI tool for running experiments and setup.
### `zeroeval run`
Run a Python script containing ZeroEval experiments.
```bash
zeroeval run script.py
```
### `zeroeval setup`
Interactive setup to configure API credentials.
```bash
zeroeval setup
```
## Environment Variables
The SDK uses the following environment variables:
* `ZEROEVAL_API_KEY`: Your ZeroEval API key
* `ZEROEVAL_API_URL`: API endpoint URL (defaults to `https://api.zeroeval.com`)
* `ZEROEVAL_DEBUG`: Set to `true` to enable debug logging
* `ZEROEVAL_DISABLED_INTEGRATIONS`: Comma-separated list of integrations to disable
# Setup
Source: https://docs.zeroeval.com/tracing/sdks/python/setup
Get started with ZeroEval tracing in Python applications
The [ZeroEval Python SDK](https://pypi.org/project/zeroeval/) provides seamless integration with your Python applications through automatic instrumentation and a simple decorator-based API.
## Installation
```bash pip
pip install zeroeval
```
```bash poetry
poetry add zeroeval
```
## Basic Setup
```python
import zeroeval as ze
# Option 1: set ZEROEVAL_API_KEY in your environment (e.g. in a .env file)
ze.init()
# Option 2: Provide API key directly from
# https://app.zeroeval.com/settings?tab=api-keys
ze.init(api_key="YOUR_API_KEY")
```
Run `zeroeval setup` once to save your API key securely to
`~/.config/zeroeval/config.json`
## Patterns
### Decorators
The `@span` decorator is the easiest way to add tracing:
```python
import zeroeval as ze
@ze.span(name="fetch_data")
def fetch_data(user_id: str):
# Function arguments are automatically captured as inputs
# Return values are automatically captured as outputs
return {"user_id": user_id, "name": "John Doe"}
@ze.span(name="process_data", attributes={"version": "1.0"})
def process_data(data: dict):
# Add custom attributes for better filtering
return f"Welcome, {data['name']}!"
```
### Context Manager
For more control over span lifecycles:
```python
import zeroeval as ze
def complex_workflow():
with ze.span(name="data_pipeline") as pipeline_span:
# Fetch stage
with ze.span(name="fetch_stage") as fetch_span:
data = fetch_external_data()
fetch_span.set_io(output_data=str(data))
# Process stage
with ze.span(name="process_stage") as process_span:
processed = transform_data(data)
process_span.set_io(
input_data=str(data),
output_data=str(processed)
)
# Save stage
with ze.span(name="save_stage") as save_span:
result = save_to_database(processed)
save_span.set_io(output_data=f"Saved {result} records")
```
## Advanced Configuration
Fine-tune the tracer behavior:
```python
from zeroeval.observability.tracer import tracer
# Configure tracer settings
tracer.configure(
flush_interval=5.0, # Flush every 5 seconds
max_spans=200, # Buffer up to 200 spans
collect_code_details=True # Capture source code context
)
```
## Context
Access current context information:
```python
# Get the current span
current_span = ze.get_current_span()
# Get the current trace ID
trace_id = ze.get_current_trace()
# Get the current session ID
session_id = ze.get_current_session()
```
## CLI Tooling
The Python SDK includes helpful CLI commands:
```bash
# Save your API key securely
zeroeval setup
# Run scripts with automatic tracing
zeroeval run my_script.py
```
# Integrations
Source: https://docs.zeroeval.com/tracing/sdks/typescript/integrations
Tracing integrations with popular libraries
## OpenAI
```typescript
import { OpenAI } from 'openai';
import * as ze from 'zeroeval';
const openai = ze.wrap(new OpenAI());
const completion = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: 'Hello!' }]
});
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: 'Tell me a story' }],
stream: true
});
for await (const chunk of stream) {
process.stdout.write(chunk.choices[0]?.delta?.content || '');
}
```
## Vercel AI SDK
```typescript
import * as ai from 'ai';
import { openai } from '@ai-sdk/openai';
import * as ze from 'zeroeval';
const wrappedAI = ze.wrap(ai);
// Text generation
const { text } = await wrappedAI.generateText({
model: openai('gpt-4'),
prompt: 'Write a haiku about coding'
});
// Streaming
const { textStream } = await wrappedAI.streamText({
model: openai('gpt-4'),
messages: [{ role: 'user', content: 'Hello!' }]
});
for await (const delta of textStream) {
process.stdout.write(delta);
}
// Structured output
const { object } = await wrappedAI.generateObject({
model: openai('gpt-4'),
schema: z.object({
name: z.string(),
age: z.number()
}),
prompt: 'Generate a random person'
});
```
## LangChain / LangGraph
```typescript
import {
ZeroEvalCallbackHandler,
setGlobalCallbackHandler
} from 'zeroeval/langchain';
// Set globally, no need to pass on each individual call
setGlobalCallbackHandler(new ZeroEvalCallbackHandler());
// OPTIONAL: alternatively use per-invocation
const handler = new ZeroEvalCallbackHandler();
const result = await chain.invoke(
{ topic: 'AI' },
{ callbacks: [handler] }
);
```
Need help? Check out our [GitHub examples](https://github.com/zeroeval/zeroeval-ts-sdk/tree/main/examples) or reach out on [Discord](https://discord.gg/MuExkGMNVz).
# Reference
Source: https://docs.zeroeval.com/tracing/sdks/typescript/reference
Complete API reference for the TypeScript SDK
## Installation
```bash
npm install zeroeval
```
## Core Functions
### `init()`
Initializes the ZeroEval SDK. Must be called before using any other SDK features.
```typescript
function init(opts?: InitOptions): void
```
#### Parameters
* `opts` (optional): `InitOptions`
* `apiKey` (optional): `string` - Your ZeroEval API key. If not provided, uses `ZEROEVAL_API_KEY` environment variable
* `apiUrl` (optional): `string` - Custom API URL. Defaults to `https://api.zeroeval.com`
* `flushInterval` (optional): `number` - Interval in milliseconds to flush spans
* `maxSpans` (optional): `number` - Maximum number of spans to buffer before flushing
* `collectCodeDetails` (optional): `boolean` - Whether to collect code location details
* `integrations` (optional): `Record<string, boolean>` - Enable/disable specific integrations
* `debug` (optional): `boolean` - Enable debug logging
#### Example
```typescript
import * as ze from 'zeroeval';
ze.init({
apiKey: 'your-api-key',
debug: true
});
```
## Wrapper Functions
### `wrap()`
Wraps a supported AI client to automatically trace all API calls.
```typescript
function wrap<T>(client: T): WrappedClient<T>
```
#### Supported Clients
* OpenAI SDK (`openai` package)
* Vercel AI SDK (`ai` package)
#### Examples
```typescript
// OpenAI
import { OpenAI } from 'openai';
import * as ze from 'zeroeval';
const openai = ze.wrap(new OpenAI());
// Vercel AI SDK
import * as ai from 'ai';
import * as ze from 'zeroeval';
const wrappedAI = ze.wrap(ai);
```
## Context Functions
### `getCurrentSpan()`
Returns the currently active span, if any.
```typescript
function getCurrentSpan(): Span | undefined
```
### `getCurrentTrace()`
Returns the current trace ID.
```typescript
function getCurrentTrace(): string | undefined
```
### `getCurrentSession()`
Returns the current session ID.
```typescript
function getCurrentSession(): string | undefined
```
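#### Example

A minimal sketch combining the three context accessors inside a span (the surrounding `withSpan` call is only illustrative):

```typescript
import * as ze from 'zeroeval';

await ze.withSpan({ name: 'context-demo' }, async () => {
  const span = ze.getCurrentSpan();         // active Span, if any
  const traceId = ze.getCurrentTrace();     // current trace ID
  const sessionId = ze.getCurrentSession(); // current session ID, if one was set

  console.log({ hasSpan: span !== undefined, traceId, sessionId });
});
```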
### `setTag()`
Sets tags on a span, trace, or session.
```typescript
function setTag(
target: Span | string | undefined,
  tags: Record<string, string>
): void
```
#### Parameters
* `target`: The target to set tags on
* `Span`: Sets tags on the specific span
* `string`: Sets tags on the trace (if valid trace ID) or session (if valid session ID)
* `undefined`: Sets tags on the current span
* `tags`: Object containing key-value pairs of tags
#### Example
```typescript
// Set tags on current span
ze.setTag(undefined, { user_id: '12345', environment: 'production' });
// Set tags on specific trace
const traceId = ze.getCurrentTrace();
if (traceId) {
ze.setTag(traceId, { feature: 'checkout' });
}
// Set tags on a span object
const span = ze.getCurrentSpan();
if (span) {
ze.setTag(span, { action: 'process_payment' });
}
```
## Spans API
There are two main ways to create spans in the TypeScript SDK:
### `withSpan()`
Wraps a function execution in a span, automatically capturing input/output and timing.
```typescript
function withSpan<T>(
  opts: SpanOptions,
  fn: () => Promise<T> | T
): Promise<T> | T
```
**Parameters:**
* `opts` (SpanOptions): Configuration for the span
* `name` (string): Name of the span
* `sessionId` (string, optional): Session ID to associate with the span
* `sessionName` (string, optional): Human-readable session name
* `tags` (object, optional): Tags to attach to the span
* `attributes` (object, optional): Additional attributes
* `inputData` (any, optional): Manual input data override
* `outputData` (any, optional): Manual output data override
* `fn` (Function): The function to execute within the span
**Example:**
```typescript
import * as ze from 'zeroeval';
// Basic usage
const result = await ze.withSpan(
{ name: 'fetch-user-data' },
async () => {
const user = await fetchUser(userId);
return user;
}
);
// With session and tags
const data = ze.withSpan(
{
name: 'process-payment',
sessionId: sessionId,
tags: { environment: 'production', version: '1.0' }
},
() => processPayment(amount)
);
```
### `@span` Decorator
Decorator for class methods to automatically create spans. Requires TypeScript with experimental decorators enabled.
```typescript
span(opts: SpanOptions): MethodDecorator
```
**Parameters:**
* `opts` (SpanOptions): Same configuration options as `withSpan()`
**Example:**
```typescript
import * as ze from 'zeroeval';
class UserService {
@ze.span({ name: 'get-user' })
  async getUser(id: string): Promise<User> {
// Method implementation
// Input (id) and output (User) are automatically captured
return await db.users.findById(id);
}
@ze.span({
name: 'update-user',
tags: { operation: 'update' }
})
  async updateUser(id: string, data: Partial<User>): Promise<User> {
return await db.users.update(id, data);
}
}
```
**Note:** To use decorators, ensure your `tsconfig.json` includes:
```json
{
"compilerOptions": {
"experimentalDecorators": true
}
}
```
## Signals API
### `sendSignal()`
Send a signal to a specific entity.
```typescript
async function sendSignal(
entityType: 'session' | 'trace' | 'span' | 'completion',
entityId: string,
name: string,
value: string | boolean | number,
signalType?: 'boolean' | 'numerical'
): Promise<void>
```
#### Parameters
* `entityType`: Type of entity to attach the signal to
* `entityId`: UUID of the entity
* `name`: Name of the signal
* `value`: Signal value (string, boolean, or number)
* `signalType` (optional): Signal type, auto-detected if not provided
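#### Example

A minimal sketch that attaches signals to the current trace; the trace ID is read from the active context and is assumed to exist:

```typescript
import * as ze from 'zeroeval';

const traceId = ze.getCurrentTrace();
if (traceId) {
  // Boolean signal – the type is auto-detected from the value
  await ze.sendSignal('trace', traceId, 'task_completed', true);
  // Numerical signal with the type passed explicitly
  await ze.sendSignal('trace', traceId, 'response_quality', 4.5, 'numerical');
}
```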
### `sendTraceSignal()`
Send a signal to the current trace.
```typescript
function sendTraceSignal(
name: string,
value: string | boolean | number,
signalType?: 'boolean' | 'numerical'
): void
```
### `sendSessionSignal()`
Send a signal to the current session.
```typescript
function sendSessionSignal(
name: string,
value: string | boolean | number,
signalType?: 'boolean' | 'numerical'
): void
```
### `sendSpanSignal()`
Send a signal to the current span.
```typescript
function sendSpanSignal(
name: string,
value: string | boolean | number,
signalType?: 'boolean' | 'numerical'
): void
```
### `getEntitySignals()`
Retrieve signals for a specific entity.
```typescript
async function getEntitySignals(
entityType: 'session' | 'trace' | 'span' | 'completion',
entityId: string
): Promise
```
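#### Example

A sketch combining the convenience senders with a later lookup (the exact shape of the returned signals may vary by SDK version, so the result is only logged here):

```typescript
import * as ze from 'zeroeval';

await ze.withSpan({ name: 'checkout' }, async () => {
  // Convenience senders target the current span / trace
  ze.sendSpanSignal('payment_succeeded', true);
  ze.sendTraceSignal('total_cost', 0.045);

  // Fetch signals for a known entity by ID
  const traceId = ze.getCurrentTrace();
  if (traceId) {
    const signals = await ze.getEntitySignals('trace', traceId);
    console.log(signals);
  }
});
```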
## LangChain Integration
### `ZeroEvalCallbackHandler`
A callback handler for integrating with LangChain.
```typescript
class ZeroEvalCallbackHandler extends BaseCallbackHandler
```
#### Constructor
```typescript
constructor(options?: ZeroEvalCallbackHandlerOptions)
```
#### Options
* `debug` (optional): `boolean` - Enable debug logging
* `excludeMetadataProps` (optional): `RegExp` - Pattern for metadata properties to exclude
* `maxConcurrentSpans` (optional): `number` - Maximum concurrent spans. Defaults to 1000
* `spanCleanupIntervalMs` (optional): `number` - Cleanup interval in milliseconds. Defaults to 60000
#### Example
```typescript
import { ZeroEvalCallbackHandler } from 'zeroeval/langchain';
const handler = new ZeroEvalCallbackHandler({
debug: true,
maxConcurrentSpans: 500
});
// Use with LangChain
const chain = new ConversationChain({
callbacks: [handler]
});
```
### `setGlobalCallbackHandler()`
Sets a global callback handler for LangChain.
```typescript
function setGlobalCallbackHandler(handler: ZeroEvalCallbackHandler): void
```
### `getGlobalHandler()`
Gets the current global callback handler.
```typescript
function getGlobalHandler(): BaseCallbackHandler | undefined
```
### `clearGlobalHandler()`
Clears the global callback handler.
```typescript
function clearGlobalHandler(): void
```
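#### Example

A sketch of the global handler lifecycle, assuming all four helpers are exported from `zeroeval/langchain` like the handler itself:

```typescript
import {
  ZeroEvalCallbackHandler,
  setGlobalCallbackHandler,
  getGlobalHandler,
  clearGlobalHandler
} from 'zeroeval/langchain';

// Register a handler once for all LangChain runs
setGlobalCallbackHandler(new ZeroEvalCallbackHandler());

// Inspect the currently registered handler (undefined if none)
const handler = getGlobalHandler();

// Remove it again, e.g. during test teardown
clearGlobalHandler();
```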
## Types
### `InitOptions`
Configuration options for SDK initialization.
```typescript
interface InitOptions {
apiKey?: string;
apiUrl?: string;
workspaceName?: string;
flushInterval?: number;
maxSpans?: number;
collectCodeDetails?: boolean;
  integrations?: Record<string, boolean>;
debug?: boolean;
}
```
### `SignalCreate`
Structure for creating a new signal.
```typescript
interface SignalCreate {
entity_type: 'session' | 'trace' | 'span' | 'completion';
entity_id: string;
name: string;
value: string | boolean | number;
signal_type?: 'boolean' | 'numerical';
}
```
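For reference, a payload of this shape might look as follows (assuming the interface is exported from the package root; the entity ID is a placeholder UUID):

```typescript
import type { SignalCreate } from 'zeroeval';

const signal: SignalCreate = {
  entity_type: 'trace',
  entity_id: '7b61f6a2-0000-4000-8000-000000000000', // placeholder
  name: 'user_satisfaction',
  value: true,
  signal_type: 'boolean'
};
```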
### `Signal`
Structure representing a signal.
```typescript
interface Signal {
value: string | boolean | number;
type: 'boolean' | 'numerical';
}
```
### `ZeroEvalCallbackHandlerOptions`
Options for the LangChain callback handler.
```typescript
interface ZeroEvalCallbackHandlerOptions {
debug?: boolean;
excludeMetadataProps?: RegExp;
maxConcurrentSpans?: number;
spanCleanupIntervalMs?: number;
}
```
## Environment Variables
The SDK uses the following environment variables:
* `ZEROEVAL_API_KEY`: Your ZeroEval API key
* `ZEROEVAL_API_URL`: API endpoint URL (defaults to `https://api.zeroeval.com`)
* `ZEROEVAL_DEBUG`: Set to `true` to enable debug logging
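For example, with these variables exported in your shell, initialization needs no explicit options:

```typescript
import * as ze from 'zeroeval';

// Picks up ZEROEVAL_API_KEY (and ZEROEVAL_API_URL / ZEROEVAL_DEBUG, if set)
// from the environment.
ze.init();
```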
# Setup
Source: https://docs.zeroeval.com/tracing/sdks/typescript/setup
Get started with ZeroEval tracing in TypeScript and JavaScript applications
The [ZeroEval TypeScript SDK](https://www.npmjs.com/package/zeroeval) provides tracing for Node.js and browser applications through wrapper functions and integration callbacks.
## Installation
```bash npm
npm install zeroeval
```
```bash yarn
yarn add zeroeval
```
```bash pnpm
pnpm add zeroeval
```
## Basic Setup
```ts
import * as ze from 'zeroeval';
// Option 1: ZEROEVAL_API_KEY set in your environment (e.g. via a .env file)
ze.init();
// Option 2: API key
ze.init({ apiKey: 'YOUR_API_KEY' });
// Option 3: With additional configuration
ze.init({
apiKey: 'YOUR_API_KEY',
apiUrl: 'https://api.zeroeval.com', // optional
flushInterval: 10, // seconds
maxSpans: 100,
});
```
## Patterns
The SDK offers two ways to add tracing to your TypeScript/JavaScript code:
### Basic Usage
```ts Function Wrapping
import * as ze from 'zeroeval';
// Wrap synchronous functions
const fetchData = (userId: string) =>
ze.withSpan({ name: 'fetch_data' }, () => ({
userId,
name: 'John Doe'
}));
// Wrap async functions
const processData = async (data: { name: string }) =>
ze.withSpan(
{
name: 'process_data',
attributes: { version: '1.0' }
},
async () => {
const result = await transform(data);
return `Welcome, ${result.name}!`;
}
);
// Complex workflows with nested spans
async function complexWorkflow() {
return ze.withSpan({ name: 'data_pipeline' }, async () => {
const data = await ze.withSpan(
{ name: 'fetch_stage' },
fetchExternalData
);
const processed = await ze.withSpan(
{ name: 'process_stage' },
() => transformData(data)
);
const result = await ze.withSpan(
{ name: 'save_stage' },
() => saveToDatabase(processed)
);
return result;
});
}
```
```ts Decorators
import { span } from 'zeroeval';
class DataService {
@span({
name: 'fetch_user_data',
tags: { service: 'user_api' }
})
async fetchUser(userId: string) {
const response = await fetch(`/api/users/${userId}`);
return response.json();
}
@span({
name: 'process_order',
attributes: { version: '2.0' }
})
processOrder(orderId: string, items: string[]) {
return { orderId, processed: true };
}
}
// TypeScript Configuration Required:
// Add to your tsconfig.json:
// {
// "compilerOptions": {
// "experimentalDecorators": true,
// "emitDecoratorMetadata": true
// }
// }
// When using tsx or ts-node:
// tsx --experimental-decorators your-file.ts
// ts-node --experimental-decorators your-file.ts
```
**Decorators require TypeScript configuration**: Enable `experimentalDecorators` and `emitDecoratorMetadata` in your `tsconfig.json`. When using runtime tools like `tsx` or `ts-node`, pass the `--experimental-decorators` flag.
### Sessions
Group related spans into sessions:
```ts
import { v4 as uuidv4 } from 'uuid';
const sessionId = uuidv4();
async function userJourney(userId: string) {
return ze.withSpan(
{
name: 'user_journey',
sessionId: sessionId,
sessionName: `User ${userId} Session`
},
async () => {
await login(userId);
await browseProducts();
await checkout();
}
);
}
```
## Context
Access current context information:
```ts
import { getCurrentSpan, getCurrentTrace, getCurrentSession } from 'zeroeval';
function myFunction() {
// Get current span
const span = getCurrentSpan();
// Get current trace ID
const traceId = getCurrentTrace();
// Get current session ID
const sessionId = getCurrentSession();
}
```
# Sessions
Source: https://docs.zeroeval.com/tracing/sessions
Group related spans into sessions for better organization and analysis
Sessions provide a powerful way to group related spans together, making it easier to track and analyze complex workflows, user interactions, or multi-step processes. This guide covers everything you need to know about working with sessions.
For complete API documentation, see the [Python SDK Reference](/tracing/sdks/python/reference) or [TypeScript SDK Reference](/tracing/sdks/typescript/reference).
## Creating Sessions
### Basic Session with ID
The simplest way to create a session is by providing a session ID:
```python Python
import uuid
import zeroeval as ze
# Generate a unique session ID
session_id = str(uuid.uuid4())
@ze.span(name="process_request", session=session_id)
def process_request(data):
# This span belongs to the session
return transform_data(data)
```
```typescript TypeScript (Basic)
import { randomUUID } from 'crypto';
import * as ze from 'zeroeval';
// Generate a unique session ID
const sessionId = randomUUID();
function processRequest(data: any) {
return ze.withSpan({
name: "process_request",
sessionId
}, () => {
// This span belongs to the session
return transformData(data);
});
}
```
```typescript TypeScript (Decorators)
import { randomUUID } from 'crypto';
import { span } from 'zeroeval';
// Generate a unique session ID
const sessionId = randomUUID();
class RequestProcessor {
@span({ name: "process_request", sessionId })
processRequest(data: any) {
// This span belongs to the session
return transformData(data);
}
}
```
### Named Sessions
For better organization in the ZeroEval dashboard, you can provide both an ID and a descriptive name:
```python Python
@ze.span(
name="user_interaction",
session={
"id": session_id,
"name": "Customer Support Chat - User #12345"
}
)
def handle_support_chat(user_id, message):
# Process the support request
return generate_response(message)
```
```typescript TypeScript (Basic)
function handleSupportChat(userId: string, message: string) {
return ze.withSpan({
name: "user_interaction",
sessionId: sessionId,
sessionName: "Customer Support Chat - User #12345"
}, () => {
// Process the support request
return generateResponse(message);
});
}
```
```typescript TypeScript (Decorators)
class SupportHandler {
@span({
name: "user_interaction",
sessionId: sessionId,
sessionName: "Customer Support Chat - User #12345"
})
handleSupportChat(userId: string, message: string) {
// Process the support request
return generateResponse(message);
}
}
```
## Session Inheritance
Child spans automatically inherit the session from their parent span:
```python Python
session_info = {
"id": str(uuid.uuid4()),
"name": "Order Processing Pipeline"
}
@ze.span(name="process_order", session=session_info)
def process_order(order_id):
# These nested calls automatically belong to the same session
validate_order(order_id)
charge_payment(order_id)
fulfill_order(order_id)
@ze.span(name="validate_order")
def validate_order(order_id):
# Automatically part of the parent's session
return check_inventory(order_id)
@ze.span(name="charge_payment")
def charge_payment(order_id):
# Also inherits the session
return process_payment(order_id)
```
```typescript TypeScript (Basic)
const sessionInfo = {
id: randomUUID(),
name: "Order Processing Pipeline"
};
function processOrder(orderId: string) {
return ze.withSpan({
name: "process_order",
sessionId: sessionInfo.id,
sessionName: sessionInfo.name
}, () => {
// These nested calls automatically belong to the same session
validateOrder(orderId);
chargePayment(orderId);
fulfillOrder(orderId);
});
}
function validateOrder(orderId: string) {
return ze.withSpan({ name: "validate_order" }, () => {
// Automatically part of the parent's session
return checkInventory(orderId);
});
}
function chargePayment(orderId: string) {
return ze.withSpan({ name: "charge_payment" }, () => {
// Also inherits the session
return processPayment(orderId);
});
}
```
```typescript TypeScript (Decorators)
const sessionInfo = {
id: randomUUID(),
name: "Order Processing Pipeline"
};
class OrderProcessor {
@span({
name: "process_order",
sessionId: sessionInfo.id,
sessionName: sessionInfo.name
})
processOrder(orderId: string) {
// These nested calls automatically belong to the same session
this.validateOrder(orderId);
this.chargePayment(orderId);
this.fulfillOrder(orderId);
}
@span({ name: "validate_order" })
validateOrder(orderId: string) {
// Automatically part of the parent's session
return checkInventory(orderId);
}
@span({ name: "charge_payment" })
chargePayment(orderId: string) {
// Also inherits the session
return processPayment(orderId);
}
fulfillOrder(orderId: string) {
// Not traced
return fulfillOrder(orderId);
}
}
```
## Advanced Session Patterns
### Multi-Agent RAG System
Track complex retrieval-augmented generation workflows with multiple specialized agents:
```python Python
session = {
"id": str(uuid.uuid4()),
"name": "Multi-Agent RAG Pipeline"
}
@ze.span(name="rag_coordinator", session=session)
async def process_query(query):
# Retrieval
docs = await retrieval_agent(query)
# Reranking
ranked = await reranking_agent(query, docs)
# Generation
response = await generation_agent(query, ranked)
return response
@ze.span(name="retrieval_agent")
async def retrieval_agent(query):
# Inherits session from parent
embeddings = await embed(query)
return await vector_search(embeddings)
@ze.span(name="generation_agent")
async def generation_agent(query, context):
return await llm.generate(query, context)
```
```typescript TypeScript (Basic)
const session = {
id: randomUUID(),
name: "Multi-Agent RAG Pipeline"
};
async function processQuery(query: string) {
return ze.withSpan({
name: "rag_coordinator",
sessionId: session.id,
sessionName: session.name
}, async () => {
// Retrieval
const docs = await retrievalAgent(query);
// Reranking
const ranked = await rerankingAgent(query, docs);
// Generation
const response = await generationAgent(query, ranked);
return response;
});
}
async function retrievalAgent(query: string) {
return ze.withSpan({ name: "retrieval_agent" }, async () => {
// Inherits session from parent
const embeddings = await embed(query);
return await vectorSearch(embeddings);
});
}
async function generationAgent(query: string, context: any) {
return ze.withSpan({ name: "generation_agent" }, async () => {
return await llm.generate(query, context);
});
}
```
```typescript TypeScript (Decorators)
const session = {
id: randomUUID(),
name: "Multi-Agent RAG Pipeline"
};
class RAGPipeline {
@span({
name: "rag_coordinator",
sessionId: session.id,
sessionName: session.name
})
async processQuery(query: string) {
// Retrieval
const docs = await this.retrievalAgent(query);
// Reranking
const ranked = await this.rerankingAgent(query, docs);
// Generation
const response = await this.generationAgent(query, ranked);
return response;
}
@span({ name: "retrieval_agent" })
async retrievalAgent(query: string) {
// Inherits session from parent
const embeddings = await embed(query);
return await vectorSearch(embeddings);
}
@span({ name: "generation_agent" })
async generationAgent(query: string, context: any) {
return await llm.generate(query, context);
}
async rerankingAgent(query: string, docs: any[]) {
// Not traced
return await rerank(query, docs);
}
}
```
### Conversational AI Session
Track a complete conversation with an AI assistant:
```python Python
class ChatSession:
def __init__(self, user_id):
self.session = {
"id": f"chat-{user_id}-{uuid.uuid4()}",
"name": f"AI Chat - User {user_id}"
}
self.history = []
@ze.span(name="process_message", session=lambda self: self.session)
async def process_message(self, message):
# Add to history
self.history.append({"role": "user", "content": message})
# Generate response
response = await self.generate_response()
self.history.append({"role": "assistant", "content": response})
return response
@ze.span(name="generate_response", session=lambda self: self.session)
async def generate_response(self):
return await llm.chat(self.history)
```
```typescript TypeScript (Basic)
class ChatSession {
private session: { id: string; name: string };
private history: any[] = [];
constructor(userId: string) {
this.session = {
id: `chat-${userId}-${randomUUID()}`,
name: `AI Chat - User ${userId}`
};
}
async processMessage(message: string) {
return ze.withSpan({
name: "process_message",
sessionId: this.session.id,
sessionName: this.session.name
}, async () => {
// Add to history
this.history.push({ role: "user", content: message });
// Generate response
const response = await this.generateResponse();
this.history.push({ role: "assistant", content: response });
return response;
});
}
async generateResponse() {
return ze.withSpan({
name: "generate_response",
sessionId: this.session.id,
sessionName: this.session.name
}, async () => {
return await llm.chat(this.history);
});
}
}
```
```typescript TypeScript (Decorators)
import { span } from 'zeroeval';
class ChatSession {
private session: { id: string; name: string };
private history: any[] = [];
constructor(userId: string) {
this.session = {
id: `chat-${userId}-${randomUUID()}`,
name: `AI Chat - User ${userId}`
};
}
@span({
name: "process_message",
sessionId: function(this: ChatSession) { return this.session.id; },
sessionName: function(this: ChatSession) { return this.session.name; }
})
async processMessage(message: string) {
// Add to history
this.history.push({ role: "user", content: message });
// Generate response
const response = await this.generateResponse();
this.history.push({ role: "assistant", content: response });
return response;
}
@span({
name: "generate_response",
sessionId: function(this: ChatSession) { return this.session.id; },
sessionName: function(this: ChatSession) { return this.session.name; }
})
async generateResponse() {
return await llm.chat(this.history);
}
}
```
### Batch LLM Processing
Process multiple documents with LLMs in a single session:
```python Python
async def batch_summarize(documents):
session = {
"id": f"batch-{uuid.uuid4()}",
"name": f"Batch Summarization - {len(documents)} docs"
}
@ze.span(name="batch_processor", session=session)
async def process():
summaries = []
for i, doc in enumerate(documents):
with ze.span(name=f"summarize_doc_{i}", session=session) as span:
try:
summary = await llm.summarize(doc)
span.set_io(
input_data=f"Doc: {doc['title']}",
output_data=summary[:100]
)
summaries.append(summary)
except Exception as e:
span.set_error(
code=type(e).__name__,
message=str(e)
)
return summaries
return await process()
```
```typescript TypeScript (Basic)
async function batchSummarize(documents: any[]) {
const session = {
id: `batch-${randomUUID()}`,
name: `Batch Summarization - ${documents.length} docs`
};
return ze.withSpan({
name: "batch_processor",
sessionId: session.id,
sessionName: session.name
}, async () => {
const summaries = [];
for (let i = 0; i < documents.length; i++) {
await ze.withSpan({
name: `summarize_doc_${i}`,
sessionId: session.id,
sessionName: session.name
}, async () => {
try {
const summary = await llm.summarize(documents[i]);
const span = ze.getCurrentSpan();
if (span) {
span.setIO(
`Doc: ${documents[i].title}`,
summary.substring(0, 100)
);
}
summaries.push(summary);
} catch (e: any) {
const span = ze.getCurrentSpan();
if (span) {
span.setError({
code: e.constructor.name,
message: e.message
});
}
}
});
}
return summaries;
});
}
```
```typescript TypeScript (Decorators)
class BatchProcessor {
private session = {
id: `batch-${randomUUID()}`,
name: `Batch Summarization`
};
@span({
name: "batch_processor",
sessionId: function(this: BatchProcessor) { return this.session.id; },
sessionName: function(this: BatchProcessor) { return `${this.session.name} - ${this.documents.length} docs`; }
})
async batchSummarize(documents: any[]) {
const summaries = [];
for (let i = 0; i < documents.length; i++) {
const summary = await this.summarizeDoc(documents[i], i);
if (summary) {
summaries.push(summary);
}
}
return summaries;
}
@span({
name: function(this: BatchProcessor, _doc: any, index: number) { return `summarize_doc_${index}`; },
sessionId: function(this: BatchProcessor) { return this.session.id; },
sessionName: function(this: BatchProcessor) { return this.session.name; }
})
async summarizeDoc(doc: any, index: number) {
try {
const summary = await llm.summarize(doc);
const span = ze.getCurrentSpan();
if (span) {
span.setIO(
`Doc: ${doc.title}`,
summary.substring(0, 100)
);
}
return summary;
} catch (e: any) {
const span = ze.getCurrentSpan();
if (span) {
span.setError({
code: e.constructor.name,
message: e.message
});
}
return null;
}
}
private documents: any[] = [];
}
```
## Context Manager Sessions
You can also use sessions with the context manager pattern:
```python Python
session_info = {
"id": str(uuid.uuid4()),
"name": "Data Pipeline Run"
}
with ze.span(name="etl_pipeline", session=session_info) as pipeline_span:
# Extract phase
with ze.span(name="extract_data") as extract_span:
raw_data = fetch_from_source()
extract_span.set_io(output_data=f"Extracted {len(raw_data)} records")
# Transform phase
with ze.span(name="transform_data") as transform_span:
clean_data = transform_records(raw_data)
transform_span.set_io(
input_data=f"{len(raw_data)} raw records",
output_data=f"{len(clean_data)} clean records"
)
# Load phase
with ze.span(name="load_data") as load_span:
result = save_to_destination(clean_data)
load_span.set_io(output_data=f"Loaded to {result['location']}")
```
```typescript TypeScript (Basic)
const sessionInfo = {
id: randomUUID(),
name: "Data Pipeline Run"
};
await ze.withSpan({
name: "etl_pipeline",
sessionId: sessionInfo.id,
sessionName: sessionInfo.name
}, async () => {
// Extract phase
const rawData = await ze.withSpan({ name: "extract_data" }, async () => {
const data = await fetchFromSource();
const span = ze.getCurrentSpan();
if (span) {
span.setIO(undefined, `Extracted ${data.length} records`);
}
return data;
});
// Transform phase
const cleanData = await ze.withSpan({ name: "transform_data" }, async () => {
const data = transformRecords(rawData);
const span = ze.getCurrentSpan();
if (span) {
span.setIO(
`${rawData.length} raw records`,
`${data.length} clean records`
);
}
return data;
});
// Load phase
await ze.withSpan({ name: "load_data" }, async () => {
const result = await saveToDestination(cleanData);
const span = ze.getCurrentSpan();
if (span) {
span.setIO(undefined, `Loaded to ${result.location}`);
}
});
});
```
```typescript TypeScript (Decorators)
import { span } from 'zeroeval';
import * as ze from 'zeroeval';
class ETLPipeline {
private sessionInfo = {
id: randomUUID(),
name: "Data Pipeline Run"
};
@span({
name: "etl_pipeline",
sessionId: function(this: ETLPipeline) { return this.sessionInfo.id; },
sessionName: function(this: ETLPipeline) { return this.sessionInfo.name; }
})
async runPipeline() {
// Extract phase
const rawData = await this.extractData();
// Transform phase
const cleanData = await this.transformData(rawData);
// Load phase
await this.loadData(cleanData);
}
@span({ name: "extract_data" })
async extractData() {
const data = await fetchFromSource();
const span = ze.getCurrentSpan();
if (span) {
span.setIO(undefined, `Extracted ${data.length} records`);
}
return data;
}
@span({ name: "transform_data" })
transformData(rawData: any[]) {
const data = transformRecords(rawData);
const span = ze.getCurrentSpan();
if (span) {
span.setIO(
`${rawData.length} raw records`,
`${data.length} clean records`
);
}
return data;
}
@span({ name: "load_data" })
async loadData(cleanData: any[]) {
const result = await saveToDestination(cleanData);
const span = ze.getCurrentSpan();
if (span) {
span.setIO(undefined, `Loaded to ${result.location}`);
}
}
}
```
# Signals
Source: https://docs.zeroeval.com/tracing/signals
Capture real-world feedback and metrics to enrich your traces, spans, and sessions.
Signals are any piece of user feedback, behavior, or metric you care about – thumbs-up, a 5-star rating, dwell time, task completion, error rates … you name it. Signals help you understand how your AI system performs in the real world by connecting user outcomes to your traces.
You can attach signals to:
* **Completions** (LLM responses)
* **Spans** (individual operations)
* **Sessions** (user interactions)
* **Traces** (entire request flows)
For complete signals API documentation, see the [Python SDK Reference](/tracing/sdks/python/reference#signals) or [TypeScript SDK Reference](/tracing/sdks/typescript/reference#signals).
## Using signals in code
### With the Python SDK
```python Python
import zeroeval as ze
# Initialize the tracer
ze.init(api_key="your-api-key")
# Start a span and add a signal
with ze.trace("user_query") as span:
# Your AI logic here
response = process_user_query(query)
# Add a signal to the current span
ze.set_signal("user_satisfaction", True)
ze.set_signal("response_quality", 4.5)
ze.set_signal("task_completed", "success")
```
```typescript TypeScript (Basic)
import * as ze from 'zeroeval';
// Initialise the tracer
ze.init({ apiKey: "your-api-key" });
// Start a span and add signals
await ze.withSpan({ name: "user_query" }, async () => {
const response = await processUserQuery(query);
// --- Add signals on the current span ---
await ze.sendSpanSignal("user_satisfaction", true);
await ze.sendSpanSignal("response_quality", 4.5);
// --- Attach to the whole trace / session ---
await ze.sendTraceSignal("task_completed", "success");
await ze.sendSessionSignal("vip_user", true);
});
```
```typescript TypeScript (Decorators)
import { span } from 'zeroeval';
import * as ze from 'zeroeval';
// Initialise the tracer
ze.init({ apiKey: "your-api-key" });
class QueryProcessor {
@span({ name: "user_query" })
async processQuery(query: string) {
const response = await processUserQuery(query);
// --- Add signals on the current span ---
await ze.sendSpanSignal("user_satisfaction", true);
await ze.sendSpanSignal("response_quality", 4.5);
// --- Attach to the whole trace / session ---
await ze.sendTraceSignal("task_completed", "success");
await ze.sendSessionSignal("vip_user", true);
return response;
}
}
```
### Setting signals on different targets
```python Python
# On the current span
ze.set_signal("helpful", True)
# On a specific span
span = ze.get_current_span()
ze.set_signal(span, {"rating": 5, "category": "excellent"})
# On the current trace
ze.set_trace_signal("conversion", True)
# On the current session
ze.set_session_signal("user_engaged", True)
```
```typescript TypeScript
import * as ze from 'zeroeval';
// On the current span
await ze.sendSpanSignal("helpful", true);
// On a specific span object
const span = ze.getCurrentSpan();
span?.addSignal("rating", 5);
span?.addSignal("category", "excellent");
// On the current trace
await ze.sendTraceSignal("conversion", true);
// On the current session
await ze.sendSessionSignal("user_engaged", true);
```
## API endpoint
For direct API calls, send signals to:
```
POST https://api.zeroeval.com/workspaces/{workspace_id}/signals
```
Auth is the same bearer API key you use for tracing.
### Payload schema
| field | type | required | notes |
| -------------- | ------------------------------ | -------- | ---------------------------------------------- |
| completion\_id | string | ❌ | **OpenAI completion ID** (for LLM completions) |
| span\_id | string | ❌ | **Span ID** (for specific spans) |
| trace\_id | string | ❌ | **Trace ID** (for entire traces) |
| session\_id | string | ❌ | **Session ID** (for user sessions) |
| name | string | ✅ | e.g. `user_satisfaction` |
| value | string \| bool \| int \| float | ✅ | your data – see examples below |
You must provide at least one of: `completion_id`, `span_id`, `trace_id`, or
`session_id`.
## Common signal patterns
Below are some quick copy-pasta snippets for the most common cases.
### 1. Binary feedback (👍 / 👎)
```python Python SDK
import zeroeval as ze
# On current span
ze.set_signal("thumbs_up", True)
# On specific span
ze.set_signal(span, {"helpful": False})
```
```typescript TypeScript SDK
import * as ze from 'zeroeval';
// Thumbs-up on the current span
await ze.sendSpanSignal("thumbs_up", true);
// Thumbs-down
await ze.sendSpanSignal("thumbs_up", false);
```
```python API
import requests
payload = {
"span_id": span.id,
"name": "thumbs_up",
"value": True // or False
}
requests.post(
f"https://api.zeroeval.com/workspaces/{WORKSPACE_ID}/signals",
json=payload,
headers={"Authorization": f"Bearer {ZE_API_KEY}"}
)
```
### 2. Star rating (1–5)
```python Python SDK
ze.set_signal("star_rating", 4)
```
```typescript TypeScript SDK
import * as ze from 'zeroeval';
// Star rating (1–5) on the current trace
await ze.sendTraceSignal("star_rating", 4);
```
```js JavaScript API
fetch(`https://api.zeroeval.com/workspaces/${WORKSPACE_ID}/signals`, {
method: "POST",
headers: {
Authorization: `Bearer ${ZE_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
trace_id: trace.id,
name: "star_rating",
value: 4, // any integer 1–5
}),
});
```
### 3. Continuous metrics
```python Python SDK
# Response time
ze.set_signal("response_time_ms", 1250.5)
# Task completion time
ze.set_signal("time_on_task_sec", 12.85)
# Accuracy score
ze.set_signal("accuracy", 0.94)
```
```typescript TypeScript SDK
import * as ze from 'zeroeval';
// Response time on the current session
await ze.sendSessionSignal("response_time_ms", 1250.5);
// Accuracy score on current span
await ze.sendSpanSignal("accuracy", 0.94);
```
```python API
payload = {
"session_id": session.id,
"name": "time_on_task_sec",
"value": 12.85 // float works too
}
```
### 4. Categorical outcomes
```python Python SDK
ze.set_signal("task_status", "success")
ze.set_signal("error_type", "timeout")
ze.set_signal("user_intent", "purchase")
```
```typescript TypeScript SDK
import * as ze from 'zeroeval';
// Task status on current span
await ze.sendSpanSignal("task_status", "success");
```
```js JavaScript API
{
completion_id: completion.id,
name: "task_status",
value: "success" // could also be "retry" / "fail"
}
```
### 5. Session-level signals
```python Python
# Track user engagement across an entire session
ze.set_session_signal("pages_visited", 5)
ze.set_session_signal("converted", True)
ze.set_session_signal("user_tier", "premium")
```
```typescript TypeScript
import * as ze from 'zeroeval';
// Track user engagement across the session
await ze.sendSessionSignal("pages_visited", 5);
await ze.sendSessionSignal("converted", true);
await ze.sendSessionSignal("user_tier", "premium");
```
### 6. Trace-level signals
```python Python
# Track outcomes for an entire request flow
ze.set_trace_signal("request_successful", True)
ze.set_trace_signal("total_cost", 0.045)
ze.set_trace_signal("model_used", "gpt-4o")
```
```typescript TypeScript
import * as ze from 'zeroeval';
// Trace-level outcomes
await ze.sendTraceSignal("request_successful", true);
await ze.sendTraceSignal("total_cost", 0.045);
await ze.sendTraceSignal("model_used", "gpt-4o");
```
## Signal types
Signals are automatically categorized based on their values:
* **Boolean**: `true`/`false` values → useful for success/failure, yes/no feedback
* **Numerical**: integers and floats → useful for ratings, scores, durations, costs
* **Categorical**: strings → useful for status, categories, error types
## Putting it all together
```python Python
import zeroeval as ze
# Initialize tracing
ze.init(api_key="your-api-key")
# Start a session for user interaction
with ze.trace("user_chat_session", session_name="Customer Support") as session:
# Process user query
with ze.trace("process_query") as span:
response = llm_client.chat.completions.create(...)
# Signal on the LLM completion
ze.set_signal("response_generated", True)
ze.set_signal("response_length", len(response.choices[0].message.content))
# Capture user feedback
user_rating = get_user_feedback() # Your feedback collection logic
# Signal on the session
ze.set_session_signal("user_rating", user_rating)
ze.set_session_signal("issue_resolved", user_rating >= 4)
# Signal on the entire trace
ze.set_trace_signal("interaction_complete", True)
```
```typescript TypeScript (Basic)
import * as ze from 'zeroeval';
ze.init({ apiKey: "your-api-key" });
// Start a session for user interaction
await ze.withSpan({
name: "user_chat_session",
sessionName: "Customer Support",
}, async () => {
// Process user query
await ze.withSpan({ name: "process_query" }, async () => {
const response = await llmClient.chat.completions.create(...);
// Signal on the LLM completion
await ze.sendSpanSignal("response_generated", true);
await ze.sendSpanSignal("response_length", response.choices[0].message.content.length);
});
// Capture user feedback
const userRating = await getUserFeedback();
// Session-level signals
await ze.sendSessionSignal("user_rating", userRating);
await ze.sendSessionSignal("issue_resolved", userRating >= 4);
// Trace-level signal
await ze.sendTraceSignal("interaction_complete", true);
});
```
```typescript TypeScript (Decorators)
import { span } from 'zeroeval';
import * as ze from 'zeroeval';
ze.init({ apiKey: "your-api-key" });
class SupportChat {
@span({
name: "user_chat_session",
sessionName: "Customer Support"
})
async handleChatSession() {
// Process user query
await this.processQuery();
// Capture user feedback
const userRating = await getUserFeedback();
// Session-level signals
await ze.sendSessionSignal("user_rating", userRating);
await ze.sendSessionSignal("issue_resolved", userRating >= 4);
// Trace-level signal
await ze.sendTraceSignal("interaction_complete", true);
}
@span({ name: "process_query" })
async processQuery() {
const response = await llmClient.chat.completions.create(...);
// Signal on the LLM completion
await ze.sendSpanSignal("response_generated", true);
await ze.sendSpanSignal("response_length", response.choices[0].message.content.length);
return response;
}
}
```
That's it! Your signals will appear in the ZeroEval dashboard, helping you understand how your AI system performs in real-world scenarios.
# Tags
Source: https://docs.zeroeval.com/tracing/tagging
Simple ways to attach rich, query-able tags to your traces.
Tags are key–value pairs that can be attached to any **span**, **trace**, or **session**. They power the facet filters in the console so you can slice-and-dice your telemetry by *user*, *plan*, *model*, *tenant*, or anything else that matters to your business.
For complete tagging API documentation, see the [Python SDK Reference](/tracing/sdks/python/reference#tags) or [TypeScript SDK Reference](/tracing/sdks/typescript/reference#tags).
## 1. Tag once, inherit everywhere
When you add a `tags` dictionary to the **first** span you create, every child span automatically gets the same tags. That means you set them once and they flow down the entire call-stack.
```python Python
import zeroeval as ze
@ze.span(
name="handle_request",
tags={
"user_id": "42", # who triggered the request
"tenant": "acme-corp", # multi-tenant identifier
"plan": "enterprise" # commercial plan
}
)
def handle_request():
authenticate()
fetch_data()
process()
# Two nested child spans – they automatically inherit *all* the tags
with ze.span(name="fetch_data"):
...
with ze.span(name="process", tags={"stage": "post"}):
...
```
```typescript TypeScript (Basic)
import * as ze from 'zeroeval';
function handleRequest() {
ze.withSpan({
name: "handle_request",
tags: {
user_id: "42", // who triggered the request
tenant: "acme-corp", // multi-tenant identifier
plan: "enterprise" // commercial plan
}
}, () => {
authenticate();
fetchData();
process();
});
}
// Two nested child spans – they automatically inherit *all* the tags
function fetchData() {
ze.withSpan({ name: "fetch_data" }, () => {
// ...
});
}
function process() {
ze.withSpan({
name: "process",
tags: { stage: "post" }
}, () => {
// ...
});
}
```
```typescript TypeScript (Decorators)
import { span } from 'zeroeval';
class RequestHandler {
@span({
name: "handle_request",
tags: {
user_id: "42", // who triggered the request
tenant: "acme-corp", // multi-tenant identifier
plan: "enterprise" // commercial plan
}
})
handleRequest() {
this.authenticate();
this.fetchData();
this.process();
}
// Two nested child spans – they automatically inherit *all* the tags
@span({ name: "fetch_data" })
fetchData() {
// ...
}
@span({
name: "process",
tags: { stage: "post" }
})
process() {
// ...
}
authenticate() {
// Not traced
}
}
```
## 2. Tag a single span
If you want to tag only a **single** span (or override a tag inherited from a parent), simply provide the `tags` argument on that specific decorator or context manager.
```python Python
import zeroeval as ze
@ze.span(name="top_level")
def top_level():
# Child span with its own tags – *not* inherited by siblings
with ze.span(name="db_call", tags={"table": "customers", "operation": "SELECT"}):
query_database()
# Another child span without tags – it has no knowledge of the db_call tags
with ze.span(name="render"):
render_template()
```
```typescript TypeScript (Basic)
import * as ze from 'zeroeval';
function topLevel() {
ze.withSpan({ name: "top_level" }, () => {
// Child span with its own tags – *not* inherited by siblings
ze.withSpan({
name: "db_call",
tags: { table: "customers", operation: "SELECT" }
}, () => {
queryDatabase();
});
// Another child span without tags – it has no knowledge of the db_call tags
ze.withSpan({ name: "render" }, () => {
renderTemplate();
});
});
}
```
```typescript TypeScript (Decorators)
import { span } from 'zeroeval';
class DataService {
@span({ name: "top_level" })
topLevel() {
this.performDbCall();
this.renderOutput();
}
@span({
name: "db_call",
tags: { table: "customers", operation: "SELECT" }
})
performDbCall() {
// Child span with its own tags – *not* inherited by siblings
queryDatabase();
}
@span({ name: "render" })
renderOutput() {
// Another child span without tags – it has no knowledge of the db_call tags
renderTemplate();
}
}
```
Under the hood these tags live only on that single span; they are **not** copied to siblings or parents.
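A child span can also override a tag it would otherwise inherit by redefining the same key. A minimal sketch (sibling spans keep the inherited value):

```typescript
import * as ze from 'zeroeval';

ze.withSpan({ name: "handle_request", tags: { plan: "enterprise" } }, () => {
  // Inherits plan: "enterprise" from the parent
  ze.withSpan({ name: "fetch_data" }, () => {
    // ...
  });

  // Overrides the inherited value with plan: "trial" for this span only
  ze.withSpan({ name: "billing_preview", tags: { plan: "trial" } }, () => {
    // ...
  });
});
```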
## 3. Granular tagging (session, trace, or span)
You can add granular tags at the session, trace, or span level after they've been created:
```python Python
import uuid
from langchain_core.messages import HumanMessage
import zeroeval as ze
DEMO_TAGS = {"example": "langgraph_tags_demo", "project": "zeroeval"}
SESSION_ID = str(uuid.uuid4())
SESSION_INFO = {"id": SESSION_ID, "name": "Tags Demo Session"}
with ze.span(
name="demo.root_invoke",
session=SESSION_INFO,
tags={**DEMO_TAGS, "run": "invoke"},
):
# 1️⃣ Tag the *current* span only
current_span = ze.get_current_span()
ze.set_tag(current_span, {"phase": "pre-run"})
# 2️⃣ Tag the whole trace – root + all children (past *and* future)
current_trace = ze.get_current_trace()
ze.set_tag(current_trace, {"run_mode": "invoke"})
# 3️⃣ Tag the entire session
current_session = ze.get_current_session()
ze.set_tag(current_session, {"env": "local"})
result = app.invoke({"messages": [HumanMessage(content="hello")], "count": 0})
```
```typescript TypeScript (Basic)
import { randomUUID } from 'crypto';
import { HumanMessage } from '@langchain/core/messages';
import * as ze from 'zeroeval';
const DEMO_TAGS = { example: "langgraph_tags_demo", project: "zeroeval" };
const SESSION_ID = randomUUID();
const SESSION_INFO = { id: SESSION_ID, name: "Tags Demo Session" };
ze.withSpan({
name: "demo.root_invoke",
sessionId: SESSION_INFO.id,
sessionName: SESSION_INFO.name,
tags: { ...DEMO_TAGS, run: "invoke" }
}, async () => {
// 1️⃣ Tag the *current* span only
const currentSpan = ze.getCurrentSpan();
if (currentSpan) {
ze.setTag(currentSpan, { phase: "pre-run" });
}
// 2️⃣ Tag the whole trace – root + all children (past *and* future)
const currentTrace = ze.getCurrentTrace();
if (currentTrace) {
ze.setTag(currentTrace, { run_mode: "invoke" });
}
// 3️⃣ Tag the entire session
const currentSession = ze.getCurrentSession();
if (currentSession) {
ze.setTag(currentSession, { env: "local" });
}
const result = await app.invoke({
messages: [new HumanMessage("hello")],
count: 0
});
});
```
```typescript TypeScript (Decorators)
import { randomUUID } from 'crypto';
import { HumanMessage } from '@langchain/core/messages';
import { span } from 'zeroeval';
import * as ze from 'zeroeval';
const DEMO_TAGS = { example: "langgraph_tags_demo", project: "zeroeval" };
class TaggingDemo {
private sessionId = randomUUID();
private sessionInfo = {
id: this.sessionId,
name: "Tags Demo Session"
};
@span({
name: "demo.root_invoke",
sessionId: function(this: TaggingDemo) { return this.sessionInfo.id; },
sessionName: function(this: TaggingDemo) { return this.sessionInfo.name; },
tags: { ...DEMO_TAGS, run: "invoke" }
})
async rootInvoke() {
// 1️⃣ Tag the *current* span only
const currentSpan = ze.getCurrentSpan();
if (currentSpan) {
ze.setTag(currentSpan, { phase: "pre-run" });
}
// 2️⃣ Tag the whole trace – root + all children (past *and* future)
const currentTrace = ze.getCurrentTrace();
if (currentTrace) {
ze.setTag(currentTrace, { run_mode: "invoke" });
}
// 3️⃣ Tag the entire session
const currentSession = ze.getCurrentSession();
if (currentSession) {
ze.setTag(currentSession, { env: "local" });
}
const result = await app.invoke({
messages: [new HumanMessage("hello")],
count: 0
});
return result;
}
}
```