---
title: Build a state machine for your AI agent in a weekend
canonical: "https://agenticup.dev/posts/ai-agent-state-machine/"
pubDate: "2026-06-06T00:00:00.000Z"
description: "Your agent crashes mid-conversation and doesn't recover. It runs the same tool call 10 times. It doesn't know when to stop. Those are all state machine problems. Here's how to build the FSM that fixes all of them in a weekend, with no framework."
tags: [state-machine, orchestration, agent-loop, fsm, turn-orchestrator, production]
---

The first time I shipped an agent without a state machine, it worked perfectly for about 20 minutes. Then a user triggered a rare edge case, the agent ran the same tool call 11 times in a row, and I woke up to a $90 API bill.

The agent didn't know when to stop. It didn't know where it was in the turn. It didn't have a state machine.

A per-turn state machine is the orchestrator that drives a single user request through its lifecycle. It knows what state the agent is in, what transitions are valid, what to do when things go wrong, and when to clean up and stop. Without one, your agent is held together with if statements and hope.

Here's how to build one in a weekend. No framework required.

**TL;DR:** The agent turn FSM has 6 states: start, provision, assistant_stream, function_execute, steering_check, and teardown. Each state has valid transitions and error handlers. The steering_check is where the loop decides to continue or stop. Teardown is where resources get freed and the session gets persisted. You can implement the whole thing in under 200 lines of Python.

> **Key takeaways:**
> - The state machine is the conductor: it knows where the turn is and when to transition
> - 6 states: start → provision → assistant_stream → function_execute → steering_check → teardown
> - steering_check is the loop decision point: continue, stop, or max_turns reached
> - teardown is inlined into every exit path: resources freed, session persisted
> - Error handlers at each state: retry → fallback → fail, with circuit breaker to prevent runaway loops

## The head chef's pass as state machine

Think about how a restaurant kitchen works on a busy night.

The head chef stands at the pass and knows exactly where every dish is: "the risotto is plating, the steak is resting, the soup is on the burner." When a dish finishes a stage (the risotto has been plated), the chef calls out "risotto out" and the server takes it to the dining room. When something goes wrong (the fish is overcooked), the chef decides what to do: remake it, substitute, or tell the table there's a delay.

The kitchen's pass is a state machine. The chef knows the current state of every dish, knows what transitions are valid, and knows how to handle errors.

Your agent needs the same thing.

## The 6 states

Every agent turn goes through these 6 states:

```
start → provision → assistant_stream → function_execute → steering_check → teardown
                           ↑                                    ↓
                           └───────────── continue ─────────────┘
```

**start**: Receive the turn request, give it a unique ID, persist it to the session store. Seed the initial TurnStateRecord. Return immediately: the actual work happens when the turn is woken by the next transition.

**provision**: Boot the sandbox if the run needs isolated execution. Download skill bodies for the namespaces the run uses. Assemble the system prompt from three layers: the mode paragraph (plan/ask/agent), the identity preamble (teaches the model the trigger convention and on-demand skill discovery pattern), and the appended skill index. The caller can override the whole prompt by passing system_prompt on start; otherwise the orchestrator builds it.

**assistant_stream**: Call the provider worker (Anthropic, OpenAI, whatever you're using), stream the response into the channel, emit message_update events for the UI fanout. When the assistant returns tool calls, transition to function_execute. When the assistant returns a final message with no tools, transition to teardown.

**function_execute**: Every tool call goes through dispatchWithHook, the single chokepoint. The policy check (consultBefore, which returns allow, deny, or needs_approval) runs here. If the decision is needs_approval, the call parks and the turn waits. When all calls in the batch are resolved, transition to steering_check.

**steering_check**: After the batch completes, decide: continue (back to assistant_stream with the tool results), stop (clean exit), or max_turns reached (stop with a reason). This is the loop gate.

**teardown**: Clean up the sandbox, free resources, emit the agent_end event, persist the final session state. This is inlined into every exit path, not run as a separate enqueued step.
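The allow / deny / needs_approval decision described under function_execute can be sketched as a small pure function. This is only an illustration of the three-way outcome, not the real policy gate: the `ToolCall` shape, the `policy_decision` name, and the tool names in the two sets are all hypothetical.

```python
from dataclasses import dataclass, field

@dataclass
class ToolCall:
    # Hypothetical shape of a tool call; real providers return richer objects.
    id: str
    name: str
    arguments: dict = field(default_factory=dict)

# Hypothetical policy: tool names chosen only for illustration.
ALLOWED_TOOLS = {"read_file", "search"}
APPROVAL_TOOLS = {"write_file", "send_email"}

def policy_decision(call: ToolCall) -> dict:
    """Return one of allow / needs_approval / deny for a single tool call."""
    if call.name in ALLOWED_TOOLS:
        return {"decision": "allow"}
    if call.name in APPROVAL_TOOLS:
        return {"decision": "needs_approval",
                "reason": f"{call.name} changes external state"}
    # Default-deny: anything not explicitly listed is refused.
    return {"decision": "deny", "reason": f"{call.name} is not on the allow list"}
```

Defaulting to deny means a tool you forgot to classify fails closed instead of running unchecked.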

## The state machine class

Here's the implementation:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

# persist_turn_request, seed_turn_state, boot_sandbox, download_skills,
# assemble_system_prompt, call_provider, consultBefore, execute_tool, and
# ack_queue are your own integration functions; they are not defined here.

class TurnState(Enum):
    START = "start"
    PROVISION = "provision"
    ASSISTANT_STREAM = "assistant_stream"
    FUNCTION_EXECUTE = "function_execute"
    STEERING_CHECK = "steering_check"
    STOPPED = "stopped"
    FAILED = "failed"

@dataclass
class TurnContext:
    turn_id: str
    session_id: str
    current_state: TurnState = TurnState.START
    provider: Optional[str] = None
    system_prompt: Optional[str] = None
    messages: list = field(default_factory=list)
    available_tools: list = field(default_factory=list)
    tool_calls: list = field(default_factory=list)
    tool_results: list = field(default_factory=list)
    turn_count: int = 0
    max_turns: int = 20
    final_response: str = ""
    error: Optional[str] = None
    pending_approvals: list = field(default_factory=list)

class AgentTurnMachine:
    def __init__(self, config: dict):
        self.config = config
        self.state_handlers = {
            TurnState.START: self.handle_start,
            TurnState.PROVISION: self.handle_provision,
            TurnState.ASSISTANT_STREAM: self.handle_assistant_stream,
            TurnState.FUNCTION_EXECUTE: self.handle_function_execute,
            TurnState.STEERING_CHECK: self.handle_steering_check,
        }

    def run(self, turn_context: TurnContext) -> dict:
        """Run the turn machine until a terminal state is reached."""
        while turn_context.current_state not in (TurnState.STOPPED, TurnState.FAILED):
            handler = self.state_handlers.get(turn_context.current_state)
            if not handler:
                # Record the offending state before overwriting it with FAILED
                turn_context.error = f"No handler for state {turn_context.current_state}"
                turn_context.current_state = TurnState.FAILED
                break

            try:
                next_state = handler(turn_context)
                if next_state == turn_context.current_state:
                    raise ValueError(f"State handler {handler.__name__} returned same state without advancing")
                turn_context.current_state = next_state
            except Exception as e:
                return self.handle_error(turn_context, e)

        return self.build_response(turn_context)

    def handle_start(self, ctx: TurnContext) -> TurnState:
        """Persist the turn request and seed the initial state."""
        # Persist the run request to the session store
        persist_turn_request(
            turn_id=ctx.turn_id,
            session_id=ctx.session_id,
            timestamp=datetime.now(timezone.utc).isoformat()
        )
        # Seed the TurnStateRecord
        seed_turn_state(session_id=ctx.session_id, turn_id=ctx.turn_id)
        return TurnState.PROVISION

    def handle_provision(self, ctx: TurnContext) -> TurnState:
        """Boot sandbox, download skills, assemble system prompt."""
        # Boot sandbox if needed
        if self.config.get("sandbox_enabled"):
            boot_sandbox(session_id=ctx.session_id)

        # Download skill bodies for configured namespaces
        for namespace in self.config.get("system_default_skills", ["iii://iii-directory/index"]):
            download_skills(namespace=namespace)

        # Assemble system prompt from mode + identity + skills
        ctx.system_prompt = assemble_system_prompt(
            mode=self.config.get("mode", "agent"),
            identity_preamble=self.config.get("identity_preamble"),
            default_skills=self.config.get("default_skills", [])
        )

        return TurnState.ASSISTANT_STREAM

    def handle_assistant_stream(self, ctx: TurnContext) -> TurnState:
        """Call the AI model and stream the response."""
        ctx.turn_count += 1

        # Check max turns
        if ctx.turn_count > ctx.max_turns:
            return TurnState.STOPPED

        # Call the provider and stream the response
        response = call_provider(
            provider=ctx.provider,
            prompt=ctx.system_prompt,
            messages=ctx.messages,
            tools=ctx.available_tools,
            stream=True
        )

        # Collect tool calls from the response
        ctx.tool_calls = response.tool_calls or []
        ctx.pending_approvals = []

        if not ctx.tool_calls:
            # No tools: final response, we're done
            ctx.final_response = response.content
            return TurnState.STOPPED

        return TurnState.FUNCTION_EXECUTE

    def handle_function_execute(self, ctx: TurnContext) -> TurnState:
        """Run each tool call through the policy gate, execute, collect results."""
        ctx.tool_results = []

        for call in ctx.tool_calls:
            # consultBefore: the policy gate
            outcome = consultBefore(call, timeout=5.0)

            if outcome["decision"] == "allow":
                result = execute_tool(call)
                ctx.tool_results.append({"call_id": call.id, "result": result})
            elif outcome["decision"] == "deny":
                ctx.tool_results.append({"call_id": call.id, "denied": True, "reason": outcome["reason"]})
            elif outcome["decision"] == "needs_approval":
                ctx.pending_approvals.append({"call": call, "reason": outcome.get("reason")})

        # If there are pending approvals, wait for them
        if ctx.pending_approvals:
            # Transition to a waiting state: this would be handled by the approval trigger
            # For now, we park and return control to the caller
            return TurnState.STEERING_CHECK  # simplified for this example

        return TurnState.STEERING_CHECK

    def handle_steering_check(self, ctx: TurnContext) -> TurnState:
        """Decide: continue the loop, stop, or fail."""
        # Check if any tool called a stop
        for result in ctx.tool_results:
            if result.get("stop_signal"):
                return TurnState.STOPPED

        # Check if all tool results are in
        if len(ctx.tool_results) == len(ctx.tool_calls):
            # Loop back to assistant_stream with the tool results
            ctx.messages.append({"role": "assistant", "tool_calls": ctx.tool_calls})
            for result in ctx.tool_results:
                ctx.messages.append({"role": "tool", "tool_call_id": result["call_id"], "content": result.get("result", "")})
            return TurnState.ASSISTANT_STREAM

        # Some calls are still parked on approval: stop instead of looping
        return TurnState.STOPPED

    def handle_error(self, ctx: TurnContext, error: Exception) -> dict:
        """Handle errors: record the failure and end the turn."""
        ctx.error = str(error)
        ctx.current_state = TurnState.FAILED

        # Ack the queue so it stops retrying
        ack_queue(ctx.turn_id)

        return {
            "stop_reason": "error",
            "error": ctx.error,
            "agent_end": True
        }

    def build_response(self, ctx: TurnContext) -> dict:
        """Build the final response for the client."""
        return {
            "stop_reason": "stopped" if ctx.current_state == TurnState.STOPPED else "error",
            "final_response": ctx.final_response,
            "turn_count": ctx.turn_count,
            "error": ctx.error
        }
```

## The steering check: where the loop decides

The most important part of the state machine is the steering check: where the agent decides whether to keep going or stop.

The logic is simple:

```python
def steering_check(ctx: TurnContext) -> TurnState:
    # Did the model signal a stop? (stop_signal is optional on the context)
    if getattr(ctx, "stop_signal", False):
        return TurnState.STOPPED

    # Did we hit max turns?
    if ctx.turn_count >= ctx.max_turns:
        return TurnState.STOPPED

    # Do we have tool results to process?
    if ctx.tool_results and len(ctx.tool_results) == len(ctx.tool_calls):
        # Continue the loop: append results and go back to the model
        append_tool_results_to_messages(ctx)
        return TurnState.ASSISTANT_STREAM

    # Nothing more to do
    return TurnState.STOPPED
```

The key property: the steering check is the single point where the loop decision is made. You can add any condition here (budget limits, user signals, quality gates) without changing the rest of the state machine.
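One way to keep those extra conditions from turning back into a pile of if statements is to treat each one as a small function and run them in order. The sketch below assumes a hypothetical `LoopStats` record with cost and cancellation fields; none of these names come from the state machine above.

```python
from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class LoopStats:
    # Hypothetical per-turn counters the orchestrator would keep up to date.
    turn_count: int = 0
    max_turns: int = 20
    spent_usd: float = 0.0
    budget_usd: float = 1.0
    user_cancelled: bool = False

# Each check returns a stop reason, or None to let the loop continue.
StopCheck = Callable[[LoopStats], Optional[str]]

def cancelled(s: LoopStats) -> Optional[str]:
    return "user_cancelled" if s.user_cancelled else None

def over_max_turns(s: LoopStats) -> Optional[str]:
    return "max_turns" if s.turn_count >= s.max_turns else None

def over_budget(s: LoopStats) -> Optional[str]:
    return "budget_exceeded" if s.spent_usd >= s.budget_usd else None

# Order matters: the first check that fires decides the stop reason.
STOP_CHECKS = [cancelled, over_max_turns, over_budget]

def stop_reason(stats: LoopStats) -> Optional[str]:
    """Return the first stop reason that fires, or None to continue."""
    for check in STOP_CHECKS:
        reason = check(stats)
        if reason is not None:
            return reason
    return None
```

Adding a quality gate then means appending one function to `STOP_CHECKS`, and the stop reason can be reported in the final response.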

## Teardown: inlined, not a step

Teardown is not a separate enqueued step. It's inlined into every exit path. When the state machine reaches STOPPED or FAILED, teardown runs immediately:

```python
def teardown(ctx: TurnContext):
    """Clean up resources: inlined into every exit path."""
    # Free the sandbox if one was booted for this turn
    sandbox_id = getattr(ctx, "sandbox_id", None)
    if sandbox_id:
        free_sandbox(sandbox_id)

    # Emit agent_end event
    emit_event("agent_end", {
        "session_id": ctx.session_id,
        "turn_id": ctx.turn_id,
        "turn_count": ctx.turn_count,
        "stop_reason": ctx.current_state.value
    })

    # Persist the final session state
    persist_session_state(
        session_id=ctx.session_id,
        turn_id=ctx.turn_id,
        final_state=ctx.current_state.value,
        turn_count=ctx.turn_count
    )
```

Inlining teardown removes one durable queue hop per turn. Every exit path (stop, fail, max_turns) calls the same teardown function. Resources get freed and the session gets persisted consistently, regardless of how the turn ended.
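A simple way to get "every exit path" for free in Python is a `try`/`finally` wrapper around the turn. This is a minimal sketch: `run_with_teardown` is a hypothetical helper, and the stand-in turn and teardown functions exist only to show that cleanup runs on both success and failure.

```python
def run_with_teardown(run_turn, teardown, ctx):
    """Run a turn and guarantee teardown executes on every exit path."""
    try:
        return run_turn(ctx)
    finally:
        # Runs on a normal return and when run_turn raises.
        teardown(ctx)

# Stand-in functions to demonstrate the behaviour.
events = []

def ok_turn(ctx):
    return {"stop_reason": "stopped"}

def failing_turn(ctx):
    raise RuntimeError("provider timed out")

def record_teardown(ctx):
    events.append(("teardown", ctx["turn_id"]))

run_with_teardown(ok_turn, record_teardown, {"turn_id": "t1"})
try:
    run_with_teardown(failing_turn, record_teardown, {"turn_id": "t2"})
except RuntimeError:
    pass
# events now holds one teardown record per turn, including the failed one
```

The exception still propagates after teardown, so the caller's error handling stays in charge of what the client sees.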

## Error handling and circuit breakers

The state machine needs error handlers at every state. The pattern:

```python
from time import sleep

def handle_function_execute_with_retry(self, ctx: TurnContext) -> TurnState:
    max_retries = 3
    ctx.tool_results = []

    for call in ctx.tool_calls:
        for attempt in range(1, max_retries + 1):
            try:
                result = execute_tool(call)
                ctx.tool_results.append({"call_id": call.id, "result": result})
                break  # this call succeeded, move on to the next one
            except ToolExecutionError as e:
                if attempt == max_retries:
                    # Circuit breaker: stop retrying
                    ctx.error = f"Tool failed after {max_retries} attempts: {e}"
                    return TurnState.FAILED
                # Exponential backoff before the next attempt
                sleep(2 ** attempt)
            except Exception as e:
                ctx.error = f"Unexpected error: {e}"
                return TurnState.FAILED

    return TurnState.STEERING_CHECK
```

The circuit breaker prevents runaway retries. After 3 consecutive failures of a tool call in function_execute, the state machine stops retrying and transitions to FAILED. Combined with the max_turns check, this is what stops the $90 bill scenario: the agent can't loop forever.
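Retries cover tools that fail. The other runaway case, the model asking for the same successful call over and over, can be caught by fingerprinting calls within a turn. The sketch below is one possible approach, not part of the machine above; `RepeatCallBreaker` and its `max_repeats` threshold are hypothetical.

```python
import json
from collections import Counter

class RepeatCallBreaker:
    """Trips when the same tool call (name + arguments) repeats too often in one turn."""

    def __init__(self, max_repeats: int = 3):
        self.max_repeats = max_repeats
        self.seen = Counter()

    def record(self, name: str, arguments: dict) -> bool:
        """Record a call; return True if the breaker has tripped."""
        # sort_keys makes the fingerprint independent of argument order.
        fingerprint = f"{name}:{json.dumps(arguments, sort_keys=True, default=str)}"
        self.seen[fingerprint] += 1
        return self.seen[fingerprint] > self.max_repeats
```

Create one breaker per turn, call `record` before executing each tool call, and route to STOPPED or FAILED when it returns True.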

## The state transition table

| From state | Valid transitions | Error transition |
|---|---|---|
| START | → PROVISION | → FAILED |
| PROVISION | → ASSISTANT_STREAM | → FAILED |
| ASSISTANT_STREAM | → FUNCTION_EXECUTE (tool calls), STOPPED (final response) | → FAILED |
| FUNCTION_EXECUTE | → STEERING_CHECK | → FAILED |
| STEERING_CHECK | → ASSISTANT_STREAM (continue), STOPPED (stop), FAILED (unrecoverable) | → FAILED |
| STOPPED / FAILED | (terminal) | (none) |

Every transition is explicit. Every state has a handler. Every error has a path to FAILED.
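The table can also live in code, so an illegal transition fails loudly instead of silently corrupting the turn. This sketch mirrors the table above; the `VALID_TRANSITIONS` mapping and `check_transition` helper are hypothetical names, and the enum is repeated only to keep the example self-contained.

```python
from enum import Enum

class TurnState(Enum):
    START = "start"
    PROVISION = "provision"
    ASSISTANT_STREAM = "assistant_stream"
    FUNCTION_EXECUTE = "function_execute"
    STEERING_CHECK = "steering_check"
    STOPPED = "stopped"
    FAILED = "failed"

# One entry per row of the transition table; FAILED is the error path everywhere.
VALID_TRANSITIONS = {
    TurnState.START: {TurnState.PROVISION, TurnState.FAILED},
    TurnState.PROVISION: {TurnState.ASSISTANT_STREAM, TurnState.FAILED},
    TurnState.ASSISTANT_STREAM: {TurnState.FUNCTION_EXECUTE, TurnState.STOPPED, TurnState.FAILED},
    TurnState.FUNCTION_EXECUTE: {TurnState.STEERING_CHECK, TurnState.FAILED},
    TurnState.STEERING_CHECK: {TurnState.ASSISTANT_STREAM, TurnState.STOPPED, TurnState.FAILED},
    TurnState.STOPPED: set(),  # terminal
    TurnState.FAILED: set(),   # terminal
}

def check_transition(current: TurnState, nxt: TurnState) -> TurnState:
    """Return nxt if the move is allowed, otherwise raise ValueError."""
    if nxt not in VALID_TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current.value} -> {nxt.value}")
    return nxt
```

Wrapping each handler's return value in `check_transition` would catch a handler that tries to skip a state.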

## What this enables

**Reliable recovery**: If the agent crashes mid-turn, the state machine knows which state it was in and can resume from that point. The turn is persisted before the transition happens.

**Observability**: Every state transition can be logged with the context at that moment. You can reconstruct exactly what happened, state by state, for debugging.

**Controlled loops**: The steering check is the gate. The agent can't loop forever without hitting max_turns. Budget limits, quality gates, user signals: all checked at the same point.

**Clean teardown**: Resources are always freed. Sessions are always persisted. The agent doesn't leak sandboxes or lose conversation history on exit.
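Recovery and observability both come down to recording each transition as it happens. Here is a minimal in-memory sketch; `TransitionLog` is a hypothetical name, and a real system would write these entries to the session store instead of a Python list.

```python
import time

class TransitionLog:
    """Append-only record of state transitions (in memory for this sketch)."""

    def __init__(self):
        self.entries = []

    def record(self, turn_id: str, from_state: str, to_state: str) -> None:
        self.entries.append({
            "turn_id": turn_id,
            "from": from_state,
            "to": to_state,
            "at": time.time(),
        })

    def last_state(self, turn_id: str, default: str = "start") -> str:
        """Return the most recent state reached by a turn, for resuming after a crash."""
        for entry in reversed(self.entries):
            if entry["turn_id"] == turn_id:
                return entry["to"]
        return default
```

Resuming from `last_state` re-enters that state's handler from the top, so handlers with side effects (tool execution in particular) need to be safe to run twice or need their own deduplication.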

## The weekend project

Here's the sequence to build this in a weekend:

**Saturday morning**: Set up the TurnState enum and TurnContext dataclass. Get the state machine skeleton running with empty handlers.

**Saturday afternoon**: Implement start → provision → assistant_stream. Get a turn flowing through the first three states with a real model call.

**Saturday evening**: Implement function_execute with the policy gate. Get tool calls executing through consultBefore.

**Sunday morning**: Implement steering_check and the loop. Get the agent looping correctly: model → tools → model → tools → stop.

**Sunday afternoon**: Add error handling, circuit breakers, and teardown. Test the failure modes: what happens when a tool fails, when the model times out, when max_turns is hit.

By Sunday evening, you have a working state machine that handles real turns reliably.
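For the Sunday afternoon failure-mode tests, a stubbed model is usually enough; no API key is needed. The sketch below uses a deliberately tiny stand-in loop (`run_loop`, a hypothetical function, not the AgentTurnMachine class) to show the shape of such a test: one model that never stops asking for tools, and one that behaves.

```python
def run_loop(model, max_turns=5):
    """Tiny stand-in for the turn loop: call the model until it stops asking for tools."""
    turns = 0
    while turns < max_turns:
        turns += 1
        reply = model(turns)
        if not reply.get("tool_calls"):
            return {"stop_reason": "stopped", "turns": turns}
    return {"stop_reason": "max_turns", "turns": turns}

def stuck_model(turn):
    # Always asks for the same tool: the runaway case.
    return {"tool_calls": [{"name": "search", "arguments": {"q": "same"}}]}

def well_behaved_model(turn):
    # Asks for a tool twice, then gives a final answer.
    if turn < 3:
        return {"tool_calls": [{"name": "search", "arguments": {"q": "x"}}]}
    return {"content": "done"}

assert run_loop(stuck_model) == {"stop_reason": "max_turns", "turns": 5}
assert run_loop(well_behaved_model) == {"stop_reason": "stopped", "turns": 3}
```

The same two stubs can be pointed at your real machine once the provider call is injectable.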

> **Agent mode:** The state machine is the foundation of agent reliability. It knows where the turn is, where it can go, and what to do when things break. Without it, you're running if statements. With it, you're running a production system.

## Related Posts

Read [AI agent multi-step workflows](/posts/ai-agent-multi-step-workflows/) for how workflow patterns (sequential, parallel, conditional) layer on top of the state machine: the state machine manages the turn lifecycle, the workflow manages the step execution.

Read [AI agent error handling patterns](/posts/ai-agent-error-handling-patterns/) for the companion topic: retry strategies, circuit breakers, and structured error responses that work with the state machine.

Read [AI agent policy gates](/posts/ai-agent-policy-gates/) for how the function_execute state uses the policy gate (consultBefore) to check every tool call before execution.

Read [AI agent branching sessions](/posts/ai-agent-branching-sessions/) for how the session store works with the state machine: each turn is a node in a branching tree, and the state machine drives the turn through its lifecycle.


[Zylos AI's guide to finite state machines for agent orchestration](https://zylos.ai/research/2026-04-02-finite-state-machines-statecharts-ai-agent-orchestration/) covers FSM design patterns for production agents.

[MLflow's guide to building production-ready AI agents](https://mlflow.org/articles/building-production-ready-ai-agents-in-2026/) covers state management and deployment patterns.





---

This article was published on Agentic Up (https://agenticup.dev): practical guides for developers and founders building with AI agents. Reach me at hello@agenticup.dev.
