This notebook shows the smallest useful Everruns SDK flow: create an agent, start a Generic harness session, send one message, and read the reply.
It defaults to https://app.everruns.com/api. Override EVERRUNS_API_URL only when you want to point the same notebook at a local or self-hosted Everruns deployment.
Install the SDK
Run this cell once in the notebook environment.
%pip install -q everruns-sdk
Configure the client
Set EVERRUNS_API_KEY before running against app.everruns.com. For local dev mode you can point EVERRUNS_API_URL at your local server and use dev as the key.
import os
import uuid
from everruns_sdk import Everruns
BASE_URL = os.environ.get("EVERRUNS_API_URL", "https://app.everruns.com/api")
API_KEY = os.environ.get("EVERRUNS_API_KEY", "")
if BASE_URL.startswith("https://app.everruns.com") and not API_KEY:
raise RuntimeError("Set EVERRUNS_API_KEY before running this notebook against app.everruns.com.")
if not API_KEY:
API_KEY = "dev"
client = Everruns(api_key=API_KEY, base_url=BASE_URL)
Create the agent
This uses the server's default model. The unique suffix keeps the notebook rerunnable.
run_suffix = uuid.uuid4().hex[:8]
agent = await client.agents.create(
name=f"sdk-notebook-{run_suffix}",
system_prompt=(
"You are a concise demo assistant. "
"Answer in short bullet points and keep responses short."
),
)
agent.id
Start a session
A Generic harness session is the quickest way to run the agent without adding extra capabilities or files.
session = await client.sessions.create(
harness_name="generic",
agent_id=agent.id,
title=f"SDK notebook demo {run_suffix}",
)
session.id
Send a message and read the reply
This cell sends one message, prints the assistant's response as it accumulates, and returns a small result object with the IDs you might want to keep.
await client.messages.create(
session.id,
"What does Everruns do, and why would a product team use it?",
)
# Poll the event log until the turn completes. We use `events.list()` rather
# than `events.stream()` here because the SSE stream only yields events that
# arrive *after* it subscribes. Under fast models (or `llmsim` in tests) the
# turn can complete before the SSE connection opens, so polling is race-safe
# and gives the same observable output in a notebook cell.
import asyncio
streamed_chunks = []
completed_text = None
seen_ids: set[str] = set()
while True:
page = await client.events.list(session.id, limit=200)
# `events.list()` returns events oldest-first. Skip ones we already saw,
# tracked by event id rather than position (the page length grows as the
# turn progresses, so positional indices into the page are not stable).
new_events = [e for e in page if e.id not in seen_ids]
seen_ids.update(e.id for e in page)
done = False
for event in new_events:
if event.type == "output.message.delta":
delta = event.data.get("delta", "")
streamed_chunks.append(delta)
print(delta, end="", flush=True)
elif event.type == "output.message.completed":
message = event.data.get("message", {})
completed_text = "\n".join(
part.get("text", "")
for part in message.get("content", [])
if part.get("type") == "text"
).strip()
elif event.type == "turn.completed":
done = True
break
elif event.type == "turn.failed":
raise RuntimeError(event.data.get("error", "turn failed"))
if done:
break
await asyncio.sleep(0.5)
result = {
"base_url": BASE_URL,
"agent_id": agent.id,
"session_id": session.id,
"response": "".join(streamed_chunks).strip() or completed_text or "",
}
result
Next steps
- Change the prompt and rerun the last three cells
- Swap the message text for your own task
- Move the same flow into a script or service once the notebook feels right