This guide covers how to manually instrument your code to create spans, particularly for LLM operations. You’ll learn how to use both the SDK and direct API calls to send trace data to ZeroEval.

SDK Manual Instrumentation

Basic LLM Span with SDK

The simplest way to create an LLM span is using the SDK’s span decorator or context manager:
import zeroeval as ze
import openai

client = openai.OpenAI()

@ze.span(name="chat_completion", kind="llm")
def generate_response(messages: list) -> str:
"""Create an LLM span with automatic input/output capture"""
response = client.chat.completions.create(
model="gpt-4",
messages=messages,
temperature=0.7
)

    # The SDK automatically captures function arguments as input
    # and return values as output
    return response.choices[0].message.content

Advanced LLM Span with Metrics

For production use, capture comprehensive metrics for better observability:
import zeroeval as ze
import openai
import time
import json

@ze.span(name="chat_completion_advanced", kind="llm")
def generate_with_metrics(messages: list, **kwargs):
    """Create a comprehensive LLM span with all metrics"""

    # Get the current span to add attributes
    span = ze.get_current_span()

    # Track timing
    start_time = time.time()
    first_token_time = None

    # Prepare the request
    model = kwargs.get("model", "gpt-4")
    temperature = kwargs.get("temperature", 0.7)
    max_tokens = kwargs.get("max_tokens", None)

    # Set pre-request attributes
    span.set_attributes({
        "llm.model": model,
        "llm.provider": "openai",
        "llm.temperature": temperature,
        "llm.max_tokens": max_tokens,
        "llm.streaming": kwargs.get("stream", False)
    })

    # Store input messages in the expected format
    span.set_io(input_data=json.dumps([
        {"role": msg["role"], "content": msg["content"]}
        for msg in messages
    ]))

    try:
        client = openai.OpenAI()

        # Handle streaming responses
        if kwargs.get("stream", False):
            stream = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                stream=True
            )

            full_response = ""
            tokens = 0

            for chunk in stream:
                if chunk.choices[0].delta.content:
                    if first_token_time is None:
                        first_token_time = time.time()
                        ttft_ms = (first_token_time - start_time) * 1000
                        span.set_attributes({"llm.ttft_ms": ttft_ms})

                    full_response += chunk.choices[0].delta.content
                    tokens += 1

            # Calculate throughput
            total_time = time.time() - start_time
            span.set_attributes({
                "llm.output_tokens": tokens,
                "llm.throughput_tokens_per_sec": tokens / total_time if total_time > 0 else 0,
                "llm.duration_ms": total_time * 1000
            })

            span.set_io(output_data=full_response)
            return full_response

        else:
            # Non-streaming response
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens
            )

            # Capture all response metadata
            span.set_attributes({
                "llm.input_tokens": response.usage.prompt_tokens,
                "llm.output_tokens": response.usage.completion_tokens,
                "llm.total_tokens": response.usage.total_tokens,
                "llm.finish_reason": response.choices[0].finish_reason,
                "llm.system_fingerprint": response.system_fingerprint,
                "llm.response_id": response.id,
                "llm.duration_ms": (time.time() - start_time) * 1000
            })

            content = response.choices[0].message.content
            span.set_io(output_data=content)

            return content

    except Exception as e:
        # Capture error details
        span.set_status("error")
        span.set_attributes({
            "error.type": type(e).__name__,
            "error.message": str(e)
        })
        raise

Provider-Specific Manual Instrumentation

If you call the OpenAI or Gemini APIs directly, without the SDK’s automatic instrumentation, the guides below show how to instrument those calls with cost calculation and conversation formatting.

OpenAI API Manual Instrumentation

When calling the OpenAI API directly (using requests, httpx, or similar), you’ll want to capture all the metrics that the automatic integration would provide:
import requests
import json
import time
import uuid
from datetime import datetime, timezone

class OpenAITracer:
    def __init__(self, api_key: str, zeroeval_api_key: str):
        self.openai_api_key = api_key
        self.zeroeval_api_key = zeroeval_api_key
        self.zeroeval_url = "https://api.zeroeval.com/api/v1/spans"

    def chat_completion_with_tracing(self, messages: list, model: str = "gpt-4o", **kwargs):
        """Make OpenAI API call with full ZeroEval instrumentation"""

        # Generate span identifiers
        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())

        # Track timing
        start_time = time.time()

        # Prepare OpenAI request
        openai_payload = {
            "model": model,
            "messages": messages,
            **kwargs  # temperature, max_tokens, etc.
        }

        # Add stream_options for token usage in streaming calls
        is_streaming = kwargs.get("stream", False)
        if is_streaming and "stream_options" not in kwargs:
            openai_payload["stream_options"] = {"include_usage": True}

        try:
            # Make the OpenAI API call
            response = requests.post(
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.openai_api_key}",
                    "Content-Type": "application/json"
                },
                json=openai_payload,
                stream=is_streaming
            )
            response.raise_for_status()

            end_time = time.time()
            duration_ms = (end_time - start_time) * 1000

            if is_streaming:
                # Handle streaming response
                full_response = ""
                input_tokens = 0
                output_tokens = 0
                finish_reason = None
                response_id = None
                system_fingerprint = None
                first_token_time = None

                for line in response.iter_lines():
                    if line:
                        line = line.decode('utf-8')
                        if line.startswith('data: '):
                            data_str = line[6:]
                            if data_str == '[DONE]':
                                break

                            try:
                                data = json.loads(data_str)

                                # Capture first token timing
                                if data.get('choices') and data['choices'][0].get('delta', {}).get('content'):
                                    if first_token_time is None:
                                        first_token_time = time.time()
                                    full_response += data['choices'][0]['delta']['content']

                                # Capture final metadata
                                if 'usage' in data:
                                    input_tokens = data['usage']['prompt_tokens']
                                    output_tokens = data['usage']['completion_tokens']

                                if data.get('choices') and data['choices'][0].get('finish_reason'):
                                    finish_reason = data['choices'][0]['finish_reason']

                                if 'id' in data:
                                    response_id = data['id']

                                if 'system_fingerprint' in data:
                                    system_fingerprint = data['system_fingerprint']

                            except json.JSONDecodeError:
                                continue

                # Send ZeroEval span for streaming
                self._send_span(
                    span_id=span_id,
                    trace_id=trace_id,
                    model=model,
                    messages=messages,
                    response_text=full_response,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    duration_ms=duration_ms,
                    start_time=start_time,
                    finish_reason=finish_reason,
                    response_id=response_id,
                    system_fingerprint=system_fingerprint,
                    streaming=True,
                    first_token_time=first_token_time,
                    **kwargs
                )

                return full_response

            else:
                # Handle non-streaming response
                response_data = response.json()

                # Extract response details
                content = response_data['choices'][0]['message']['content']
                usage = response_data.get('usage', {})

                # Send ZeroEval span
                self._send_span(
                    span_id=span_id,
                    trace_id=trace_id,
                    model=model,
                    messages=messages,
                    response_text=content,
                    input_tokens=usage.get('prompt_tokens', 0),
                    output_tokens=usage.get('completion_tokens', 0),
                    duration_ms=duration_ms,
                    start_time=start_time,
                    finish_reason=response_data['choices'][0].get('finish_reason'),
                    response_id=response_data.get('id'),
                    system_fingerprint=response_data.get('system_fingerprint'),
                    streaming=False,
                    **kwargs
                )

                return content

        except Exception as e:
            # Send error span
            end_time = time.time()
            duration_ms = (end_time - start_time) * 1000

            self._send_error_span(
                span_id=span_id,
                trace_id=trace_id,
                model=model,
                messages=messages,
                duration_ms=duration_ms,
                start_time=start_time,
                error=e,
                **kwargs
            )
            raise

    def _send_span(self, span_id: str, trace_id: str, model: str, messages: list,
                   response_text: str, input_tokens: int, output_tokens: int,
                   duration_ms: float, start_time: float, finish_reason: str = None,
                   response_id: str = None, system_fingerprint: str = None,
                   streaming: bool = False, first_token_time: float = None, **kwargs):
        """Send successful span to ZeroEval"""

        # Calculate throughput metrics
        throughput = output_tokens / (duration_ms / 1000) if duration_ms > 0 else 0
        ttft_ms = None
        if streaming and first_token_time:
            ttft_ms = (first_token_time - start_time) * 1000

        # Prepare span attributes following ZeroEval's expected format
        attributes = {
            # Core LLM attributes (these are used for cost calculation)
            "provider": "openai",  # Key for cost calculation
            "model": model,        # Key for cost calculation
            "inputTokens": input_tokens,   # Key for cost calculation
            "outputTokens": output_tokens, # Key for cost calculation

            # OpenAI-specific attributes
            "temperature": kwargs.get("temperature"),
            "max_tokens": kwargs.get("max_tokens"),
            "top_p": kwargs.get("top_p"),
            "frequency_penalty": kwargs.get("frequency_penalty"),
            "presence_penalty": kwargs.get("presence_penalty"),
            "streaming": streaming,
            "finish_reason": finish_reason,
            "response_id": response_id,
            "system_fingerprint": system_fingerprint,

            # Performance metrics
            "throughput": throughput,
            "duration_ms": duration_ms,
        }

        if ttft_ms:
            attributes["ttft_ms"] = ttft_ms

        # Clean up None values
        attributes = {k: v for k, v in attributes.items() if v is not None}

        # Format messages for good conversation display
        formatted_messages = self._format_messages_for_display(messages)

        span_data = {
            "id": span_id,
            "trace_id": trace_id,
            "name": f"{model}_completion",
            "kind": "llm",  # Critical: must be "llm" for cost calculation
            "started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
            "ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
            "status": "ok",
            "attributes": attributes,
            "input_data": json.dumps(formatted_messages),
            "output_data": response_text,
            "tags": {
                "provider": "openai",
                "model": model,
                "streaming": str(streaming).lower()
            }
        }

        # Send to ZeroEval
        response = requests.post(
            self.zeroeval_url,
            headers={
                "Authorization": f"Bearer {self.zeroeval_api_key}",
                "Content-Type": "application/json"
            },
            json=[span_data]
        )

        if response.status_code != 200:
            print(f"Warning: Failed to send span to ZeroEval: {response.text}")

    def _send_error_span(self, span_id: str, trace_id: str, model: str,
                        messages: list, duration_ms: float, start_time: float,
                        error: Exception, **kwargs):
        """Send error span to ZeroEval"""

        attributes = {
            "provider": "openai",
            "model": model,
            "temperature": kwargs.get("temperature"),
            "max_tokens": kwargs.get("max_tokens"),
            "streaming": kwargs.get("stream", False),
            "error_type": type(error).__name__,
            "error_message": str(error),
            "duration_ms": duration_ms,
        }

        # Clean up None values
        attributes = {k: v for k, v in attributes.items() if v is not None}

        formatted_messages = self._format_messages_for_display(messages)

        span_data = {
            "id": span_id,
            "trace_id": trace_id,
            "name": f"{model}_completion",
            "kind": "llm",
            "started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
            "ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
            "status": "error",
            "attributes": attributes,
            "input_data": json.dumps(formatted_messages),
            "output_data": "",
            "error_message": str(error),
            "tags": {
                "provider": "openai",
                "model": model,
                "error": "true"
            }
        }

        requests.post(
            self.zeroeval_url,
            headers={
                "Authorization": f"Bearer {self.zeroeval_api_key}",
                "Content-Type": "application/json"
            },
            json=[span_data]
        )

    def _format_messages_for_display(self, messages: list) -> list:
        """Format messages for optimal display in ZeroEval UI"""
        formatted = []
        for msg in messages:
            # Handle both dict and object formats
            if hasattr(msg, 'role'):
                role = msg.role
                content = msg.content
            else:
                role = msg.get('role', 'user')
                content = msg.get('content', '')

            # Handle multimodal content
            if isinstance(content, list):
                # Extract text parts for display
                text_parts = []
                for part in content:
                    if isinstance(part, dict) and part.get('type') == 'text':
                        text_parts.append(part['text'])
                    elif isinstance(part, str):
                        text_parts.append(part)
                content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'

            formatted.append({
                "role": role,
                "content": content
            })

        return formatted

# Usage example

tracer = OpenAITracer(
    api_key="your-openai-api-key",
    zeroeval_api_key="your-zeroeval-api-key"
)

# Non-streaming call
response = tracer.chat_completion_with_tracing([
    {"role": "user", "content": "What is the capital of France?"}
], model="gpt-4o", temperature=0.7)

# Streaming call
response = tracer.chat_completion_with_tracing([
    {"role": "user", "content": "Write a short story"}
], model="gpt-4o", stream=True, temperature=0.9)

Gemini API Manual Instrumentation

The Gemini API has a different structure: it takes contents instead of messages and uses different parameter names. Here’s how to instrument Gemini API calls:
import requests
import json
import time
import uuid
from datetime import datetime, timezone

class GeminiTracer:
    def __init__(self, api_key: str, zeroeval_api_key: str):
        self.gemini_api_key = api_key
        self.zeroeval_api_key = zeroeval_api_key
        self.zeroeval_url = "https://api.zeroeval.com/api/v1/spans"

    def generate_content_with_tracing(self, messages: list, model: str = "gemini-1.5-flash", **kwargs):
        """Make Gemini API call with full ZeroEval instrumentation"""

        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())
        start_time = time.time()

        # Convert OpenAI-style messages to Gemini contents format
        contents, system_instruction = self._convert_messages_to_contents(messages)

        # Prepare Gemini request payload
        gemini_payload = {
            "contents": contents
        }

        # Add generation config
        generation_config = {}
        if kwargs.get("temperature") is not None:
            generation_config["temperature"] = kwargs["temperature"]
        if kwargs.get("max_tokens"):
            generation_config["maxOutputTokens"] = kwargs["max_tokens"]
        if kwargs.get("top_p") is not None:
            generation_config["topP"] = kwargs["top_p"]
        if kwargs.get("top_k") is not None:
            generation_config["topK"] = kwargs["top_k"]
        if kwargs.get("stop"):
            stop = kwargs["stop"]
            generation_config["stopSequences"] = stop if isinstance(stop, list) else [stop]

        if generation_config:
            gemini_payload["generationConfig"] = generation_config

        # Add system instruction if present
        if system_instruction:
            gemini_payload["systemInstruction"] = {"parts": [{"text": system_instruction}]}

        # Add tools if provided
        if kwargs.get("tools"):
            gemini_payload["tools"] = kwargs["tools"]
            if kwargs.get("tool_choice"):
                gemini_payload["toolConfig"] = {
                    "functionCallingConfig": {"mode": kwargs["tool_choice"]}
                }

        # Choose endpoint based on streaming
        is_streaming = kwargs.get("stream", False)
        endpoint = "streamGenerateContent" if is_streaming else "generateContent"
        url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:{endpoint}"

        try:
            response = requests.post(
                url,
                headers={
                    "x-goog-api-key": self.gemini_api_key,
                    "Content-Type": "application/json"
                },
                json=gemini_payload,
                stream=is_streaming
            )
            response.raise_for_status()

            end_time = time.time()
            duration_ms = (end_time - start_time) * 1000

            if is_streaming:
                # Handle streaming response
                full_response = ""
                input_tokens = 0
                output_tokens = 0
                finish_reason = None
                model_version = None
                first_token_time = None

                for line in response.iter_lines():
                    if line:
                        try:
                            # Gemini streaming sends JSON objects separated by newlines
                            data = json.loads(line.decode('utf-8'))

                            if 'candidates' in data and data['candidates']:
                                candidate = data['candidates'][0]

                                # Extract content
                                if 'content' in candidate and 'parts' in candidate['content']:
                                    for part in candidate['content']['parts']:
                                        if 'text' in part:
                                            if first_token_time is None:
                                                first_token_time = time.time()
                                            full_response += part['text']

                                # Extract finish reason
                                if 'finishReason' in candidate:
                                    finish_reason = candidate['finishReason']

                            # Extract usage metadata (usually in final chunk)
                            if 'usageMetadata' in data:
                                usage = data['usageMetadata']
                                input_tokens = usage.get('promptTokenCount', 0)
                                output_tokens = usage.get('candidatesTokenCount', 0)

                            # Extract model version
                            if 'modelVersion' in data:
                                model_version = data['modelVersion']

                        except json.JSONDecodeError:
                            continue

                self._send_span(
                    span_id=span_id, trace_id=trace_id, model=model,
                    original_messages=messages, response_text=full_response,
                    input_tokens=input_tokens, output_tokens=output_tokens,
                    duration_ms=duration_ms, start_time=start_time,
                    finish_reason=finish_reason, model_version=model_version,
                    streaming=True, first_token_time=first_token_time,
                    **kwargs
                )

                return full_response

            else:
                # Handle non-streaming response
                response_data = response.json()

                # Extract response content
                content = ""
                if 'candidates' in response_data and response_data['candidates']:
                    candidate = response_data['candidates'][0]
                    if 'content' in candidate and 'parts' in candidate['content']:
                        content_parts = []
                        for part in candidate['content']['parts']:
                            if 'text' in part:
                                content_parts.append(part['text'])
                        content = ''.join(content_parts)

                # Extract usage
                usage = response_data.get('usageMetadata', {})
                input_tokens = usage.get('promptTokenCount', 0)
                output_tokens = usage.get('candidatesTokenCount', 0)

                # Extract other metadata
                finish_reason = None
                if 'candidates' in response_data and response_data['candidates']:
                    finish_reason = response_data['candidates'][0].get('finishReason')

                model_version = response_data.get('modelVersion')

                self._send_span(
                    span_id=span_id, trace_id=trace_id, model=model,
                    original_messages=messages, response_text=content,
                    input_tokens=input_tokens, output_tokens=output_tokens,
                    duration_ms=duration_ms, start_time=start_time,
                    finish_reason=finish_reason, model_version=model_version,
                    streaming=False, **kwargs
                )

                return content

        except Exception as e:
            end_time = time.time()
            duration_ms = (end_time - start_time) * 1000

            self._send_error_span(
                span_id=span_id, trace_id=trace_id, model=model,
                original_messages=messages, duration_ms=duration_ms,
                start_time=start_time, error=e, **kwargs
            )
            raise

    def _convert_messages_to_contents(self, messages: list) -> tuple:
        """Convert OpenAI-style messages to Gemini contents format"""
        contents = []
        system_instruction = None

        for msg in messages:
            role = msg.get('role', 'user') if isinstance(msg, dict) else msg.role
            content = msg.get('content', '') if isinstance(msg, dict) else msg.content

            if role == 'system':
                # Collect system instructions
                if system_instruction:
                    system_instruction += f"\n{content}"
                else:
                    system_instruction = content
                continue

            # Convert content to parts
            if isinstance(content, list):
                # Handle multimodal content
                parts = []
                for item in content:
                    if isinstance(item, dict) and item.get('type') == 'text':
                        parts.append({"text": item['text']})
                    # Add support for images, etc. if needed
            else:
                parts = [{"text": str(content)}]

            # Convert role
            gemini_role = "user" if role == "user" else "model"
            contents.append({"role": gemini_role, "parts": parts})

        return contents, system_instruction

    def _send_span(self, span_id: str, trace_id: str, model: str,
                   original_messages: list, response_text: str,
                   input_tokens: int, output_tokens: int, duration_ms: float,
                   start_time: float, finish_reason: str = None,
                   model_version: str = None, streaming: bool = False,
                   first_token_time: float = None, **kwargs):
        """Send successful span to ZeroEval"""

        # Calculate performance metrics
        throughput = output_tokens / (duration_ms / 1000) if duration_ms > 0 else 0
        ttft_ms = None
        if streaming and first_token_time:
            ttft_ms = (first_token_time - start_time) * 1000

        # Prepare attributes following ZeroEval's expected format
        attributes = {
            # Core attributes for cost calculation (use provider naming)
            "provider": "gemini",     # Key for cost calculation
            "model": model,           # Key for cost calculation
            "inputTokens": input_tokens,   # Key for cost calculation
            "outputTokens": output_tokens, # Key for cost calculation

            # Gemini-specific attributes
            "temperature": kwargs.get("temperature"),
            "max_tokens": kwargs.get("max_tokens"),  # maxOutputTokens
            "top_p": kwargs.get("top_p"),
            "top_k": kwargs.get("top_k"),
            "stop_sequences": kwargs.get("stop"),
            "streaming": streaming,
            "finish_reason": finish_reason,
            "model_version": model_version,

            # Performance metrics
            "throughput": throughput,
            "duration_ms": duration_ms,
        }

        if ttft_ms:
            attributes["ttft_ms"] = ttft_ms

        # Include tool information if present
        if kwargs.get("tools"):
            attributes["tools_count"] = len(kwargs["tools"])
            attributes["tool_choice"] = kwargs.get("tool_choice")

        # Clean up None values
        attributes = {k: v for k, v in attributes.items() if v is not None}

        # Format original messages for display (convert back to OpenAI format for consistency)
        formatted_messages = self._format_messages_for_display(original_messages)

        span_data = {
            "id": span_id,
            "trace_id": trace_id,
            "name": f"{model}_completion",
            "kind": "llm",  # Critical: must be "llm" for cost calculation
            "started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
            "ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
            "status": "ok",
            "attributes": attributes,
            "input_data": json.dumps(formatted_messages),
            "output_data": response_text,
            "tags": {
                "provider": "gemini",
                "model": model,
                "streaming": str(streaming).lower()
            }
        }

        # Send to ZeroEval
        response = requests.post(
            self.zeroeval_url,
            headers={
                "Authorization": f"Bearer {self.zeroeval_api_key}",
                "Content-Type": "application/json"
            },
            json=[span_data]
        )

        if response.status_code != 200:
            print(f"Warning: Failed to send span to ZeroEval: {response.text}")

    def _send_error_span(self, span_id: str, trace_id: str, model: str,
                        original_messages: list, duration_ms: float,
                        start_time: float, error: Exception, **kwargs):
        """Send error span to ZeroEval"""

        attributes = {
            "provider": "gemini",
            "model": model,
            "temperature": kwargs.get("temperature"),
            "max_tokens": kwargs.get("max_tokens"),
            "streaming": kwargs.get("stream", False),
            "error_type": type(error).__name__,
            "error_message": str(error),
            "duration_ms": duration_ms,
        }

        # Clean up None values
        attributes = {k: v for k, v in attributes.items() if v is not None}

        formatted_messages = self._format_messages_for_display(original_messages)

        span_data = {
            "id": span_id,
            "trace_id": trace_id,
            "name": f"{model}_completion",
            "kind": "llm",
            "started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
            "ended_at": datetime.fromtimestamp(start_time + duration_ms/1000, timezone.utc).isoformat(),
            "status": "error",
            "attributes": attributes,
            "input_data": json.dumps(formatted_messages),
            "output_data": "",
            "error_message": str(error),
            "tags": {
                "provider": "gemini",
                "model": model,
                "error": "true"
            }
        }

        requests.post(
            self.zeroeval_url,
            headers={
                "Authorization": f"Bearer {self.zeroeval_api_key}",
                "Content-Type": "application/json"
            },
            json=[span_data]
        )

    def _format_messages_for_display(self, messages: list) -> list:
        """Format messages for optimal display in ZeroEval UI"""
        formatted = []
        for msg in messages:
            if hasattr(msg, 'role'):
                role = msg.role
                content = msg.content
            else:
                role = msg.get('role', 'user')
                content = msg.get('content', '')

            # Handle multimodal content
            if isinstance(content, list):
                text_parts = []
                for part in content:
                    if isinstance(part, dict) and part.get('type') == 'text':
                        text_parts.append(part['text'])
                    elif isinstance(part, str):
                        text_parts.append(part)
                content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'

            formatted.append({
                "role": role,
                "content": content
            })

        return formatted

# Usage example

tracer = GeminiTracer(
    api_key="your-gemini-api-key",
    zeroeval_api_key="your-zeroeval-api-key"
)

# Non-streaming call
response = tracer.generate_content_with_tracing([
    {"role": "user", "content": "What is the capital of France?"}
], model="gemini-1.5-flash", temperature=0.7)

# Streaming call
response = tracer.generate_content_with_tracing([
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Write a short story"}
], model="gemini-1.5-flash", stream=True, temperature=0.9)

Key Attributes for Cost Calculation

For accurate cost calculation, ZeroEval requires these specific attributes in your span:
Attribute | Required | Description | Example Values
provider | Yes | Provider identifier for pricing lookup | "openai", "gemini", "anthropic"
model | Yes | Model identifier for pricing lookup | "gpt-4o", "gemini-1.5-flash"
inputTokens | Yes | Number of input tokens consumed | 150
outputTokens | Yes | Number of output tokens generated | 75
kind | Yes | Must be set to "llm" | "llm"
Cost Calculation Process:
  1. ZeroEval looks up pricing in the provider_models table using provider and model
  2. Calculates: (inputTokens × inputPrice + outputTokens × outputPrice) / 1,000,000 (see the code sketch at the end of this subsection)
  3. Stores the result in the span’s cost field
  4. Cost is displayed in cents, automatically converted to dollars in the UI
Currently Supported Models for Cost Calculation:
  • OpenAI: gpt-4o, gpt-4o-mini, gpt-4-turbo, gpt-3.5-turbo
  • Gemini: gemini-1.5-flash, gemini-1.5-pro, gemini-1.0-pro
  • Anthropic: claude-3-5-sonnet, claude-3-haiku, claude-3-opus
If your model isn’t listed, the cost will be 0 and you’ll see a warning in the logs. Contact support to add pricing for new models.
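
As a minimal illustration of steps 1 and 2 above, here is a sketch of the calculation using placeholder prices (these are not ZeroEval’s actual rates; the real lookup happens server-side against the provider_models table):
PRICE_PER_MILLION_TOKENS = {
    # (provider, model) -> hypothetical prices per 1M tokens, for illustration only
    ("openai", "gpt-4o"): {"input": 2.50, "output": 10.00},
    ("gemini", "gemini-1.5-flash"): {"input": 0.075, "output": 0.30},
}

def estimate_cost(provider: str, model: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate span cost from per-million-token prices."""
    pricing = PRICE_PER_MILLION_TOKENS.get((provider, model))
    if pricing is None:
        # Unknown model: ZeroEval reports a cost of 0 and logs a warning
        return 0.0
    return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000

# 150 input tokens + 75 output tokens on gpt-4o (placeholder pricing)
print(estimate_cost("openai", "gpt-4o", 150, 75))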

Conversation Formatting Best Practices

To ensure your conversations display properly in the ZeroEval UI, follow these formatting guidelines:
def format_messages_for_zeroeval(messages: list) -> list:
    """Format messages for optimal display in ZeroEval UI"""
    formatted = []

    for msg in messages:
        # Handle both dict and object formats
        if hasattr(msg, 'role'):
            role = msg.role
            content = msg.content
        else:
            role = msg.get('role', 'user')
            content = msg.get('content', '')

        # Standardize role names
        if role in ['assistant', 'bot', 'ai']:
            role = 'assistant'
        elif role in ['human', 'user']:
            role = 'user'
        elif role == 'system':
            role = 'system'

        # Handle multimodal content - extract text for display
        if isinstance(content, list):
            text_parts = []
            for part in content:
                if isinstance(part, dict):
                    if part.get('type') == 'text':
                        text_parts.append(part['text'])
                    elif part.get('type') == 'image_url':
                        text_parts.append(f"[Image: {part.get('image_url', {}).get('url', 'Unknown')}]")
                elif isinstance(part, str):
                    text_parts.append(part)

            # Join text parts with newlines for readability
            content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'

        # Ensure content is a string
        if not isinstance(content, str):
            content = str(content)

        # Trim excessive whitespace but preserve meaningful formatting
        content = content.strip()

        formatted.append({
            "role": role,
            "content": content
        })

    return formatted

# Usage in span creation
span_data = {
    "input_data": json.dumps(format_messages_for_zeroeval(original_messages)),
    "output_data": response_text.strip(),  # Clean response text too
    # ... other fields
}
Key Formatting Rules:
  1. Standardize Role Names: Use "user", "assistant", and "system" consistently
  2. Handle Multimodal Content: Extract text content and add descriptive placeholders for non-text elements
  3. Clean Whitespace: Trim excessive whitespace while preserving intentional formatting
  4. Ensure String Types: Convert all content to strings to avoid serialization issues
  5. Preserve Conversation Flow: Maintain the original message order and context
UI Display Features:
  • Message Bubbles: Conversations appear as chat bubbles with clear role distinction
  • Token Counts: Hover over messages to see token usage breakdown
  • Copy Functionality: Users can copy individual messages or entire conversations
  • Search: Well-formatted messages are easily searchable within traces
  • Export: Clean formatting ensures readable exports to various formats
Common Formatting Issues to Avoid:
  • ❌ Mixed role naming (bot vs assistant)
  • ❌ Nested objects in content fields
  • ❌ Excessive line breaks or whitespace
  • ❌ Empty or null content fields
  • ❌ Non-string data types in content
Pro Tips:
  • Keep system messages concise but informative
  • Use consistent formatting across your application
  • Include relevant context in message content for better debugging
  • Consider truncating very long messages (>10k characters) with an ellipsis (see the helper sketch below)
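
A tiny helper for the truncation tip above (the 10,000-character threshold is just the suggestion from this list, not an SDK requirement):
MAX_CONTENT_CHARS = 10_000  # suggested cutoff from the tips above

def truncate_content(content: str, limit: int = MAX_CONTENT_CHARS) -> str:
    """Truncate very long message content with an ellipsis before putting it in a span."""
    if len(content) <= limit:
        return content
    return content[:limit] + "…"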

Creating Child Spans

Create nested spans to track sub-operations within an LLM call:
import zeroeval as ze

@ze.span(name="rag_pipeline", kind="generic")
def answer_with_context(question: str) -> str:
    # Retrieval step
    with ze.span(name="retrieve_context", kind="vector_store") as retrieval_span:
        context = vector_db.search(question, k=5)
        retrieval_span.set_attributes({
            "vector_store.query": question,
            "vector_store.k": 5,
            "vector_store.results": len(context)
        })

    # LLM generation step
    with ze.span(name="generate_answer", kind="llm") as llm_span:
        messages = [
            {"role": "system", "content": f"Context: {context}"},
            {"role": "user", "content": question}
        ]

        response = generate_response(messages)

        llm_span.set_attributes({
            "llm.model": "gpt-4",
            "llm.context_length": len(str(context))
        })

        return response

Direct API Instrumentation

If you prefer to send spans directly to the API without using an SDK, here’s how to do it:

API Authentication

First, obtain an API key from your Settings → API Keys page. Include the API key in your request headers:
Authorization: Bearer YOUR_API_KEY

Basic Span Creation

Send a POST request to /api/v1/spans with your span data:
curl -X POST https://api.zeroeval.com/api/v1/spans \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '[{
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "trace_id": "550e8400-e29b-41d4-a716-446655440001",
    "name": "chat_completion",
    "kind": "llm",
    "started_at": "2024-01-15T10:30:00Z",
    "ended_at": "2024-01-15T10:30:02Z",
    "status": "ok",
    "attributes": {
      "llm.model": "gpt-4",
      "llm.provider": "openai",
      "llm.temperature": 0.7,
      "llm.input_tokens": 150,
      "llm.output_tokens": 230,
      "llm.total_tokens": 380
    },
    "input_data": "[{\"role\": \"user\", \"content\": \"What is the capital of France?\"}]",
    "output_data": "The capital of France is Paris."
  }]'

Complete LLM Span with Session

Create a full trace with session context:
import requests
import json
from datetime import datetime, timezone
import uuid
import time

class ZeroEvalClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.zeroeval.com/api/v1"
        self.session_id = str(uuid.uuid4())

    def create_llm_span(
        self,
        messages: list,
        response: dict,
        model: str = "gpt-4",
        trace_id: str = None,
        parent_span_id: str = None,
        start_time: float = None,
        end_time: float = None
    ):
        """Create a comprehensive LLM span with all metadata"""

        if not trace_id:
            trace_id = str(uuid.uuid4())

        if not start_time:
            start_time = time.time()
        if not end_time:
            end_time = time.time()

        span_id = str(uuid.uuid4())

        # Calculate duration
        duration_ms = (end_time - start_time) * 1000

        # Prepare comprehensive span data
        span_data = {
            "id": span_id,
            "trace_id": trace_id,
            "parent_span_id": parent_span_id,
            "name": f"{model}_completion",
            "kind": "llm",
            "started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
            "ended_at": datetime.fromtimestamp(end_time, timezone.utc).isoformat(),
            "duration_ms": duration_ms,
            "status": "ok",

            # Session context
            "session": {
                "id": self.session_id,
                "name": "API Client Session"
            },

            # Core attributes
            "attributes": {
                "llm.model": model,
                "llm.provider": "openai",
                "llm.temperature": 0.7,
                "llm.max_tokens": 1000,
                "llm.streaming": False,

                # Token metrics
                "llm.input_tokens": response.get("usage", {}).get("prompt_tokens"),
                "llm.output_tokens": response.get("usage", {}).get("completion_tokens"),
                "llm.total_tokens": response.get("usage", {}).get("total_tokens"),

                # Performance metrics
                "llm.duration_ms": duration_ms,
                "llm.throughput_tokens_per_sec": (
                    response.get("usage", {}).get("completion_tokens", 0) /
                    (duration_ms / 1000) if duration_ms > 0 else 0
                ),

                # Response metadata
                "llm.finish_reason": response.get("choices", [{}])[0].get("finish_reason"),
                "llm.response_id": response.get("id"),
                "llm.system_fingerprint": response.get("system_fingerprint")
            },

            # Tags for filtering
            "tags": {
                "environment": "production",
                "version": "1.0.0",
                "user_id": "user_123"
            },

            # Input/Output
            "input_data": json.dumps(messages),
            "output_data": response.get("choices", [{}])[0].get("message", {}).get("content", ""),

            # Cost calculation (optional - will be calculated server-side if not provided)
            "cost": self.calculate_cost(
                model,
                response.get("usage", {}).get("prompt_tokens", 0),
                response.get("usage", {}).get("completion_tokens", 0)
            )
        }

        # Send the span (use a distinct name so we don't shadow the LLM response dict)
        ze_response = requests.post(
            f"{self.base_url}/spans",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            },
            json=[span_data]
        )

        if ze_response.status_code != 200:
            raise Exception(f"Failed to send span: {ze_response.text}")

        return span_id

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost based on model and token usage"""
        # Example pricing (adjust based on actual pricing)
        pricing = {
            "gpt-4": {"input": 0.03 / 1000, "output": 0.06 / 1000},
            "gpt-3.5-turbo": {"input": 0.001 / 1000, "output": 0.002 / 1000}
        }

        if model in pricing:
            input_cost = input_tokens * pricing[model]["input"]
            output_cost = output_tokens * pricing[model]["output"]
            return input_cost + output_cost

        return 0.0

Span Schema Reference

Required Fields

Field | Type | Description
trace_id | string (UUID) | Unique identifier for the trace
name | string | Descriptive name for the span
started_at | ISO 8601 datetime | When the span started

Optional Fields

Field | Type | Description
id | string (UUID) | Unique span identifier (auto-generated if not provided)
kind | string | Set to "llm" for LLM spans
ended_at | ISO 8601 datetime | When the span completed
status | string | "ok", "error", or "unset"
input_data | string | JSON string of input messages
output_data | string | Generated text response
duration_ms | number | Total duration in milliseconds
cost | number | Calculated cost (auto-calculated if not provided)

LLM-Specific Attributes

Store these in the attributes field:
Attribute | Type | Description
llm.model | string | Model identifier (e.g., "gpt-4", "claude-3")
llm.provider | string | Provider name (e.g., "openai", "anthropic")
llm.temperature | number | Temperature parameter
llm.max_tokens | number | Maximum tokens limit
llm.input_tokens | number | Number of input tokens
llm.output_tokens | number | Number of output tokens
llm.total_tokens | number | Total tokens used
llm.streaming | boolean | Whether the response was streamed
llm.ttft_ms | number | Time to first token (streaming only)
llm.throughput_tokens_per_sec | number | Token generation rate
llm.finish_reason | string | Why generation stopped
llm.response_id | string | Provider’s response ID
llm.system_fingerprint | string | Model version identifier

Optional Context Fields

Field | Type | Description
parent_span_id | string (UUID) | Parent span for nested operations
session | object | Session context with id and optional name
tags | object | Key-value pairs for filtering
signals | object | Custom signals for alerting
error_message | string | Error description if status is "error"
error_stack | string | Stack trace for debugging
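
To show how these fit together, here is an illustrative span payload using the optional context fields (the values are made up, and the shape of signals is an assumption, treated here as simple key/value pairs):
span_data = {
    "id": "550e8400-e29b-41d4-a716-446655440002",
    "trace_id": "550e8400-e29b-41d4-a716-446655440001",
    "parent_span_id": "550e8400-e29b-41d4-a716-446655440000",  # nests this span under another
    "name": "generate_answer",
    "kind": "llm",
    "started_at": "2024-01-15T10:30:00Z",
    "ended_at": "2024-01-15T10:30:02Z",
    "status": "error",
    "error_message": "Rate limit exceeded",
    "error_stack": "Traceback (most recent call last): ...",
    "session": {"id": "550e8400-e29b-41d4-a716-446655440003", "name": "Support Chat Session"},
    "tags": {"environment": "staging", "user_id": "user_123"},
    "signals": {"needs_review": True}
}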

Best Practices

  1. Always set the kind field: Use "llm" for LLM spans to enable specialized features like embeddings and cost tracking.
  2. Include token counts: These are essential for cost calculation and performance monitoring.
  3. Capture timing metrics: For streaming responses, track TTFT (time to first token) and throughput.
  4. Use consistent naming: Follow a pattern like {model}_completion or {provider}_{operation}.
  5. Add context with tags: Use tags for environment, version, user ID, etc., to enable powerful filtering.
  6. Handle errors gracefully: Set status to “error” and include error details in attributes.
  7. Link related spans: Use parent_span_id to create hierarchical traces for complex workflows.
  8. Batch span submissions: When sending multiple spans, include them in a single API call as an array (see the sketch after this list).
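
As a small sketch of practices 7 and 8, the snippet below links two spans with parent_span_id and submits them in one batched request to the endpoint used throughout this guide (timestamps are illustrative):
import uuid
import requests

trace_id = str(uuid.uuid4())
parent_id = str(uuid.uuid4())

parent_span = {
    "id": parent_id,
    "trace_id": trace_id,
    "name": "rag_pipeline",
    "kind": "generic",
    "started_at": "2024-01-15T10:30:00Z",
    "ended_at": "2024-01-15T10:30:03Z",
    "status": "ok"
}

child_span = {
    "id": str(uuid.uuid4()),
    "trace_id": trace_id,
    "parent_span_id": parent_id,  # links the LLM call under the pipeline span
    "name": "gpt-4_completion",
    "kind": "llm",
    "started_at": "2024-01-15T10:30:01Z",
    "ended_at": "2024-01-15T10:30:03Z",
    "status": "ok"
}

# One batched submission: the spans endpoint accepts an array
requests.post(
    "https://api.zeroeval.com/api/v1/spans",
    headers={
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    },
    json=[parent_span, child_span]
)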

Examples

Multi-Step LLM Pipeline

Here’s a complete example of tracking a RAG (Retrieval-Augmented Generation) pipeline:
import zeroeval as ze
import time
import json

@ze.span(name="rag_query", kind="generic")
def rag_pipeline(user_query: str) -> dict:
    trace_id = ze.get_current_trace()

    # Step 1: Query embedding
    with ze.span(name="embed_query", kind="llm") as embed_span:
        start = time.time()
        embedding = create_embedding(user_query)
        embed_span.set_attributes({
            "llm.model": "text-embedding-3-small",
            "llm.provider": "openai",
            "llm.input_tokens": len(user_query.split()),
            "llm.duration_ms": (time.time() - start) * 1000
        })

    # Step 2: Vector search
    with ze.span(name="vector_search", kind="vector_store") as search_span:
        results = vector_db.similarity_search(embedding, k=5)
        search_span.set_attributes({
            "vector_store.index": "knowledge_base",
            "vector_store.k": 5,
            "vector_store.results_count": len(results)
        })

    # Step 3: Rerank results
    with ze.span(name="rerank_results", kind="llm") as rerank_span:
        reranked = rerank_documents(user_query, results)
        rerank_span.set_attributes({
            "llm.model": "rerank-english-v2.0",
            "llm.provider": "cohere",
            "rerank.input_documents": len(results),
            "rerank.output_documents": len(reranked)
        })

    # Step 4: Generate response
    with ze.span(name="generate_response", kind="llm") as gen_span:
        context = "\n".join([doc.content for doc in reranked[:3]])
        messages = [
            {"role": "system", "content": f"Use this context to answer: {context}"},
            {"role": "user", "content": user_query}
        ]

        response = generate_with_metrics(messages, model="gpt-4")

        gen_span.set_attributes({
            "llm.context_documents": 3,
            "llm.context_length": len(context)
        })

        return {
            "answer": response,
            "sources": [doc.metadata for doc in reranked[:3]],
            "trace_id": trace_id
        }
This comprehensive instrumentation provides full visibility into your LLM operations, enabling you to monitor performance, track costs, and debug issues effectively.
For automatic instrumentation of popular LLM libraries, check out our SDK integrations which handle all of this automatically.