import requests
import json
import time
import uuid
from datetime import datetime, timezone
class GeminiTracer:
    """Traces Google Gemini API calls and reports LLM spans to ZeroEval."""

    # Bug fix: the constructor was named ``**init**`` (markdown-mangled),
    # so Python never ran it as ``__init__`` and instances had no attributes.
    def __init__(self, api_key: str, zeroeval_api_key: str):
        """
        Args:
            api_key: Google Gemini API key (sent via ``x-goog-api-key``).
            zeroeval_api_key: ZeroEval API key used to authenticate span uploads.
        """
        self.gemini_api_key = api_key
        self.zeroeval_api_key = zeroeval_api_key
        self.zeroeval_url = "https://api.zeroeval.com/api/v1/spans"
def generate_content_with_tracing(self, messages: list, model: str = "gemini-1.5-flash", **kwargs):
    """Call the Gemini generateContent API and record a ZeroEval span.

    Args:
        messages: OpenAI-style message dicts (``role``/``content``); system
            messages are lifted into Gemini's ``systemInstruction``.
        model: Gemini model name, e.g. ``"gemini-1.5-flash"``.
        **kwargs: Optional settings — ``temperature``, ``max_tokens``,
            ``top_p``, ``top_k``, ``stop``, ``tools``, ``tool_choice``,
            ``stream``.

    Returns:
        The generated text; for streaming calls, the full concatenated text.

    Raises:
        requests.HTTPError: If the Gemini API returns an error status.
        requests.RequestException: On network failure or timeout.
    """
    trace_id = str(uuid.uuid4())
    span_id = str(uuid.uuid4())
    start_time = time.time()

    # Convert OpenAI-style messages to Gemini contents format.
    contents, system_instruction = self._convert_messages_to_contents(messages)

    gemini_payload = {"contents": contents}

    # Map OpenAI-style kwargs onto Gemini's generationConfig naming.
    generation_config = {}
    if kwargs.get("temperature") is not None:
        generation_config["temperature"] = kwargs["temperature"]
    if kwargs.get("max_tokens"):
        generation_config["maxOutputTokens"] = kwargs["max_tokens"]
    if kwargs.get("top_p") is not None:
        generation_config["topP"] = kwargs["top_p"]
    if kwargs.get("top_k") is not None:
        generation_config["topK"] = kwargs["top_k"]
    if kwargs.get("stop"):
        stop = kwargs["stop"]
        generation_config["stopSequences"] = stop if isinstance(stop, list) else [stop]
    if generation_config:
        gemini_payload["generationConfig"] = generation_config

    if system_instruction:
        gemini_payload["systemInstruction"] = {"parts": [{"text": system_instruction}]}

    if kwargs.get("tools"):
        gemini_payload["tools"] = kwargs["tools"]
    if kwargs.get("tool_choice"):
        gemini_payload["toolConfig"] = {
            "functionCallingConfig": {"mode": kwargs["tool_choice"]}
        }

    # Choose endpoint based on streaming.
    is_streaming = kwargs.get("stream", False)
    endpoint = "streamGenerateContent" if is_streaming else "generateContent"
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:{endpoint}"
    if is_streaming:
        # Bug fix: without alt=sse the streaming endpoint returns one
        # pretty-printed JSON *array*, whose individual lines are not valid
        # JSON objects — the old line-by-line json.loads parsed nothing.
        # With alt=sse each chunk arrives as a "data: {...}" SSE frame.
        url += "?alt=sse"

    try:
        response = requests.post(
            url,
            headers={
                "x-goog-api-key": self.gemini_api_key,
                "Content-Type": "application/json",
            },
            json=gemini_payload,
            stream=is_streaming,
            timeout=120,  # requests has no default timeout; never hang forever
        )
        response.raise_for_status()

        if is_streaming:
            full_response = ""
            input_tokens = 0
            output_tokens = 0
            finish_reason = None
            model_version = None
            first_token_time = None
            for line in response.iter_lines():
                if not line:
                    continue
                decoded = line.decode("utf-8")
                # Only "data: {...}" SSE frames carry payload; skip the rest.
                if not decoded.startswith("data:"):
                    continue
                try:
                    data = json.loads(decoded[len("data:"):].strip())
                except json.JSONDecodeError:
                    continue
                if data.get("candidates"):
                    candidate = data["candidates"][0]
                    for part in candidate.get("content", {}).get("parts", []):
                        if "text" in part:
                            if first_token_time is None:
                                first_token_time = time.time()
                            full_response += part["text"]
                    if "finishReason" in candidate:
                        finish_reason = candidate["finishReason"]
                # Usage metadata usually arrives in the final chunk.
                if "usageMetadata" in data:
                    usage = data["usageMetadata"]
                    input_tokens = usage.get("promptTokenCount", 0)
                    output_tokens = usage.get("candidatesTokenCount", 0)
                if "modelVersion" in data:
                    model_version = data["modelVersion"]

            # Bug fix: measure duration AFTER the stream is fully consumed —
            # previously it was captured when the response headers arrived,
            # under-reporting duration and inflating throughput.
            duration_ms = (time.time() - start_time) * 1000
            self._send_span(
                span_id=span_id, trace_id=trace_id, model=model,
                original_messages=messages, response_text=full_response,
                input_tokens=input_tokens, output_tokens=output_tokens,
                duration_ms=duration_ms, start_time=start_time,
                finish_reason=finish_reason, model_version=model_version,
                streaming=True, first_token_time=first_token_time,
                **kwargs
            )
            return full_response

        # Non-streaming: the body is already fully downloaded here.
        duration_ms = (time.time() - start_time) * 1000
        response_data = response.json()

        content = ""
        finish_reason = None
        if response_data.get("candidates"):
            candidate = response_data["candidates"][0]
            content = "".join(
                part["text"]
                for part in candidate.get("content", {}).get("parts", [])
                if "text" in part
            )
            finish_reason = candidate.get("finishReason")

        usage = response_data.get("usageMetadata", {})
        input_tokens = usage.get("promptTokenCount", 0)
        output_tokens = usage.get("candidatesTokenCount", 0)
        model_version = response_data.get("modelVersion")

        self._send_span(
            span_id=span_id, trace_id=trace_id, model=model,
            original_messages=messages, response_text=content,
            input_tokens=input_tokens, output_tokens=output_tokens,
            duration_ms=duration_ms, start_time=start_time,
            finish_reason=finish_reason, model_version=model_version,
            streaming=False, **kwargs
        )
        return content
    except Exception as e:
        # Best-effort error telemetry, then surface the original failure.
        duration_ms = (time.time() - start_time) * 1000
        self._send_error_span(
            span_id=span_id, trace_id=trace_id, model=model,
            original_messages=messages, duration_ms=duration_ms,
            start_time=start_time, error=e, **kwargs
        )
        raise
def _convert_messages_to_contents(self, messages: list) -> tuple:
"""Convert OpenAI-style messages to Gemini contents format"""
contents = []
system_instruction = None
for msg in messages:
role = msg.get('role', 'user') if isinstance(msg, dict) else msg.role
content = msg.get('content', '') if isinstance(msg, dict) else msg.content
if role == 'system':
# Collect system instructions
if system_instruction:
system_instruction += f"\n{content}"
else:
system_instruction = content
continue
# Convert content to parts
if isinstance(content, list):
# Handle multimodal content
parts = []
for item in content:
if isinstance(item, dict) and item.get('type') == 'text':
parts.append({"text": item['text']})
# Add support for images, etc. if needed
else:
parts = [{"text": str(content)}]
# Convert role
gemini_role = "user" if role == "user" else "model"
contents.append({"role": gemini_role, "parts": parts})
return contents, system_instruction
def _send_span(self, span_id: str, trace_id: str, model: str,
               original_messages: list, response_text: str,
               input_tokens: int, output_tokens: int, duration_ms: float,
               start_time: float, finish_reason: str = None,
               model_version: str = None, streaming: bool = False,
               first_token_time: float = None, **kwargs):
    """Send a successful LLM span to ZeroEval.

    Args:
        span_id: UUID of this span.
        trace_id: UUID of the enclosing trace.
        model: Gemini model name.
        original_messages: Caller's OpenAI-style messages (for display).
        response_text: Full generated text.
        input_tokens: Prompt token count from ``usageMetadata``.
        output_tokens: Candidate token count from ``usageMetadata``.
        duration_ms: Total call duration in milliseconds.
        start_time: Wall-clock start (``time.time()`` seconds).
        finish_reason: Gemini ``finishReason``, if reported.
        model_version: Gemini ``modelVersion``, if reported.
        streaming: Whether the streaming endpoint was used.
        first_token_time: Wall-clock time of the first streamed token.
        **kwargs: The caller's generation kwargs (temperature, tools, ...).
    """
    # Output tokens per second; guard the zero-duration division.
    throughput = output_tokens / (duration_ms / 1000) if duration_ms > 0 else 0

    ttft_ms = None
    if streaming and first_token_time:
        ttft_ms = (first_token_time - start_time) * 1000

    # Attributes follow ZeroEval's expected format.
    attributes = {
        # Core attributes used for cost calculation (provider naming).
        "provider": "gemini",
        "model": model,
        "inputTokens": input_tokens,
        "outputTokens": output_tokens,
        # Generation settings as supplied by the caller.
        "temperature": kwargs.get("temperature"),
        "max_tokens": kwargs.get("max_tokens"),  # maps to maxOutputTokens
        "top_p": kwargs.get("top_p"),
        "top_k": kwargs.get("top_k"),
        "stop_sequences": kwargs.get("stop"),
        "streaming": streaming,
        "finish_reason": finish_reason,
        "model_version": model_version,
        # Performance metrics.
        "throughput": throughput,
        "duration_ms": duration_ms,
    }
    # Bug fix: explicit None check so a (theoretical) 0.0 ms time-to-first-
    # token is still recorded — the old truthiness test dropped it.
    if ttft_ms is not None:
        attributes["ttft_ms"] = ttft_ms

    # Include tool information if present.
    if kwargs.get("tools"):
        attributes["tools_count"] = len(kwargs["tools"])
        attributes["tool_choice"] = kwargs.get("tool_choice")

    # Drop unset attributes so the payload stays compact.
    attributes = {k: v for k, v in attributes.items() if v is not None}

    # Convert back to OpenAI-style dicts for consistent display in the UI.
    formatted_messages = self._format_messages_for_display(original_messages)

    span_data = {
        "id": span_id,
        "trace_id": trace_id,
        "name": f"{model}_completion",
        "kind": "llm",  # critical: must be "llm" for cost calculation
        "started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
        "ended_at": datetime.fromtimestamp(start_time + duration_ms / 1000, timezone.utc).isoformat(),
        "status": "ok",
        "attributes": attributes,
        "input_data": json.dumps(formatted_messages),
        "output_data": response_text,
        "tags": {
            "provider": "gemini",
            "model": model,
            "streaming": str(streaming).lower(),
        },
    }

    response = requests.post(
        self.zeroeval_url,
        headers={
            "Authorization": f"Bearer {self.zeroeval_api_key}",
            "Content-Type": "application/json",
        },
        json=[span_data],
        timeout=30,  # never let telemetry hang the traced call
    )
    # Accept any 2xx status rather than exactly 200 (e.g. 201/202).
    if not response.ok:
        print(f"Warning: Failed to send span to ZeroEval: {response.text}")
def _send_error_span(self, span_id: str, trace_id: str, model: str,
                     original_messages: list, duration_ms: float,
                     start_time: float, error: Exception, **kwargs):
    """Send a failed-call span to ZeroEval.

    Mirrors ``_send_span`` but records the exception type/message and an
    ``"error"`` status instead of token usage and output text.

    Args:
        span_id: UUID of this span.
        trace_id: UUID of the enclosing trace.
        model: Gemini model name.
        original_messages: Caller's OpenAI-style messages (for display).
        duration_ms: Elapsed time until the failure, in milliseconds.
        start_time: Wall-clock start (``time.time()`` seconds).
        error: The exception that aborted the Gemini call.
        **kwargs: The caller's generation kwargs.
    """
    attributes = {
        "provider": "gemini",
        "model": model,
        "temperature": kwargs.get("temperature"),
        "max_tokens": kwargs.get("max_tokens"),
        "streaming": kwargs.get("stream", False),
        "error_type": type(error).__name__,
        "error_message": str(error),
        "duration_ms": duration_ms,
    }
    # Drop unset attributes so the payload stays compact.
    attributes = {k: v for k, v in attributes.items() if v is not None}

    formatted_messages = self._format_messages_for_display(original_messages)

    span_data = {
        "id": span_id,
        "trace_id": trace_id,
        "name": f"{model}_completion",
        "kind": "llm",
        "started_at": datetime.fromtimestamp(start_time, timezone.utc).isoformat(),
        "ended_at": datetime.fromtimestamp(start_time + duration_ms / 1000, timezone.utc).isoformat(),
        "status": "error",
        "attributes": attributes,
        "input_data": json.dumps(formatted_messages),
        "output_data": "",
        "error_message": str(error),
        "tags": {
            "provider": "gemini",
            "model": model,
            "error": "true",
        },
    }

    response = requests.post(
        self.zeroeval_url,
        headers={
            "Authorization": f"Bearer {self.zeroeval_api_key}",
            "Content-Type": "application/json",
        },
        json=[span_data],
        timeout=30,  # never let telemetry hang the error path
    )
    # Consistency fix: previously failures here were silently ignored,
    # unlike the success path which prints a warning.
    if not response.ok:
        print(f"Warning: Failed to send error span to ZeroEval: {response.text}")
def _format_messages_for_display(self, messages: list) -> list:
"""Format messages for optimal display in ZeroEval UI"""
formatted = []
for msg in messages:
if hasattr(msg, 'role'):
role = msg.role
content = msg.content
else:
role = msg.get('role', 'user')
content = msg.get('content', '')
# Handle multimodal content
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'text':
text_parts.append(part['text'])
elif isinstance(part, str):
text_parts.append(part)
content = '\n'.join(text_parts) if text_parts else '[Multimodal content]'
formatted.append({
"role": role,
"content": content
})
return formatted
# Usage example.
# Bug fix: this example previously ran at module import time, firing live
# API calls with placeholder keys whenever the file was imported. Guarding
# it under __main__ keeps the module importable as a library.
if __name__ == "__main__":
    tracer = GeminiTracer(
        api_key="your-gemini-api-key",
        zeroeval_api_key="your-zeroeval-api-key",
    )

    # Non-streaming call.
    response = tracer.generate_content_with_tracing(
        [{"role": "user", "content": "What is the capital of France?"}],
        model="gemini-1.5-flash",
        temperature=0.7,
    )

    # Streaming call.
    response = tracer.generate_content_with_tracing(
        [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Write a short story"},
        ],
        model="gemini-1.5-flash",
        stream=True,
        temperature=0.9,
    )