
What is an Agent?

An agent is a long-running async event handler. Our platform handles all the HTTP plumbing, filesystem management, sandboxing, state persistence, and message routing. You just write three async functions that react to events.

Creating an Agent

Scaffold a new agent project with tu init:
tu init
This interactive command will prompt you for:
  • Namespace slug (e.g., acme-corp)
  • Agent name (e.g., my-agent)
It creates a ready-to-deploy project:
my-agent/
├── src/
│   └── agent.py          # Your agent code
├── config.yaml           # Deployment configuration
├── pyproject.toml        # Python dependencies
├── Dockerfile            # Container definition
└── dev.ipynb             # Development notebook

Your First Agent

Here’s a complete, working agent:
from terminaluse import AgentServer, TaskContext, Event

server = AgentServer()

@server.on_create
async def handle_create(ctx: TaskContext, params: dict):
    await ctx.messages.send("Task created! Send me a message.")

@server.on_event
async def handle_event(ctx: TaskContext, event: Event):
    await ctx.messages.send(f"You said: {event.message}")

@server.on_complete
async def handle_complete(ctx: TaskContext):
    await ctx.messages.send("Task completed. Cleaning up...")
That’s a functioning agent. What happens when this runs:
  1. User creates a task → on_create fires, agent sends “Task created!”
  2. User sends “Hello” → on_event fires, agent sends “You said: Hello”
  3. Task completes → on_complete fires, agent sends “Task completed. Cleaning up…”
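To watch this sequence without deploying, the lifecycle can be imitated locally. The sketch below is not the platform's implementation: FakeServer, FakeContext, and FakeMessages are hypothetical stand-ins that copy only the decorator names and the ctx.messages.send call shown above, and each event is reduced to a plain string.

```python
import asyncio


class FakeMessages:
    """Hypothetical stand-in for ctx.messages: records instead of sending."""

    def __init__(self):
        self.sent = []

    async def send(self, content):
        self.sent.append(content)


class FakeContext:
    """Hypothetical stand-in for TaskContext, exposing only `messages`."""

    def __init__(self):
        self.messages = FakeMessages()


class FakeServer:
    """Hypothetical stand-in for AgentServer: stores one handler per hook."""

    def __init__(self):
        self.handlers = {}

    def on_create(self, fn):
        self.handlers["create"] = fn
        return fn

    def on_event(self, fn):
        self.handlers["event"] = fn
        return fn

    def on_complete(self, fn):
        self.handlers["complete"] = fn
        return fn

    async def run_task(self, params, user_messages):
        """Fire create once, event once per message, then complete once."""
        ctx = FakeContext()
        await self.handlers["create"](ctx, params)
        for text in user_messages:
            await self.handlers["event"](ctx, text)
        await self.handlers["complete"](ctx)
        return ctx.messages.sent


server = FakeServer()


@server.on_create
async def handle_create(ctx, params):
    await ctx.messages.send("Task created! Send me a message.")


@server.on_event
async def handle_event(ctx, text):
    await ctx.messages.send(f"You said: {text}")


@server.on_complete
async def handle_complete(ctx):
    await ctx.messages.send("Task completed. Cleaning up...")


# One task: created, receives a single message, then completes.
transcript = asyncio.run(server.run_task({}, ["Hello"]))
```

The transcript list holds the three agent messages in the order the handlers fired, which makes this kind of stand-in handy for unit-testing handler logic.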

The Three Handlers

Every agent has exactly three entry points:

@server.on_create

Fires once when a task starts. Use it to:
  • Parse creation parameters
  • Initialize state (for example, clone a GitHub repo or download files)
  • Send a welcome message
@server.on_create
async def handle_create(ctx: TaskContext, params: dict):
    # params contains whatever the client passed when creating the task
    user_id = params.get("user_id")
    user = your_db.get(user_id)  # your_db is a placeholder for your own data layer
    # download files and set up filesystem if required
    # optionally, send a message to user confirming task creation
    await ctx.messages.send(f"Hi {user.name}!")
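Because params is whatever the client passed, it can help to validate it before touching a database or the filesystem. The following is a minimal sketch in plain Python; CreateParams, parse_create_params, and the repo_url field are hypothetical names chosen for illustration, not part of the SDK.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CreateParams:
    """Hypothetical shape of the params this particular agent expects."""

    user_id: str
    repo_url: Optional[str] = None


def parse_create_params(params: dict) -> CreateParams:
    """Validate the raw params dict and return a typed object."""
    user_id = params.get("user_id")
    if not isinstance(user_id, str) or not user_id:
        raise ValueError("params must include a non-empty 'user_id' string")

    repo_url = params.get("repo_url")
    if repo_url is not None and not isinstance(repo_url, str):
        raise ValueError("'repo_url' must be a string when provided")

    return CreateParams(user_id=user_id, repo_url=repo_url)
```

Inside handle_create you would call parse_create_params(params) first, so malformed input fails early with a clear error.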

@server.on_event

Fires every time the user sends a message. This is where most of your logic lives.
@server.on_event
async def handle_event(ctx: TaskContext, event: Event):
    # Handle user messages
    user_message = event.message.content
    await ctx.messages.send(f"Processing: {user_message}")

    # Alternatively, clients can emit structured data events
    data_event = event.message.data

@server.on_complete

Fires when the task is completed or canceled. Use it for cleanup.
@server.on_complete
async def handle_complete(ctx: TaskContext):
    await ctx.messages.send("Goodbye!")

TaskContext: Your Request Context

Every handler receives a TaskContext. It’s a convenience wrapper that:
  • Holds the current task and agent objects
  • Provides modules (messages, state, events) bound to the current task and agent
@server.on_event
async def handle_event(ctx: TaskContext, event: Event):
    # Using TaskContext modules (recommended)
    await ctx.messages.send("Hello")

    # Using SDK modules directly (when you need more control)
    from terminaluse import messages
    await messages.send(task_id=ctx.task.id, content="Hello")
Use ctx in handlers. Use the SDK modules directly in helper functions where ctx isn’t available.
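As an illustration of the helper-function case, here is a sketch of a progress reporter that only receives a task ID. It assumes messages.send accepts task_id and content keyword arguments, as in the example above. The report_progress name and its send parameter are hypothetical; the parameter exists so the helper can be exercised without the SDK installed.

```python
import asyncio


async def report_progress(task_id, done, total, send=None):
    """Send a progress line for a task from code that has no ctx."""
    if send is None:
        # Fall back to the SDK module, imported lazily so tests can skip it.
        from terminaluse import messages
        send = messages.send
    text = f"Progress: {done}/{total}"
    await send(task_id=task_id, content=text)
    return text


# Demo with a recording stand-in instead of the real SDK module.
calls = []


async def fake_send(task_id, content):
    calls.append((task_id, content))


last_text = asyncio.run(report_progress("task_123", 2, 5, send=fake_send))
```

From a handler, the call would look like await report_progress(ctx.task.id, 2, 5), with the default sender going through the SDK module.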

TaskContext Reference

Attribute      What it is
ctx.task       The current Task object
ctx.agent      The current Agent object
ctx.messages   Pre-bound message module
ctx.state      Pre-bound state module

Sending Messages

You have two ways to send messages: through ctx.messages, or through the SDK's messages module directly. With ctx.messages:
# Simple text
await ctx.messages.send("Plain text message")
await ctx.messages.send("**Markdown** works too", format="markdown")

# Typed content
from terminaluse import TextContent, DataContent

await ctx.messages.send(content=TextContent(
    content="Hello world",
    author="agent",
    format="markdown"  # "plain" | "markdown" | "code"
))

# Structured output, for example for generative UIs
await ctx.messages.send(content=DataContent(
    data={"temperature": 72, "unit": "fahrenheit"},
    author="agent"
))
Or call the SDK module directly and pass the task ID yourself:
from terminaluse import messages, TextContent

await messages.send(
    task_id=ctx.task.id,
    content=TextContent(content="Hello", author="agent")
)
Both do the same thing; ctx.messages just saves you from passing IDs around. Use the simplest one that works for your case.

Streaming with Claude Agent SDK

When using the Claude Agent SDK, messages are automatically streamed:
from claude_agent_sdk import query, ClaudeAgentOptions

@server.on_event
async def handle_event(ctx: TaskContext, event: Event):
    user_message = event.content.content
    options = ClaudeAgentOptions(
        include_partial_messages=True,  # Enable streaming
        permission_mode="bypassPermissions",
    )

    async for message in query(prompt=user_message, options=options):
        await ctx.messages.send(message)

Managing State

State lets your agent remember things across events. It’s a key-value store scoped to each task.

The Pattern: Create Once, Update Many

@server.on_create
async def handle_create(ctx: TaskContext, params: dict):
    # Initialize state when task starts
    await ctx.state.create(state={
        "user_id": params.get("user_id"),
        "message_count": 0,
    })

@server.on_event
async def handle_event(ctx: TaskContext, event: Event):
    # Read current state
    state = await ctx.state.get()

    # Update values
    count = state.get("message_count", 0) + 1

    # Persist changes
    await ctx.state.update({
        "message_count": count,
    })

State API

Method                          What it does
ctx.state.create(state={...})   Initialize state (call once in on_create)
ctx.state.get()                 Retrieve current state
ctx.state.update({...})         Merge updates into state
ctx.state.delete()              Remove state entirely
State is scoped to (task_id, agent_id). Each task has its own isolated state. Two users talking to your agent have completely separate state.
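The scoping and merge behavior can be pictured with a small in-memory model. This is only a sketch: InMemoryState is a hypothetical stand-in, not the platform's store, and it assumes update performs a shallow merge (listed keys are overwritten, all other keys are kept), which is not spelled out here.

```python
class InMemoryState:
    """Hypothetical stand-in for the state store, keyed by (task_id, agent_id)."""

    def __init__(self):
        self._store = {}

    def create(self, task_id, agent_id, state):
        self._store[(task_id, agent_id)] = dict(state)

    def get(self, task_id, agent_id):
        # Return a copy so callers cannot mutate stored state by accident.
        return dict(self._store[(task_id, agent_id)])

    def update(self, task_id, agent_id, updates):
        # Assumed shallow merge: listed keys are overwritten, others are kept.
        self._store[(task_id, agent_id)].update(updates)

    def delete(self, task_id, agent_id):
        self._store.pop((task_id, agent_id), None)


store = InMemoryState()

# Two tasks on the same agent, for example two different users.
store.create("task_a", "agent_1", {"user_id": "alice", "message_count": 0})
store.create("task_b", "agent_1", {"user_id": "bob", "message_count": 0})

# Only task_a receives a message.
store.update("task_a", "agent_1", {"message_count": 1})

state_a = store.get("task_a", "agent_1")
state_b = store.get("task_b", "agent_1")
```

After the update, state_a still has its user_id alongside the new count, and state_b is unchanged because it lives under a different key.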

Message Content Types

The platform supports two content types: TextContent, shown here, and DataContent, shown under Sending Messages.
from terminaluse import TextContent

await ctx.messages.send(content=TextContent(
    content="Hello **world**",
    author="agent",
    format="markdown"  # "plain" | "markdown" | "code"
))

Custom Dependencies

Python Dependencies

Add Python packages using uv:
uv add requests
This updates your pyproject.toml automatically. Dependencies are installed during the Docker build.

System Dependencies

For system-level dependencies (e.g., ffmpeg, imagemagick), modify your Dockerfile directly:
# Add system dependencies
RUN apt-get update && apt-get install -y \
    ffmpeg \
    imagemagick \
    && rm -rf /var/lib/apt/lists/*
Do not modify the ENTRYPOINT or CMD in your Dockerfile. The platform uses these to register and run your agent correctly. Changing them will cause deployment failures.

Error Handling

Wrap your handler logic in try/except to gracefully handle errors and give users feedback:
@server.on_event
async def handle_event(ctx: TaskContext, event: Event):
    try:
        # Your agent logic here
        result = await process_request(event.content)
        await ctx.messages.send(result)
    except Exception:
        # Tell the user something failed, then re-raise so the platform logs the error
        await ctx.messages.send("Something went wrong. We're looking into it.")
        raise
You should re-raise the errors. We catch them and let you drill down into the logs on our platform. We don't currently send notifications about errors, so we recommend adding a product like Sentry to your agent for this.
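If several handlers need the same notify-and-re-raise behavior, one option is to factor it into a decorator. The sketch below is an assumption-laden illustration: notify_on_error is a hypothetical helper, the fake context classes exist only for the demo, and stacking it under @server.on_event assumes the SDK decorator accepts any coroutine function.

```python
import asyncio
import functools


def notify_on_error(handler):
    """Wrap a handler: tell the user something failed, then re-raise."""

    @functools.wraps(handler)
    async def wrapper(ctx, *args, **kwargs):
        try:
            return await handler(ctx, *args, **kwargs)
        except Exception:
            await ctx.messages.send("Something went wrong. We're looking into it.")
            raise  # keep the original error visible to the platform

    return wrapper


class FakeMessages:
    """Hypothetical stand-in for ctx.messages: records instead of sending."""

    def __init__(self):
        self.sent = []

    async def send(self, content):
        self.sent.append(content)


class FakeContext:
    """Hypothetical stand-in for TaskContext, exposing only `messages`."""

    def __init__(self):
        self.messages = FakeMessages()


@notify_on_error
async def failing_handler(ctx, event):
    raise RuntimeError("boom")


ctx = FakeContext()
try:
    asyncio.run(failing_handler(ctx, event=None))
    reraised = False
except RuntimeError:
    reraised = True
```

In the demo, the user-facing message is recorded and the RuntimeError still propagates, which matches the re-raise advice above.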

CLI Commands

tu init

tu init
Interactive command to scaffold a new agent project. Prompts for namespace slug, agent name, and description.

Next Steps

You now know the fundamentals:
  • Three handlers: on_create, on_event, on_complete
  • TaskContext for convenience
  • State for memory
  • Messages for responses

Deploy Your Agent

Get your agent running in production