TL;DR

  1. Pull (or create) a dataset
  2. Write a task using @ze.task – any function that processes a row
  3. Write evaluations using @ze.evaluation – functions that score your results
  4. Run: dataset.run(task).eval([evaluations])

Quick Start

import zeroeval as ze

ze.init()
dataset = ze.Dataset.pull("spam_emails")

@ze.task(outputs=["prediction", "confidence"])
def detect_spam(row):
    # Your ML model here
    return {"prediction": 1, "confidence": 0.85}

@ze.evaluation(mode="row", outputs=["correct"])
def accuracy_per_row(row):
    return {"correct": int(row["prediction"] == row["label"])}

# Run task and evaluate
run = dataset.run(detect_spam)
run.eval([accuracy_per_row])

Task Functions

Tasks are your core ML functions decorated with @ze.task:
@ze.task(outputs=["prediction", "confidence"])
def spam_detector(row):
    """Simple spam detection"""
    text = row["text"]
    spam_words = ["FREE", "WIN", "URGENT"]

    score = sum(1 for word in spam_words if word in text.upper())
    confidence = min(0.95, score / 3 + 0.2)
    prediction = 1 if confidence > 0.5 else 0

    return {
        "prediction": prediction,
        "confidence": confidence
    }
Key points:
  • Must return a dictionary with all declared outputs
  • Receives a single row from your dataset
  • Can access row fields via dot notation (row.text) or dictionary access (row["text"]); see the sketch below
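A minimal sketch of a task that wraps an external model; call_my_model is a hypothetical stand-in for your own inference call:
@ze.task(outputs=["prediction", "confidence"])
def model_based_detector(row):
    # Dot notation and dictionary access are interchangeable
    text = row.text  # same as row["text"]
    result = call_my_model(text)  # hypothetical: replace with your model call
    return {"prediction": result["label"], "confidence": result["score"]}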

Evaluations

Evaluations score your task results. Three modes are available:

Row Evaluations

Score each individual result:
@ze.evaluation(mode="row", outputs=["correct", "error"])
def binary_accuracy(row):
    """Check if prediction matches label"""
    correct = int(row["prediction"] == row["label"])
    error = abs(row["prediction"] - row["label"])

    return {"correct": correct, "error": error}

Column Evaluations

Compute aggregate metrics across the entire dataset:
@ze.evaluation(mode="column", outputs=["accuracy", "precision", "recall", "f1"])
def classification_metrics(dataset):
    """Calculate aggregate classification metrics"""
    tp = sum(row.get("true_positive", 0) for row in dataset)
    fp = sum(row.get("false_positive", 0) for row in dataset)
    tn = sum(row.get("true_negative", 0) for row in dataset)
    fn = sum(row.get("false_negative", 0) for row in dataset)

    accuracy = (tp + tn) / (tp + fp + tn + fn)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

Run Evaluations

Compare results across multiple runs:
@ze.evaluation(mode="run", outputs=["consistency", "avg_accuracy"])
def cross_run_analysis(runs):
    """Analyze consistency across multiple runs"""
    accuracies = [run.metrics.get("accuracy", 0) for run in runs]
    avg_accuracy = sum(accuracies) / len(accuracies)
    consistency = 1 - (max(accuracies) - min(accuracies))

    return {"consistency": consistency, "avg_accuracy": avg_accuracy}

Column & Run Metrics

For simpler aggregate calculations, use dedicated metric decorators:
@ze.column_metric(outputs=["accuracy"])
def dataset_accuracy(dataset):
    """Simple accuracy calculation"""
    correct = sum(row.get("correct", 0) for row in dataset)
    total = len(dataset)
    return {"accuracy": correct / total}

@ze.run_metric(outputs=["stability"])
def run_stability(runs):
    """Measure stability across runs"""
    import numpy as np
    scores = [run.metrics.get("accuracy", 0) for run in runs]
    return {"stability": 1 - np.std(scores)}
Apply them separately from run.eval():
run.column_metrics([dataset_accuracy])
run.run_metrics([run_stability], all_runs)
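As in the Complete Example below, the resulting aggregates can then be read back from run.metrics:
print(run.metrics.get("accuracy", 0))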

Complete Example

Here’s a full spam detection experiment:
import zeroeval as ze

ze.init()

# Create or pull dataset
dataset = ze.Dataset.pull("spam_detection")

@ze.task(outputs=["prediction", "confidence"])
def detect_spam(row):
    text = row["text"]
    spam_indicators = ["FREE", "WIN", "URGENT", "Click here", "Act now"]

    score = sum(1 for word in spam_indicators if word.upper() in text.upper())
    confidence = min(0.95, max(0.05, score / 3 + 0.3))
    prediction = 1 if confidence > 0.5 else 0

    return {"prediction": prediction, "confidence": confidence}

@ze.evaluation(mode="row", outputs=["correct", "true_positive", "false_positive"])
def binary_metrics(row):
    pred = row["prediction"]
    label = row["label"]

    return {
        "correct": int(pred == label),
        "true_positive": int(pred == 1 and label == 1),
        "false_positive": int(pred == 1 and label == 0)
    }

@ze.column_metric(outputs=["accuracy", "precision"])
def aggregate_metrics(dataset):
    tp = sum(row.get("true_positive", 0) for row in dataset)
    fp = sum(row.get("false_positive", 0) for row in dataset)
    correct = sum(row.get("correct", 0) for row in dataset)

    accuracy = correct / len(dataset)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0

    return {"accuracy": accuracy, "precision": precision}

# Run experiment
run = dataset.run(detect_spam)
run.eval([binary_metrics])
run.column_metrics([aggregate_metrics])

# Multiple runs for stability analysis
all_runs = run.repeat(3)  # Creates 3 total runs

print(f"Final accuracy: {run.metrics.get('accuracy', 0):.2%}")

Multiple Runs

Test stability by running multiple times:
# Run the same task 5 times
run1 = dataset.run(detect_spam)
all_runs = run1.repeat(5)  # Creates 5 total runs

# Apply run-level metrics to analyze consistency
@ze.run_metric(outputs=["mean_accuracy", "std_accuracy"])
def accuracy_stats(runs):
    import numpy as np
    accuracies = [r.metrics.get("accuracy", 0) for r in runs]
    return {
        "mean_accuracy": np.mean(accuracies),
        "std_accuracy": np.std(accuracies)
    }

all_runs[0].run_metrics([accuracy_stats], all_runs)

Subsets & Debugging

Test on smaller subsets while developing:
# Run on first 10 rows only
small_dataset = dataset[:10]
small_run = small_dataset.run(detect_spam)
small_run.eval([binary_metrics])

# Or create a custom subset
subset_indices = [0, 5, 10, 15, 20]
subset_dataset = ze.Dataset(
    name="test_subset",
    data=[dataset[i] for i in subset_indices]
)
test_run = subset_dataset.run(detect_spam)
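The subset run behaves like any other run, so the same evaluations and metrics apply:
test_run.eval([binary_metrics])
test_run.column_metrics([aggregate_metrics])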

Automatic Tracing

Tasks are automatically traced with spans for observability:
@ze.task(outputs=["result"])
def my_task(row):
    # This will be automatically traced as span "task:my_task"
    result = call_my_model(row["input"])
    return {"result": result}

# Nested spans inside your task (via @ze.span or the span context manager) are also captured
from zeroeval.observability import span

@ze.task(outputs=["answer"])
def complex_task(row):
    with span(name="preprocessing"):
        cleaned = preprocess(row["text"])

    with span(name="model_call"):
        prediction = model.predict(cleaned)

    return {"answer": prediction}