Building Agents Using the Everruns SDK

import { Tabs, TabItem } from "@astrojs/starlight/components";

This tutorial walks you through building AI agents on Everruns using the official everruns-sdk Python package — from creating your first agent to orchestrating multi-turn conversations with tool execution and real-time event streaming.

Everruns is a durable agentic harness engine. It provides the infrastructure layer between your application and LLM providers, handling the agent loop — the cycle of reasoning (calling an LLM) and acting (executing tools) — with durability guarantees backed by PostgreSQL.

Think of it as the runtime that turns a language model into a reliable, stateful agent.

Unlike calling an LLM API directly, Everruns gives you:

  • Durability — Turns survive worker crashes and are automatically retried
  • Streaming events — Real-time SSE stream of every step the agent takes
  • Modular capabilities — File systems, bash execution, web fetch, and more — composed per-agent
  • Multi-provider support — Switch between OpenAI, Anthropic, or any OpenAI-compatible provider
  • Full observability — Every LLM call, tool execution, and state transition is recorded as an immutable event

Before writing code, let’s understand the entities you’ll work with.

| Concept | What it is | Analogy |
|---|---|---|
| Agent | Configuration for the agentic loop — system prompt, default model, and enabled capabilities | A job description |
| Session | A running conversation with an agent. Holds state, history, and filesystem | A work session |
| Capability | A modular unit that provides tools and/or system prompt additions | A plugin |
| Turn | One iteration of the reason-act loop within a session | A single task cycle |
| Event | An immutable, append-only record of everything that happens | An audit log entry |

Key insight: Agents and capabilities are configuration. Sessions and turns are runtime. Events are data. Your application creates configuration, starts runtime, and consumes data.

You need:

  • A running Everruns instance (see Docker Compose quickstart)
  • Python 3.10+
  • An LLM provider configured (OpenAI or Anthropic API key set in the Everruns UI)
```sh
pip install everruns-sdk
```

The Everruns client is the entry point for all API interactions. It organizes methods into sub-clients: agents, sessions, messages, events, and more.

```python
import asyncio

from everruns_sdk import Everruns

# Reads EVERRUNS_API_KEY from environment
# Reads EVERRUNS_API_URL from environment (default: http://localhost:9300/api)
client = Everruns()
```
```python
from everruns_sdk import Everruns

client = Everruns(
    api_key="your-api-key",
    base_url="http://localhost:9300/api",
)
```
```python from everruns_sdk import Everruns
# Dev mode (just start-dev) requires no real key
client = Everruns(api_key="dev", base_url="http://localhost:9300/api")
```

An agent defines what the AI should do (system prompt) and what tools it has access to (capabilities).

```python
import asyncio

from everruns_sdk import Everruns

async def main():
    client = Everruns(api_key="dev", base_url="http://localhost:9300/api")
    agent = await client.agents.create(
        name="Research Assistant",
        system_prompt=(
            "You are a research assistant. When given a topic, you:\n"
            "1. Create a task list to track your work\n"
            "2. Fetch relevant web pages for information\n"
            "3. Save your notes to files in /workspace\n"
            "4. Produce a concise summary of your findings"
        ),
    )
    print(f"Agent created: {agent.id} ({agent.name})")

asyncio.run(main())
```

Agents can be defined as markdown files with YAML front matter — a portable, version-controllable format. The front matter specifies name, capabilities, and tags. The body becomes the system prompt.

```md
---
name: "HackerNews Reader"
description: "An agent that browses HackerNews autonomously"
tags:
  - demo
  - hackernews
capabilities:
  - web_fetch
  - current_time
  - session_file_system
---

You are a HackerNews reader agent. You autonomously browse
Hacker News to find interesting stories, read discussions,
and research authors.
```

Import it with a single SDK call:

```python
with open("hackernews-reader.md") as f:
    agent = await client.agents.import_agent(f.read())
print(f"Imported: {agent.id} ({agent.name})")
```

This is equivalent to creating the agent with create(), but the markdown format makes agents easy to share, review in PRs, and store in git.

Each capability adds tools and/or system prompt context to the agent. Common capabilities:

| Capability | Tools Provided | Purpose |
|---|---|---|
| `web_fetch` | `web_fetch` | Fetch URLs and convert HTML to markdown |
| `session_file_system` | `read_file`, `write_file`, `list_directory`, `grep_files`, `delete_file` | Isolated per-session virtual filesystem |
| `virtual_bash` | `bash` | Sandboxed bash shell execution |
| `stateless_todo_list` | `write_todos` | Structured task tracking in conversation |
| `current_time` | `get_current_time` | Current date/time awareness |
| `session_storage` | `kv_store`, `secret_store` | Key-value storage and encrypted secrets |
| `sample_data` | (mounts only) | Sample JSON/YAML files in /samples |

A session is a working instance — it holds the conversation, filesystem, and execution state.

```python
session = await client.sessions.create(
    agent_id=agent.id,
    title="Research: Durable Execution Engines",
)
print(f"Session created: {session.id}")
```

Sending a user message triggers the agentic loop. The SDK queues a durable workflow that runs the full reason-act cycle.

```python
await client.messages.create(
    session.id,
    "Research the concept of durable execution engines. "
    "What are the main approaches and trade-offs?",
)
```

The call returns immediately — it confirms the message was stored and the workflow was queued. To get the agent’s response, you consume events.

This is where the SDK shines. Instead of manually polling or parsing SSE, the SDK provides an async iterator that handles reconnection, heartbeat detection, and event typing automatically.

```python
async for event in client.events.stream(session.id):
    if event.type == "output.message.delta":
        # Streaming text — print token by token
        print(event.data.get("delta", ""), end="", flush=True)
    elif event.type == "output.message.completed":
        # Final response — extract full text
        message = event.data.get("message", {})
        for part in message.get("content", []):
            if part.get("type") == "text":
                print(f"\n\nAgent: {part['text']}")
    elif event.type == "tool.started":
        # Tool invocation — show what the agent is doing
        tool_call = event.data.get("tool_call", {})
        tool_name = tool_call.get("name", "unknown")
        print(f"\n  [Tool] {tool_name}")
    elif event.type == "tool.completed":
        status = "done" if event.data.get("success") else "error"
        print(f"  [Tool] {event.data.get('tool_name', '?')}: {status}")
    elif event.type == "turn.completed":
        print("\n[Turn completed]")
        break
    elif event.type == "turn.failed":
        print(f"\n[Turn failed: {event.data.get('error', 'unknown')}]")
        break
```

The client.events.stream() method handles:

  • Automatic reconnection — When the server cycles connections (every 5 minutes), the SDK reconnects transparently using since_id
  • Heartbeat-based stale detection — Server sends heartbeat comments every 30s; if none arrive within 45s, the SDK reconnects
  • Event typing — Each event has .type and .data attributes parsed from SSE
  • Retry with backoff — Network errors trigger exponential backoff with jitter

When you send a message, a typical turn emits events along these lines: `turn.started`, then interleaved `tool.started`/`tool.completed` and `output.message.delta` events as the agent works, then `output.message.completed` with the full response, and finally `turn.completed` (or `turn.failed`).

Sessions persist across messages. Send follow-up messages to continue the conversation — the agent retains full context.

```python
# Follow-up question
await client.messages.create(
    session.id,
    "Compare Temporal.io vs the PostgreSQL-based approach. "
    "Save your analysis to /workspace/comparison.md",
)

# Stream the response
async for event in client.events.stream(session.id):
    if event.type == "output.message.completed":
        message = event.data.get("message", {})
        for part in message.get("content", []):
            if part.get("type") == "text":
                print(part["text"])
        break
    elif event.type == "turn.failed":
        print(f"Error: {event.data.get('error')}")
        break
```

If the agent wrote files during the session, retrieve them via the SDK’s filesystem sub-client:

```python
# List files in the session workspace
listing = await client.filesystem.list(session.id, "/workspace")
for entry in listing.entries:
    kind = "dir" if entry.is_directory else "file"
    print(f"  [{kind}] {entry.name}")

# Read a specific file
file = await client.filesystem.read(session.id, "/workspace/comparison.md")
print(file.content)
```

Here’s a complete, runnable example that creates an agent from a markdown definition, starts a session, sends a prompt, and streams the response — all using the SDK.

```python
#!/usr/bin/env python3
"""HackerNews Reader Agent — demonstrates the everruns-sdk."""
import asyncio
import os

from everruns_sdk import Everruns

AGENT_MARKDOWN = """
---
name: "HackerNews Reader"
description: "An agent that browses HackerNews autonomously"
tags:
  - demo
capabilities:
  - web_fetch
  - current_time
  - session_file_system
---
You are a HackerNews reader agent. Use the public Firebase API:
- Top stories: https://hacker-news.firebaseio.com/v0/topstories.json
- Item detail: https://hacker-news.firebaseio.com/v0/item/{id}.json
- User profile: https://hacker-news.firebaseio.com/v0/user/{username}.json

Fetch only what's needed. Present stories in a numbered list with title,
points, and comment count.
""".strip()


async def main():
    client = Everruns(
        api_key=os.environ.get("EVERRUNS_API_KEY", "dev"),
        base_url=os.environ.get("EVERRUNS_API_URL", "http://localhost:9300/api"),
    )

    # Import agent from markdown
    agent = await client.agents.import_agent(AGENT_MARKDOWN)
    print(f"Agent: {agent.id} ({agent.name})")

    # Create session
    session = await client.sessions.create(
        agent_id=agent.id,
        title="HackerNews Reader Session",
    )
    print(f"Session: {session.id}\n")

    try:
        # Send message
        prompt = "What are the top 5 stories on HackerNews right now?"
        await client.messages.create(session.id, prompt)
        print(f"You: {prompt}\n")

        # Stream events
        async for event in client.events.stream(session.id):
            if event.type == "output.message.completed":
                message = event.data.get("message", {})
                for part in message.get("content", []):
                    if part.get("type") == "text":
                        print(f"Agent: {part['text']}")
            elif event.type == "tool.started":
                tool_call = event.data.get("tool_call", {})
                name = tool_call.get("name", "unknown")
                args = tool_call.get("arguments", {})
                if name == "web_fetch":
                    print(f"  -> Fetching: {args.get('url', '?')}")
                else:
                    print(f"  -> Tool: {name}")
            elif event.type == "turn.completed":
                print("\n[Turn completed]")
                break
            elif event.type == "turn.failed":
                print(f"\n[Turn failed: {event.data.get('error', 'unknown')}]")
                break

        # Interactive follow-up loop
        while True:
            try:
                follow_up = input("\nYou (or 'quit'): ").strip()
            except (EOFError, KeyboardInterrupt):
                break
            if not follow_up or follow_up.lower() in ("quit", "exit", "q"):
                break
            await client.messages.create(session.id, follow_up)
            print()
            async for event in client.events.stream(session.id):
                if event.type == "output.message.completed":
                    message = event.data.get("message", {})
                    for part in message.get("content", []):
                        if part.get("type") == "text":
                            print(f"Agent: {part['text']}")
                elif event.type == "turn.completed":
                    break
                elif event.type == "turn.failed":
                    print(f"Error: {event.data.get('error')}")
                    break
    finally:
        # Cleanup
        await client.sessions.delete(session.id)
        await client.agents.delete(agent.id)
        await client.close()
        print("\nCleaned up.")


if __name__ == "__main__":
    asyncio.run(main())
```

Run it:

```sh
# Start the Everruns server
just start-dev

# Run the example
pip install everruns-sdk
python hackernews_reader.py
```

Events follow the {category}.{action} naming convention (e.g., turn.started, output.message.delta, tool.completed). For the complete event type catalog with field descriptions and JSON examples, see the Event Reference.
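
Because every type follows `{category}.{action}`, event routing can key off a simple split on the first dot. The helper below is our own illustration, not an SDK API:

```python
def parse_event_type(event_type: str) -> tuple[str, str]:
    """Split an Everruns event type string into (category, action)."""
    category, _, action = event_type.partition(".")
    return category, action

print(parse_event_type("turn.completed"))        # -> ('turn', 'completed')
print(parse_event_type("output.message.delta"))  # -> ('output', 'message.delta')
```
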

Chain agents by passing output from one session into another:

```python
async def multi_agent_pipeline(client: Everruns):
    # Step 1: Research agent gathers information
    researcher = await client.agents.create(
        name="Researcher",
        system_prompt="Research the given topic thoroughly. Write detailed notes.",
    )
    research_session = await client.sessions.create(agent_id=researcher.id)
    await client.messages.create(
        research_session.id, "Research Python async frameworks"
    )

    # Wait for research to complete
    research_output = None
    async for event in client.events.stream(research_session.id):
        if event.type == "output.message.completed":
            message = event.data.get("message", {})
            parts = message.get("content", [])
            research_output = "\n".join(
                p["text"] for p in parts if p.get("type") == "text"
            )
        elif event.type in ("turn.completed", "turn.failed"):
            break

    # Step 2: Writer agent creates a polished article
    writer = await client.agents.create(
        name="Technical Writer",
        system_prompt="Write clear, well-structured technical articles.",
    )
    writer_session = await client.sessions.create(agent_id=writer.id)
    await client.messages.create(
        writer_session.id,
        f"Write a blog post based on this research:\n\n{research_output}",
    )
    async for event in client.events.stream(writer_session.id):
        if event.type == "output.message.delta":
            print(event.data.get("delta", ""), end="", flush=True)
        elif event.type in ("turn.completed", "turn.failed"):
            break

    # Cleanup
    for sid in (research_session.id, writer_session.id):
        await client.sessions.delete(sid)
    for aid in (researcher.id, writer.id):
        await client.agents.delete(aid)
```

Cancel a long-running turn via the SDK:

```python
import asyncio

# Send a message that might take a while
await client.messages.create(session.id, "Analyze every Python package on PyPI")

# Changed your mind? Cancel it
await asyncio.sleep(2)
await client.sessions.cancel(session.id)
```

The cancellation flow emits turn.cancelled, adds a user message noting the cancellation, and the worker emits a final agent message confirming the work was stopped.

Inject files into the session filesystem, then ask the agent to process them:

```python
# Upload a file into the session
await client.filesystem.create(
    session.id,
    "/workspace/input.csv",
    content="name,value\nalpha,1\nbeta,2\ngamma,3",
)

# Ask the agent to process it
await client.messages.create(
    session.id,
    "Read /workspace/input.csv and create a summary.",
)
```

Understanding the architecture helps you make better integration decisions.

Control Plane (Server) owns all state. It exposes two interfaces:

  • REST API (port 9000) — Internal API server
  • gRPC (port 9001) — Workers connect here (internal)

A Caddy reverse proxy (port 9300) unifies the API and UI behind a single port. The SDK connects to http://localhost:9300/api.

Workers are stateless executors. They claim tasks from a durable queue, execute the reason-act loop, and report results back via gRPC. If a worker crashes, the task is automatically re-queued.

Your application talks only to the REST API, via the SDK, through the Caddy proxy. You never interact with workers or the database directly.