# Continuous Evaluation

Continuous evaluation lets you run Attest assertions on live production traces without blocking request handling. The ContinuousEvalRunner samples traces, evaluates them in the background, and dispatches alerts when assertions fail.

```
Production Traffic
┌─────────┐    sample_rate    ┌────────────────┐
│  Agent  │ ────────────────► │ ContinuousEval │
│ Output  │                   │     Runner     │
└─────────┘                   └───────┬────────┘
                      ┌───────────────┼───────────────┐
                      ▼               ▼               ▼
                   Engine          Sampler      AlertDispatcher
                 (evaluate)       (filter)      (webhook/Slack)
```
Configure sampling and alerting globally with `config()`:

```python
from attest import config

config(
    sample_rate=0.1,  # Evaluate 10% of traces
    alert_webhook="https://hooks.example.com/attest",  # Generic webhook
    alert_slack_url="https://hooks.slack.com/services/T.../B.../xxx",  # Slack
)
```
The same settings can be supplied through environment variables:

| Variable | Description | Default |
| --- | --- | --- |
| `ATTEST_SAMPLE_RATE` | Fraction of traces to evaluate (0.0-1.0) | `0.0` |
| `ATTEST_ALERT_WEBHOOK` | Webhook URL for drift alerts | `None` |
| `ATTEST_ALERT_SLACK_URL` | Slack webhook URL for alerts | `None` |

Programmatic config() calls take priority over environment variables.

```python
from attest import Assertion, ContinuousEvalRunner
from attest.client import AttestClient
from attest.engine_manager import EngineManager

# Start the engine
engine = EngineManager()
await engine.start()
client = AttestClient(engine)

# Define production assertions
assertions = [
    Assertion(
        assertion_id="prod_content_01",
        type="content",
        spec={"target": "output.message", "check": "forbidden", "values": ["error", "stacktrace"]},
    ),
    Assertion(
        assertion_id="prod_constraint_01",
        type="constraint",
        spec={"field": "metadata.latency_ms", "operator": "lte", "value": 5000},
    ),
    Assertion(
        assertion_id="prod_constraint_02",
        type="constraint",
        spec={"field": "metadata.cost_usd", "operator": "lte", "value": 0.10},
    ),
]

# Create the runner
runner = ContinuousEvalRunner(
    client=client,
    assertions=assertions,
    sample_rate=0.1,
    alert_webhook="https://hooks.example.com/attest",
    alert_slack_url="https://hooks.slack.com/services/T.../B.../xxx",
)
```
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `client` | `AttestClient` | required | Protocol client connected to the engine |
| `assertions` | `list[Assertion]` | required | Assertions to evaluate on every sampled trace |
| `sample_rate` | `float` | `1.0` | Fraction of traces to evaluate (0.0-1.0) |
| `alert_webhook` | `str \| None` | `None` | Generic webhook URL for alerts |
| `alert_slack_url` | `str \| None` | `None` | Slack incoming webhook URL |
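The constraint specs above pair a dotted field path with a comparison operator. As a rough sketch of how such a spec could be checked against a trace, here is a standalone equivalent; the helper names and the `OPS` mapping are illustrative assumptions, not attest's engine:

```python
import operator

# Hypothetical operator table; attest's real engine may support more
OPS = {"lte": operator.le, "gte": operator.ge, "eq": operator.eq}

def get_field(trace: dict, path: str):
    """Resolve a dotted path such as 'metadata.latency_ms'."""
    value = trace
    for part in path.split("."):
        value = value[part]
    return value

def check_constraint(trace: dict, spec: dict) -> bool:
    """Apply spec['operator'] to the field value and the threshold."""
    return OPS[spec["operator"]](get_field(trace, spec["field"]), spec["value"])

trace = {"metadata": {"latency_ms": 1200, "cost_usd": 0.02}}
ok = check_constraint(trace, {"field": "metadata.latency_ms", "operator": "lte", "value": 5000})
```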

Start a background asyncio task that dequeues and evaluates traces:

```python
# Start the background loop
await runner.start()

# Submit traces from your request handler
async def handle_request(request):
    result = await my_agent(request.query)
    await runner.submit(result.trace)  # Non-blocking enqueue
    return result

# When shutting down
await runner.stop()
```
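The submit/start/stop lifecycle can be sketched with a plain `asyncio.Queue`. This is illustrative only, assuming a queue-backed worker; class and attribute names here are made up, not the runner's actual implementation:

```python
import asyncio

class BackgroundEvalLoop:
    """Sketch of a queue-backed background evaluator (not attest internals)."""

    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()
        self.processed: list = []
        self._task: asyncio.Task | None = None

    async def start(self):
        # Spawn the background task that dequeues and evaluates traces
        self._task = asyncio.create_task(self._run())

    async def _run(self):
        while True:
            trace = await self.queue.get()
            self.processed.append(trace)  # real evaluation would happen here
            self.queue.task_done()

    async def submit(self, trace):
        self.queue.put_nowait(trace)  # non-blocking enqueue

    async def stop(self):
        await self.queue.join()  # drain anything still queued
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass

async def main():
    loop = BackgroundEvalLoop()
    await loop.start()
    for i in range(3):
        await loop.submit({"trace_id": i})
    await loop.stop()
    return loop.processed

processed = asyncio.run(main())
```

Draining the queue before cancelling the worker ensures traces submitted just before shutdown are still evaluated.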

Evaluate a single trace synchronously (respects the sampler):

```python
eval_result = await runner.evaluate_trace(trace)
if eval_result is None:
    # Trace was not sampled
    pass
elif eval_result.total_cost > 0:
    # An assertion used a paid layer
    log_cost(eval_result.total_cost)
```

`evaluate_trace` returns `None` when the sampler skips the trace, or an `EvaluateBatchResult` with assertion outcomes.

The `Sampler` class implements probabilistic filtering:

```python
from attest import Sampler

sampler = Sampler(rate=0.05)  # 5% sample rate
if sampler.should_sample():
    # This trace was selected
    ...
```
| Rate | Behavior |
| --- | --- |
| `0.0` | Never sample (evaluation disabled) |
| `0.5` | 50% of traces evaluated |
| `1.0` | Every trace evaluated |
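The table above corresponds to simple probabilistic filtering, which can be reproduced in a few lines. This is a minimal standalone equivalent for illustration, not attest's actual `Sampler`:

```python
import random

class ProbabilisticSampler:
    """Minimal rate-based sampler sketch (illustrative, not attest's Sampler)."""

    def __init__(self, rate: float):
        if not 0.0 <= rate <= 1.0:
            raise ValueError("rate must be in [0.0, 1.0]")
        self.rate = rate

    def should_sample(self) -> bool:
        # random.random() is in [0.0, 1.0), so rate=0.0 never fires
        # and rate=1.0 always fires
        return random.random() < self.rate
```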

The `AlertDispatcher` class routes failure notifications to webhooks and Slack.

```python
from attest import AlertDispatcher

dispatcher = AlertDispatcher(
    webhook_url="https://hooks.example.com/attest",
    slack_url="https://hooks.slack.com/services/T.../B.../xxx",
)

await dispatcher.dispatch({
    "drift_type": "content_violation",
    "score": 0.3,
    "trace_id": "trc_abc123def456",
    "assertion_id": "prod_content_01",
    "explanation": "Output contained forbidden term 'stacktrace'",
})
```

Generic webhooks receive the raw alert dict as a JSON POST body:

```json
{
  "drift_type": "content_violation",
  "score": 0.3,
  "trace_id": "trc_abc123def456",
  "assertion_id": "prod_content_01",
  "explanation": "Output contained forbidden term 'stacktrace'"
}
```

Slack webhooks receive a formatted text message:

```
[attest] drift alert — type=content_violation score=0.3 trace_id=trc_abc123def456
```
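A formatter producing that message shape can be sketched as follows; the function name is hypothetical and this is not attest's actual formatting code:

```python
def format_slack_alert(alert: dict) -> str:
    """Reproduce the documented Slack message shape (illustrative helper)."""
    return (
        f"[attest] drift alert — type={alert['drift_type']} "
        f"score={alert['score']} trace_id={alert['trace_id']}"
    )

msg = format_slack_alert({
    "drift_type": "content_violation",
    "score": 0.3,
    "trace_id": "trc_abc123def456",
})
```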

Alert dispatch errors are logged but never raised. A failed webhook does not block evaluation or crash the runner.
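That swallow-and-log policy amounts to a try/except around the network call. A minimal sketch under that assumption, with `send` standing in for the actual HTTP POST (none of these names are attest's):

```python
import logging

logger = logging.getLogger("attest.alerts")

def safe_dispatch(send, payload: dict) -> bool:
    """Log dispatch failures instead of raising (sketch of the stated policy)."""
    try:
        send(payload)
        return True
    except Exception:
        logger.exception("alert dispatch failed")
        return False  # evaluation continues regardless

def broken_send(payload):
    raise ConnectionError("webhook unreachable")

ok = safe_dispatch(broken_send, {"drift_type": "content_violation"})
```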

Full FastAPI integration:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from attest import Assertion, ContinuousEvalRunner
from attest.client import AttestClient
from attest.engine_manager import EngineManager

runner: ContinuousEvalRunner | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global runner
    engine = EngineManager()
    await engine.start()
    client = AttestClient(engine)
    runner = ContinuousEvalRunner(
        client=client,
        assertions=[
            Assertion(
                assertion_id="latency",
                type="constraint",
                spec={"field": "metadata.latency_ms", "operator": "lte", "value": 3000},
            ),
            Assertion(
                assertion_id="no_errors",
                type="content",
                spec={
                    "target": "output.message",
                    "check": "forbidden",
                    "values": ["internal error", "traceback"],
                },
            ),
        ],
        sample_rate=0.05,
        alert_slack_url="https://hooks.slack.com/services/T.../B.../xxx",
    )
    await runner.start()
    yield
    await runner.stop()
    await engine.stop()

app = FastAPI(lifespan=lifespan)

@app.post("/chat")
async def chat(query: str):
    result = await my_agent(query)
    if runner:
        await runner.submit(result.trace)
    return {"response": result.trace.output.get("message")}
```

Layers 1-4 are free and add negligible latency. Prioritize them for production:

| Layer | Type | Production Use |
| --- | --- | --- |
| 2 | `constraint` | Latency SLOs, cost budgets, token limits |
| 3 | `trace` | Required tools called, no infinite loops |
| 4 | `content` | Forbidden terms, required keywords, regex patterns |

Use layers 5-6 sparingly in production. Set `sample_rate` low (0.01-0.05) when using embedding or LLM judge assertions.
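As a rough idea of why layer-4 checks are free, a "forbidden terms" content check reduces to plain substring matching. The helper below is an illustrative sketch, not attest's engine:

```python
def check_forbidden(text: str, values: list[str]) -> bool:
    """Return True when none of the forbidden terms appear (case-insensitive)."""
    lowered = text.lower()
    return not any(v.lower() in lowered for v in values)

clean = check_forbidden("Here is your summary.", ["error", "stacktrace"])
```

No model call or embedding is needed, so this kind of assertion can run on every sampled trace at effectively zero cost.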