BUILD · Jun 6, 2026

Build a state machine for your AI agent in a weekend

The 6-state per-turn FSM that makes agents reliable. How to build it: start → provision → assistant_stream → function_execute → steering_check → teardown.


The first time I shipped an agent without a state machine, it worked perfectly — for about 20 minutes. Then a user triggered a rare edge case, the agent ran the same tool call 11 times in a row, and I woke up to a $90 API bill.

The agent didn’t know when to stop. It didn’t know where it was in the turn. It didn’t have a state machine.

A per-turn state machine is the orchestrator that drives a single user request through its lifecycle. It knows what state the agent is in, what transitions are valid, what to do when things go wrong, and when to clean up and stop. Without one, your agent is held together with if statements and hope.

Here’s how to build one in a weekend. No framework required.

TL;DR: The agent turn FSM has 6 states: start, provision, assistant_stream, function_execute, steering_check, and teardown. Each state has valid transitions and error handlers. The steering_check is where the loop decides to continue or stop. Teardown is where resources get freed and the session gets persisted. You can implement the whole thing in under 200 lines of Python.

Key takeaways:

  • The state machine is the conductor — it knows where the turn is and when to transition
  • 6 states: start → provision → assistant_stream → function_execute → steering_check → teardown
  • steering_check is the loop decision point — continue, stop, or max_turns reached
  • teardown is inlined into every exit path — resources freed, session persisted
  • Error handlers at each state: retry → fallback → fail, with circuit breaker to prevent runaway loops

The head chef’s pass as state machine

Think about how a restaurant kitchen works on a busy night.

The head chef stands at the pass and knows exactly where every dish is: “the risotto is plating, the steak is resting, the soup is on the burner.” When a dish finishes one stage — the risotto has been plated — the chef calls out “risotto out” and the server takes it to the dining room. When something goes wrong — the fish is overcooked — the chef decides what to do: remake it, substitute, or tell the table there’s a delay.

The kitchen’s pass is a state machine. The chef knows the current state of every dish, knows what transitions are valid, and knows how to handle errors.

Your agent needs the same thing.

The 6 states

Every agent turn goes through these 6 states:

start → provision → assistant_stream → function_execute → steering_check → teardown
                           ↑                                     │
                           └────────────── continue ─────────────┘

start: Receive the turn request, give it a unique ID, persist it to the session store. Seed the initial TurnStateRecord. Return immediately — the actual work happens when the turn is woken by the next transition.

provision: Boot the sandbox if the run needs isolated execution. Download skill bodies for the namespaces the run uses. Assemble the system prompt from three layers: the mode paragraph (plan/ask/agent), the identity preamble (teaches the model the trigger convention and on-demand skill discovery pattern), and the appended skill index. The caller can override the whole prompt by passing system_prompt on start; otherwise the orchestrator builds it.

assistant_stream: Call the provider worker (Anthropic, OpenAI, whatever you’re using), stream the response into the channel, emit message_update events for the UI fanout. When the assistant returns tool calls, transition to function_execute. When the assistant returns a final message with no tools, transition to teardown.

function_execute: Every tool call goes through dispatchWithHook — the single chokepoint. The policy check (allow/deny/needs_approval) runs here. If needs_approval, the call parks and the turn waits. When all calls in the batch are resolved, transition to steering_check.

steering_check: After the batch completes, decide: continue (back to assistant_stream with the tool results), stop (clean exit), or max_turns reached (stop with a reason). This is the loop gate.

teardown: Clean up the sandbox, free resources, emit agent_end event, persist the final session state. This is inlined into every exit path — not a separate enqueued step.
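
The three-layer prompt assembly described under provision can be sketched roughly as follows. This is a minimal sketch, not the orchestrator's actual code: the wording in MODE_PARAGRAPHS, the "Available skills" heading, and the override parameter name are assumptions made for illustration; only the layer order and the override behavior come from the description above.

```python
from typing import Optional

# Hypothetical mode paragraphs; the real wording is not given in this post.
MODE_PARAGRAPHS = {
    "plan": "You are in plan mode. Propose steps; do not execute tools.",
    "ask": "You are in ask mode. Answer questions; do not modify anything.",
    "agent": "You are in agent mode. Use tools to complete the task.",
}


def assemble_system_prompt(
    mode: str,
    identity_preamble: str,
    default_skills: list[str],
    override: Optional[str] = None,
) -> str:
    """Join mode paragraph, identity preamble, and skill index, in that order."""
    # A caller-supplied prompt replaces the whole assembled prompt.
    if override is not None:
        return override
    if mode not in MODE_PARAGRAPHS:
        raise ValueError(f"unknown mode: {mode!r}")
    # The skill index is appended last, one skill name per line.
    index_lines = "\n".join(f"- {name}" for name in default_skills)
    layers = [
        MODE_PARAGRAPHS[mode],
        identity_preamble,
        f"Available skills:\n{index_lines}",
    ]
    return "\n\n".join(layers)
```

Keeping the override check first means a caller who passes a full prompt never pays for skill lookup or mode validation.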

The state machine class

Here’s the implementation:

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime, timezone

class TurnState(Enum):
    START = "start"
    PROVISION = "provision"
    ASSISTANT_STREAM = "assistant_stream"
    FUNCTION_EXECUTE = "function_execute"
    STEERING_CHECK = "steering_check"
    STOPPED = "stopped"
    FAILED = "failed"

@dataclass
class TurnContext:
    turn_id: str
    session_id: str
    current_state: TurnState = TurnState.START
    # Conversation inputs read by the handlers below
    provider: Optional[str] = None
    system_prompt: Optional[str] = None
    messages: list = field(default_factory=list)
    available_tools: list = field(default_factory=list)
    # Working state for the current loop iteration
    tool_calls: list = field(default_factory=list)
    tool_results: list = field(default_factory=list)
    pending_approvals: list = field(default_factory=list)
    turn_count: int = 0
    max_turns: int = 20
    # Outputs and cleanup handles
    final_response: str = ""
    sandbox_id: Optional[str] = None
    stop_signal: bool = False
    error: Optional[str] = None

class AgentTurnMachine:
    def __init__(self, config: dict):
        self.config = config
        self.state_handlers = {
            TurnState.START: self.handle_start,
            TurnState.PROVISION: self.handle_provision,
            TurnState.ASSISTANT_STREAM: self.handle_assistant_stream,
            TurnState.FUNCTION_EXECUTE: self.handle_function_execute,
            TurnState.STEERING_CHECK: self.handle_steering_check,
        }

    def run(self, turn_context: TurnContext) -> dict:
        """Run the turn machine until a terminal state is reached."""
        try:
            while turn_context.current_state not in (TurnState.STOPPED, TurnState.FAILED):
                handler = self.state_handlers.get(turn_context.current_state)
                if not handler:
                    # Build the message before overwriting the state it reports
                    turn_context.error = f"No handler for state {turn_context.current_state}"
                    turn_context.current_state = TurnState.FAILED
                    break

                try:
                    next_state = handler(turn_context)
                    if next_state == turn_context.current_state:
                        raise ValueError(f"State handler {handler.__name__} returned same state without advancing")
                    turn_context.current_state = next_state
                except Exception as e:
                    return self.handle_error(turn_context, e)

            return self.build_response(turn_context)
        finally:
            # Teardown is inlined into every exit path: stop, fail, or error.
            # teardown() is the function shown in the teardown section.
            teardown(turn_context)

    def handle_start(self, ctx: TurnContext) -> TurnState:
        """Persist the turn request and seed the initial state."""
        # Persist the run request to the session store
        persist_turn_request(
            turn_id=ctx.turn_id,
            session_id=ctx.session_id,
            # utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
            timestamp=datetime.now(timezone.utc).isoformat()
        )
        # Seed the TurnStateRecord
        seed_turn_state(session_id=ctx.session_id, turn_id=ctx.turn_id)
        return TurnState.PROVISION

    def handle_provision(self, ctx: TurnContext) -> TurnState:
        """Boot sandbox, download skills, assemble system prompt."""
        # Boot sandbox if needed
        if self.config.get("sandbox_enabled"):
            boot_sandbox(session_id=ctx.session_id)

        # Download skill bodies for configured namespaces
        for namespace in self.config.get("system_default_skills", ["iii://iii-directory/index"]):
            download_skills(namespace=namespace)

        # Assemble system prompt from mode + identity + skills
        ctx.system_prompt = assemble_system_prompt(
            mode=self.config.get("mode", "agent"),
            identity_preamble=self.config.get("identity_preamble"),
            default_skills=self.config.get("default_skills", [])
        )

        return TurnState.ASSISTANT_STREAM

    def handle_assistant_stream(self, ctx: TurnContext) -> TurnState:
        """Call the AI model and stream the response."""
        ctx.turn_count += 1

        # Check max turns
        if ctx.turn_count > ctx.max_turns:
            return TurnState.STOPPED

        # Call the provider and stream the response
        response = call_provider(
            provider=ctx.provider,
            prompt=ctx.system_prompt,
            messages=ctx.messages,
            tools=ctx.available_tools,
            stream=True
        )

        # Collect tool calls from the response
        ctx.tool_calls = response.tool_calls or []
        ctx.pending_approvals = []

        if not ctx.tool_calls:
            # No tools — final response, we're done
            ctx.final_response = response.content
            return TurnState.STOPPED

        return TurnState.FUNCTION_EXECUTE

    def handle_function_execute(self, ctx: TurnContext) -> TurnState:
        """Run each tool call through the policy gate, execute, collect results."""
        ctx.tool_results = []

        for call in ctx.tool_calls:
            # consultBefore — the policy gate
            outcome = consultBefore(call, timeout=5.0)

            if outcome["decision"] == "allow":
                result = execute_tool(call)
                ctx.tool_results.append({"call_id": call.id, "result": result})
            elif outcome["decision"] == "deny":
                ctx.tool_results.append({"call_id": call.id, "denied": True, "reason": outcome["reason"]})
            elif outcome["decision"] == "needs_approval":
                ctx.pending_approvals.append({"call": call, "reason": outcome.get("reason")})

        # Calls that need approval have no result yet. A full implementation would
        # park the turn here until the approval trigger resumes it. In this simplified
        # example, steering_check sees fewer results than calls and stops the turn.
        return TurnState.STEERING_CHECK

    def handle_steering_check(self, ctx: TurnContext) -> TurnState:
        """Decide: continue the loop, stop, or fail."""
        # Check if any tool called a stop
        for result in ctx.tool_results:
            if result.get("stop_signal"):
                return TurnState.STOPPED

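        # Check if all tool results are in
        if len(ctx.tool_results) == len(ctx.tool_calls):
            # Loop back to assistant_stream with the tool results
            ctx.messages.append({"role": "assistant", "tool_calls": ctx.tool_calls})
            for result in ctx.tool_results:
                # Denied calls carry no "result"; pass the denial reason so the model can adapt
                content = f"denied: {result.get('reason')}" if result.get("denied") else result.get("result", "")
                ctx.messages.append({"role": "tool", "tool_call_id": result["call_id"], "content": content})
            return TurnState.ASSISTANT_STREAM

        # Some calls are still unresolved (for example, awaiting approval)
        return TurnState.STOPPED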

    def handle_error(self, ctx: TurnContext, error: Exception) -> dict:
        """Mark the turn as failed and stop queue retries. Retry and fallback belong in the per-state handlers."""
        ctx.error = str(error)
        ctx.current_state = TurnState.FAILED

        # Ack the queue so it stops retrying
        ack_queue(ctx.turn_id)

        return {
            "stop_reason": "error",
            "error": ctx.error,
            "agent_end": True
        }

    def build_response(self, ctx: TurnContext) -> dict:
        """Build the final response for the client."""
        return {
            "stop_reason": "stopped" if ctx.current_state == TurnState.STOPPED else "error",
            "final_response": getattr(ctx, "final_response", ""),
            "turn_count": ctx.turn_count,
            "error": ctx.error
        }
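
The listing calls consultBefore without showing it. One way the policy gate could look is sketched below, assuming a static allowlist; the ToolCall shape, the tool names, and the two sets are hypothetical and only the return format (a dict with "decision" and "reason") is taken from how handle_function_execute reads the outcome.

```python
from dataclasses import dataclass, field


@dataclass
class ToolCall:
    """Hypothetical tool call shape; the listing only relies on .id."""
    id: str
    name: str
    arguments: dict = field(default_factory=dict)


# Hypothetical policy tables; tool names are made up for illustration.
ALLOWED_TOOLS = {"read_file", "search"}
APPROVAL_TOOLS = {"write_file", "send_email"}


def consultBefore(call: ToolCall, timeout: float = 5.0) -> dict:
    """Return a decision dict: allow, deny, or needs_approval.

    `timeout` is accepted to match the call site. This in-memory
    check never blocks, so the value is unused here.
    """
    if call.name in ALLOWED_TOOLS:
        return {"decision": "allow", "reason": None}
    if call.name in APPROVAL_TOOLS:
        return {"decision": "needs_approval", "reason": f"{call.name} has side effects"}
    # Default deny: anything not explicitly listed is rejected.
    return {"decision": "deny", "reason": f"{call.name} is not on the allowlist"}
```

Defaulting to deny keeps a newly registered tool from running until someone has made a deliberate policy decision about it.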

The steering check: where the loop decides

The most important part of the state machine is the steering check — where the agent decides whether to keep going or stop.

The logic is simple:

def steering_check(ctx: TurnContext) -> TurnState:
    # Did the model signal a stop? (stop_signal is an optional flag on the context)
    if getattr(ctx, "stop_signal", False):
        return TurnState.STOPPED

    # Did we hit max turns?
    if ctx.turn_count >= ctx.max_turns:
        return TurnState.STOPPED

    # Do we have tool results to process?
    if ctx.tool_results and len(ctx.tool_results) == len(ctx.tool_calls):
        # Continue the loop — append results and go back to the model
        append_tool_results_to_messages(ctx)
        return TurnState.ASSISTANT_STREAM

    # Nothing more to do
    return TurnState.STOPPED

The key property: the steering check is a single point where the loop decision is made. You can add any condition here — budget limits, user signals, quality gates — without changing the rest of the state machine.
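
As an illustration of adding conditions at that single point, the sketch below runs a list of guard functions and stops on the first one that returns a reason. It assumes a small LoopStats record that is not part of the original listing; the field names, the budget figures, and the repeated-call guard (aimed at the "same tool call 11 times" failure from the introduction) are all hypothetical.

```python
import json
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class LoopStats:
    """Hypothetical per-turn counters consulted by the guards."""
    turn_count: int = 0
    max_turns: int = 20
    spent_usd: float = 0.0
    budget_usd: float = 5.0
    call_history: list = field(default_factory=list)  # (tool_name, arguments) pairs


def over_turns(stats: LoopStats) -> Optional[str]:
    return "max_turns" if stats.turn_count >= stats.max_turns else None


def over_budget(stats: LoopStats) -> Optional[str]:
    return "budget_exceeded" if stats.spent_usd >= stats.budget_usd else None


def repeated_call(stats: LoopStats, limit: int = 3) -> Optional[str]:
    """Stop when the last `limit` tool calls are identical."""
    recent = stats.call_history[-limit:]
    if len(recent) < limit:
        return None
    # Serialize each call so calls with equal arguments compare equal.
    distinct = {json.dumps(call, sort_keys=True) for call in recent}
    return "repeated_tool_call" if len(distinct) == 1 else None


GUARDS = [over_turns, over_budget, repeated_call]


def stop_reason(stats: LoopStats) -> Optional[str]:
    """Return the first guard's reason, or None if the loop may continue."""
    for guard in GUARDS:
        reason = guard(stats)
        if reason:
            return reason
    return None
```

A steering check built this way would return STOPPED whenever stop_reason gives a value, and record that value so the caller can see why the turn ended.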

Teardown: inlined, not a step

Teardown is not a separate enqueued step. It’s inlined into every exit path. When the state machine reaches STOPPED or FAILED, teardown runs immediately:

def teardown(ctx: TurnContext):
    """Clean up resources — inlined into every exit path."""
    # Free the sandbox, if one was booted for this turn
    sandbox_id = getattr(ctx, "sandbox_id", None)
    if sandbox_id:
        free_sandbox(sandbox_id)

    # Emit agent_end event
    emit_event("agent_end", {
        "session_id": ctx.session_id,
        "turn_id": ctx.turn_id,
        "turn_count": ctx.turn_count,
        "stop_reason": ctx.current_state.value
    })

    # Persist the final session state
    persist_session_state(
        session_id=ctx.session_id,
        turn_id=ctx.turn_id,
        final_state=ctx.current_state.value,
        turn_count=ctx.turn_count
    )

Inlining teardown removes one durable queue hop per turn. Every exit path — stop, fail, max_turns — calls the same teardown function. Resources get freed and the session gets persisted consistently, regardless of how the turn ended.

Error handling and circuit breakers

The state machine needs error handlers at every state. The pattern:

import time

class ToolExecutionError(Exception):
    """Assumed to be raised by execute_tool for retryable tool failures."""

def handle_function_execute_with_retry(self, ctx: TurnContext) -> TurnState:
    max_retries = 3
    ctx.tool_results = []

    for call in ctx.tool_calls:
        for attempt in range(1, max_retries + 1):
            try:
                result = execute_tool(call)
                ctx.tool_results.append({"call_id": call.id, "result": result})
                break  # this call succeeded; move on to the next one
            except ToolExecutionError as e:
                if attempt == max_retries:
                    # Circuit breaker: stop retrying
                    ctx.error = f"Tool failed after {max_retries} retries: {e}"
                    return TurnState.FAILED
                # Exponential backoff before the next attempt
                time.sleep(2 ** attempt)
            except Exception as e:
                ctx.error = f"Unexpected error: {e}"
                return TurnState.FAILED

    return TurnState.STEERING_CHECK

The retry cap acts as a simple circuit breaker. After 3 consecutive failures of a tool call in function_execute, the state machine stops retrying and transitions to FAILED. Combined with the max_turns check in steering_check, this is what stops the $90 bill scenario: the agent can’t retry or loop forever.
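
If you want a breaker that also trips across separate calls, rather than only capping retries of one call, a common shape is sketched below. This is a generic circuit breaker pattern, not code from the orchestrator described here; the class name, the threshold, and the cooldown default are assumptions.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is skipped because the breaker is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 3,
        cooldown_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock  # injectable so tests need not sleep
        self.consecutive_failures = 0
        self.opened_at: Optional[float] = None

    def call(self, fn: Callable, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.cooldown_seconds:
                raise CircuitOpenError("circuit open; skipping call")
            # Cooldown elapsed: let one trial call through (half-open).
            self.opened_at = None
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.consecutive_failures += 1
            if self.consecutive_failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the breaker and resets the count.
        self.consecutive_failures = 0
        return result
```

Wrapping execute_tool as breaker.call(execute_tool, call) would let function_execute treat CircuitOpenError as an immediate transition to FAILED instead of spending more retries on a tool that is already down.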

The state transition table

| From state | Valid transitions | Error transition |
| --- | --- | --- |
| START | → PROVISION | → FAILED |
| PROVISION | → ASSISTANT_STREAM | → FAILED |
| ASSISTANT_STREAM | → FUNCTION_EXECUTE (tool calls), STOPPED (final response) | → FAILED |
| FUNCTION_EXECUTE | → STEERING_CHECK | → FAILED |
| STEERING_CHECK | → ASSISTANT_STREAM (continue), STOPPED (stop), FAILED (unrecoverable) | → FAILED |
| STOPPED / FAILED | (terminal) | |

Every transition is explicit. Every state has a handler. Every error has a path to FAILED.
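
The table can also live in code, so an invalid transition fails loudly instead of silently corrupting the turn. The sketch below encodes the same rows as a dict and checks each move against it; the names VALID_TRANSITIONS, InvalidTransition, and check_transition are illustrative and do not appear in the listing above.

```python
from enum import Enum


class TurnState(Enum):
    START = "start"
    PROVISION = "provision"
    ASSISTANT_STREAM = "assistant_stream"
    FUNCTION_EXECUTE = "function_execute"
    STEERING_CHECK = "steering_check"
    STOPPED = "stopped"
    FAILED = "failed"


S = TurnState

# One entry per table row; FAILED is reachable from every non-terminal state.
VALID_TRANSITIONS: dict[TurnState, set[TurnState]] = {
    S.START: {S.PROVISION, S.FAILED},
    S.PROVISION: {S.ASSISTANT_STREAM, S.FAILED},
    S.ASSISTANT_STREAM: {S.FUNCTION_EXECUTE, S.STOPPED, S.FAILED},
    S.FUNCTION_EXECUTE: {S.STEERING_CHECK, S.FAILED},
    S.STEERING_CHECK: {S.ASSISTANT_STREAM, S.STOPPED, S.FAILED},
    S.STOPPED: set(),  # terminal
    S.FAILED: set(),   # terminal
}


class InvalidTransition(Exception):
    pass


def check_transition(current: TurnState, nxt: TurnState) -> TurnState:
    """Return nxt if the table allows current -> nxt, else raise."""
    if nxt not in VALID_TRANSITIONS[current]:
        raise InvalidTransition(f"{current.value} -> {nxt.value} is not allowed")
    return nxt
```

Calling check_transition inside the run loop, just before assigning the new state, would turn a handler bug such as jumping from start straight to function_execute into an immediate, descriptive error.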

What this enables

Reliable recovery: If the agent crashes mid-turn, the state machine knows which state it was in and can resume from that point. The turn is persisted before the transition happens.

Observability: Every state transition can be logged with the context at that moment. You can reconstruct exactly what happened, state by state, for debugging.

Controlled loops: The steering check is the gate. The agent can’t loop forever without hitting max_turns. Budget limits, quality gates, user signals — all checked at the same point.

Clean teardown: Resources are always freed. Sessions are always persisted. The agent doesn’t leak sandboxes or lose conversation history on exit.
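
Recovery and observability both depend on recording each transition as it happens. A minimal in-memory sketch of such a journal is shown below; the TransitionJournal class and its method names are assumptions, and a real deployment would write to the durable session store rather than a Python dict. Context values passed to record must be JSON-serializable.

```python
import json
from datetime import datetime, timezone


class TransitionJournal:
    """Append-only record of state transitions, keyed by turn_id."""

    def __init__(self):
        self._entries: dict[str, list[dict]] = {}

    def record(self, turn_id: str, from_state: str, to_state: str, **context) -> None:
        entry = {
            "at": datetime.now(timezone.utc).isoformat(),
            "from": from_state,
            "to": to_state,
            "context": context,
        }
        self._entries.setdefault(turn_id, []).append(entry)
        # One structured log line per transition, for later reconstruction.
        print(json.dumps({"turn_id": turn_id, **entry}))

    def last_state(self, turn_id: str, default: str = "start") -> str:
        """State to resume from after a crash; `default` if nothing was recorded."""
        entries = self._entries.get(turn_id)
        return entries[-1]["to"] if entries else default

    def history(self, turn_id: str) -> list[tuple[str, str]]:
        """Ordered (from, to) pairs for one turn."""
        return [(e["from"], e["to"]) for e in self._entries.get(turn_id, [])]
```

On restart, a worker could read last_state for the turn and set current_state to it before calling run, instead of starting the turn over.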

The weekend project

Here’s the sequence to build this in a weekend:

Saturday morning: Set up the TurnState enum and TurnContext dataclass. Get the state machine skeleton running with empty handlers.

Saturday afternoon: Implement start → provision → assistant_stream. Get a turn flowing through the first three states with a real model call.

Saturday evening: Implement function_execute with the policy gate. Get tool calls executing through consultBefore.

Sunday morning: Implement steering_check and the loop. Get the agent looping correctly — model → tools → model → tools → stop.

Sunday afternoon: Add error handling, circuit breakers, and teardown. Test the failure modes — what happens when a tool fails, when the model times out, when max_turns is hit.

By Sunday evening, you have a working state machine that handles real turns reliably.
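
For the Saturday-morning step, a skeleton with stub handlers might look like the sketch below. It is deliberately smaller than the full class: the context is a plain dict, the stub and run_skeleton helpers are hypothetical, and any exception (including a missing handler) is mapped straight to FAILED so the failure path can be tested from day one.

```python
from enum import Enum
from typing import Callable


class TurnState(Enum):
    START = "start"
    PROVISION = "provision"
    ASSISTANT_STREAM = "assistant_stream"
    FUNCTION_EXECUTE = "function_execute"
    STEERING_CHECK = "steering_check"
    STOPPED = "stopped"
    FAILED = "failed"


TERMINAL = (TurnState.STOPPED, TurnState.FAILED)


def stub(next_state: TurnState) -> Callable[[dict], TurnState]:
    """Build an empty handler that only advances to next_state."""
    def handler(ctx: dict) -> TurnState:
        return next_state
    return handler


def run_skeleton(handlers: dict) -> list:
    """Drive the loop and return the visited states, in order."""
    ctx = {"state": TurnState.START}
    trace = [ctx["state"].value]
    while ctx["state"] not in TERMINAL:
        try:
            ctx["state"] = handlers[ctx["state"]](ctx)
        except Exception:
            # Handler errors and missing handlers both end the turn.
            ctx["state"] = TurnState.FAILED
        trace.append(ctx["state"].value)
    return trace


# Stub model returns no tool calls, so the turn stops after one stream.
HAPPY_PATH = {
    TurnState.START: stub(TurnState.PROVISION),
    TurnState.PROVISION: stub(TurnState.ASSISTANT_STREAM),
    TurnState.ASSISTANT_STREAM: stub(TurnState.STOPPED),
}
```

Replacing one stub at a time with a real handler keeps the loop runnable through the whole weekend, and the returned trace doubles as a quick assertion target in tests.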

Agent mode: The state machine is the foundation of agent reliability. It knows where the turn is, where it can go, and what to do when things break. Without it, you’re running if statements. With it, you’re running a production system.

Read AI agent multi-step workflows for how workflow patterns (sequential, parallel, conditional) layer on top of the state machine — the state machine manages the turn lifecycle, the workflow manages the step execution.

Read AI agent error handling patterns for the companion topic — retry strategies, circuit breakers, and structured error responses that work with the state machine.

Read AI agent policy gates for how the function_execute state uses the policy gate (consultBefore) to check every tool call before execution.

Read AI agent branching sessions for how the session store works with the state machine — each turn is a node in a branching tree, and the state machine drives the turn through its lifecycle.


This article was published on Agentic Up (https://agenticup.dev), practical guides for developers and founders building with AI agents.
