---
title: "AI agent multi-step workflows: building complex pipelines"
canonical: "https://agenticup.dev/posts/ai-agent-multi-step-workflows/"
pubDate: "2026-06-01T00:00:00.000Z"
description: "Four workflow patterns every agent developer needs — sequential, parallel fan-out, conditional branching, and loop with human-in-the-loop. With real code and production lessons."
tags: [workflows, multi-step, orchestration, agents, production]
---

The [ReAct paper](https://arxiv.org/abs/2210.03629) (Yao et al., 2022) found that interleaving reasoning steps with actions such as tool calls, the core of multi-step workflows, can outperform reasoning-only and acting-only prompting on question-answering and interactive decision-making benchmarks.

The [LangGraph documentation](https://langchain-ai.github.io/langgraph/) describes a graph-based architecture for multi-step agent workflows: sequential chains, parallel execution, and conditional branching all map to graph nodes and edges.


**TL;DR:** Four workflow patterns power every multi-step agent: sequential chains, parallel fan-out, conditional branching, and loop with human-in-the-loop checkpoints. State management is the hardest part — persist state between steps so workflows survive failures. Multi-step agents cost 3-10x more than single-step agents, so budget accordingly.

A single LLM call is not an agent workflow. It's a completion. The real power — and the real complexity — starts when you chain multiple steps together: fetch data, analyse it, make decisions, take actions, verify results.

I've built multi-step workflows for document processing, content generation, customer support triage, and code review agents. The patterns repeat across all of them. Here's what I've learned about orchestrating complex agent pipelines.

> **Key takeaways:**
> - Four core patterns: sequential, parallel fan-out, conditional branching, and loop with HITL checkpoints
> - State management is the hardest part — persist state between steps so workflows survive failures
> - Error handling at each step: retry → fallback → flag for human review
> - LangGraph works well for complex state machines; custom works better for cost-sensitive or tightly integrated workflows
> - Multi-step workflows cost 3-10x more than single-step agents — budget accordingly

## The four workflow patterns

Every multi-step agent workflow is a combination of these four patterns. Master these, and you can orchestrate anything.

### 1. Sequential chain

The simplest pattern: step A feeds into step B, which feeds into step C. Each step depends on the previous one.

```python
class SequentialWorkflow:
    def __init__(self, steps: list):
        self.steps = steps  # List of (name, handler) tuples

    async def run(self, initial_input: dict) -> dict:
        context = dict(initial_input)  # Shallow copy so the caller's dict isn't mutated

        for step_name, handler in self.steps:
            print(f"  → Running step: {step_name}")

            try:
                result = await handler(context)
                context[step_name] = result
                context["last_step"] = step_name
            except Exception as e:
                return {
                    "success": False,
                    "error": f"Step '{step_name}' failed: {str(e)}",
                    "context": context
                }

        return {"success": True, "context": context}
```

**When to use:** Any workflow where each step builds on the previous one. Document processing (extract → classify → redact → store), content generation (research → outline → draft → review), data pipelines.

**Watch out for:** Long chains where an error in step 2 wastes the work of step 1. Always check whether earlier steps can be rolled back or compensated.
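
One way to act on that warning is to register an undo handler next to each step and run the undo handlers in reverse when a later step fails. This is a minimal sketch, not the class above: the `(name, handler, compensate)` tuple shape and the demo handlers (`create_draft`, `delete_draft`, `send_email`) are assumptions made for illustration.

```python
import asyncio


async def run_with_compensation(steps, context):
    """Run steps in order; on failure, undo completed steps in reverse.

    `steps` is a list of (name, handler, compensate) tuples, where
    `compensate` may be None for steps with nothing to undo.
    """
    completed = []  # (name, compensate) for every step that succeeded
    for name, handler, compensate in steps:
        try:
            context[name] = await handler(context)
            completed.append((name, compensate))
        except Exception as exc:
            undone = []
            # Undo in reverse order so the most recent effects are removed first
            for done_name, undo in reversed(completed):
                if undo is not None:
                    await undo(context)
                    undone.append(done_name)
            return {
                "success": False,
                "error": f"Step '{name}' failed: {exc}",
                "compensated": undone,
                "context": context,
            }
    return {"success": True, "context": context}


async def _demo():
    log = []

    async def create_draft(ctx):      # hypothetical step
        log.append("create_draft")
        return "draft-1"

    async def delete_draft(ctx):      # hypothetical undo for create_draft
        log.append("delete_draft")

    async def send_email(ctx):        # hypothetical step that always fails
        raise RuntimeError("SMTP unavailable")

    outcome = await run_with_compensation(
        [("draft", create_draft, delete_draft), ("email", send_email, None)],
        {},
    )
    return outcome, log


demo_outcome, demo_log = asyncio.run(_demo())
```

Steps whose effects cannot be undone simply pass `None`, which makes the gaps in your rollback coverage visible in the step list itself.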

### 2. Parallel fan-out

One agent analyses the input, determines sub-tasks, and spawns multiple worker agents that run in parallel. The results are collected and merged.

```python
import asyncio

class ParallelFanOutWorkflow:
    def __init__(self, planner, workers: dict, merger):
        self.planner = planner      # Determines sub-tasks
        self.workers = workers      # Worker agents keyed by sub-task type
        self.merger = merger        # Combines results

    async def run(self, task: str) -> dict:
        # Step 1: Plan — break task into sub-tasks
        sub_tasks = await self.planner.plan(task)
        print(f"  → Generated {len(sub_tasks)} sub-tasks")

        # Step 2: Execute all sub-tasks in parallel
        async def execute_worker(sub_task):
            worker = self.workers[sub_task.type]
            return await worker.run(sub_task)

        results = await asyncio.gather(
            *[execute_worker(st) for st in sub_tasks],
            return_exceptions=True
        )

        # Step 3: Merge results
        successful = [r for r in results if not isinstance(r, Exception)]
        failed = [r for r in results if isinstance(r, Exception)]

        final = await self.merger.merge(successful)

        return {
            "success": len(failed) == 0,
            "result": final,
            "stats": {"total": len(sub_tasks), "succeeded": len(successful), "failed": len(failed)}
        }
```

**When to use:** Research agents that search multiple sources, code review agents that analyse multiple files, content agents that generate multiple variations. Any task that can be decomposed into independent sub-tasks.

**Watch out for:** Cost explosion. If each worker makes multiple LLM calls, a 10-worker fan-out can generate 30-50 LLM calls per workflow run. Set budget limits per worker.

<div class="callout">
  <div class="callout-title">Pro tip</div>
  <p>Always set a timeout for parallel workers. One stuck worker should not block the entire workflow. I use <code>asyncio.wait_for(worker.run(task), timeout=30)</code> per worker.</p>
</div>
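
A minimal sketch of that per-worker timeout, combined with `asyncio.gather(..., return_exceptions=True)` so a timed-out worker shows up as a failure instead of cancelling its siblings. The helper name `run_workers_with_timeout` and the two demo coroutines are assumptions for illustration.

```python
import asyncio


async def run_workers_with_timeout(coros, timeout: float = 30.0):
    """Run worker coroutines in parallel, bounding each one by `timeout` seconds."""

    async def bounded(coro):
        # wait_for cancels the worker and raises TimeoutError when time runs out
        return await asyncio.wait_for(coro, timeout=timeout)

    results = await asyncio.gather(
        *(bounded(c) for c in coros), return_exceptions=True
    )
    succeeded = [r for r in results if not isinstance(r, Exception)]
    timed_out = sum(isinstance(r, asyncio.TimeoutError) for r in results)
    return {
        "results": succeeded,
        "failed": len(results) - len(succeeded),
        "timed_out": timed_out,
    }


async def _demo():
    async def fast():
        await asyncio.sleep(0.01)
        return "done"

    async def stuck():
        await asyncio.sleep(5)
        return "never returned"

    return await run_workers_with_timeout([fast(), stuck()], timeout=0.1)


demo = asyncio.run(_demo())
```

The timeout wraps each worker individually, so the whole fan-out finishes in roughly `timeout` seconds at worst, however many workers hang.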

### 3. Conditional branching

The agent evaluates a condition and routes to different paths based on the result. If-else logic for agents.

```python
class ConditionalBranchingWorkflow:
    def __init__(self, router, branches: dict):
        self.router = router          # Evaluates conditions
        self.branches = branches      # {"condition_name": handler}

    async def run(self, context: dict) -> dict:
        # Evaluate routing condition
        decision = await self.router.evaluate(context)
        print(f"  → Routing decision: {decision}")

        # Execute the matching branch
        handler = self.branches.get(decision)
        if not handler:
            return {"success": False, "error": f"No handler for decision: {decision}"}

        result = await handler.run(context)
        return {"success": True, "decision": decision, "result": result}
```

**When to use:** Support ticket triage (route to billing, technical, or account team), content moderation (allow, flag, or reject), dynamic workflow routing where the next step depends on data quality or content type.

**Real example from a document processing pipeline I built:**

```python
async def route_document(context):
    """Router: decides which branch to take based on document type and confidence."""
    doc_type = context.get("classification", {}).get("type")
    confidence = context.get("classification", {}).get("confidence", 0)

    if confidence < 0.6:
        return "manual_review"           # Low confidence — human needs to look
    elif doc_type == "invoice":
        return "invoice_processing"       # Standard invoice path
    elif doc_type == "contract":
        return "contract_review"          # Contract needs legal review
    else:
        return "general_processing"       # Everything else
```
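
The workflow class expects a router object with an `evaluate()` method, while `route_document` is a plain async function. One way to bridge the two is a thin adapter. In this sketch `FunctionRouter`, `EchoBranch`, and `dispatch` are hypothetical names, and the routing rules are restated compactly so the snippet runs on its own.

```python
import asyncio


async def route_document(context):
    # Same rules as the router above, restated compactly
    classification = context.get("classification", {})
    if classification.get("confidence", 0) < 0.6:
        return "manual_review"
    by_type = {"invoice": "invoice_processing", "contract": "contract_review"}
    return by_type.get(classification.get("type"), "general_processing")


class FunctionRouter:
    """Hypothetical adapter: exposes a plain async function as `.evaluate()`."""

    def __init__(self, fn):
        self.fn = fn

    async def evaluate(self, context):
        return await self.fn(context)


class EchoBranch:
    """Stand-in branch handler that only reports which branch ran."""

    def __init__(self, name):
        self.name = name

    async def run(self, context):
        return f"{self.name} handled {context['classification'].get('type')}"


async def dispatch(router, branches, context):
    decision = await router.evaluate(context)
    handler = branches.get(decision)
    if handler is None:
        return {"success": False, "error": f"No handler for decision: {decision}"}
    return {"success": True, "decision": decision, "result": await handler.run(context)}


branch_names = ("manual_review", "invoice_processing", "contract_review", "general_processing")
branches = {name: EchoBranch(name) for name in branch_names}
router = FunctionRouter(route_document)

invoice = asyncio.run(
    dispatch(router, branches, {"classification": {"type": "invoice", "confidence": 0.93}})
)
blurry = asyncio.run(
    dispatch(router, branches, {"classification": {"type": "invoice", "confidence": 0.41}})
)
```

Note that the confidence check runs first, so a low-confidence invoice goes to manual review even though its type is recognised.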

### 4. Loop with human-in-the-loop

The agent runs autonomously until it reaches a checkpoint that requires human approval. It pauses, waits for input, then continues based on the human's decision.

```python
class HumanInTheLoopWorkflow:
    def __init__(self, agent, checkpoints: list):
        self.agent = agent
        self.checkpoints = checkpoints  # Steps that need human approval

    async def run(self, task: str, notify_human, wait_for_approval):
        context = {"task": task, "step": 0}

        while True:
            context["step"] += 1

            # Run the agent for one step
            result = await self.agent.step(context)

            # Check if this step needs human approval
            if result.get("checkpoint"):
                # Notify human and wait
                await notify_human({
                    "step": context["step"],
                    "summary": result.get("summary"),
                    "decision_needed": result.get("decision_point")
                })

                # This blocks until the human responds
                approval = await wait_for_approval()

                if approval.get("action") == "approve":
                    context["human_feedback"] = approval.get("notes", "")
                    continue
                elif approval.get("action") == "reject":
                    return {"success": False, "reason": "Rejected by human", "context": context}
                elif approval.get("action") == "modify":
                    context["modifications"] = approval.get("changes", {})
                    continue
            else:
                context["result"] = result
                return {"success": True, "context": context}
```

**When to use:** Any workflow where mistakes have significant cost. Content publishing (review before publish), financial operations (approve before executing payments), code deployment (approve before merging), email campaigns (review before sending to 10K subscribers).

**How I implement notifications:** For production systems, I use Telegram bot notifications with inline buttons (Approve / Reject / Modify). For internal tools, a simple Slack message with threaded replies works. The key is making the human response asynchronous. The loop above awaits `wait_for_approval()` in-process to keep the example short; in production the agent shouldn't hold a process open waiting for a response. It should save state, exit, and resume when the human responds.
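
A minimal sketch of that save-and-resume shape, assuming an in-memory store in place of a real database. `InMemoryStore`, `pause_for_approval`, and `resume_after_approval` are hypothetical names; in practice the resume function would be called from the bot or webhook handler that receives the button press.

```python
import asyncio
import json


class InMemoryStore:
    """Stand-in for a database table keyed by workflow id."""

    def __init__(self):
        self.rows = {}

    async def save(self, workflow_id, state):
        self.rows[workflow_id] = json.dumps(state)  # serialise as a real DB would

    async def load(self, workflow_id):
        return json.loads(self.rows[workflow_id])


async def pause_for_approval(store, workflow_id, context, summary):
    """Persist the workflow and return immediately instead of waiting."""
    state = {"status": "awaiting_human", "context": context, "summary": summary}
    await store.save(workflow_id, state)
    return {"status": "awaiting_human", "workflow_id": workflow_id}


async def resume_after_approval(store, workflow_id, approval):
    """Apply the human's decision to the saved state and persist the result."""
    state = await store.load(workflow_id)
    if state["status"] != "awaiting_human":
        raise ValueError(f"Workflow {workflow_id} is not waiting for approval")

    action = approval.get("action")
    if action == "reject":
        state["status"] = "failed"
    elif action in ("approve", "modify"):
        state["status"] = "running"
        state["context"]["human_feedback"] = approval.get("notes", "")
        if action == "modify":
            state["context"]["modifications"] = approval.get("changes", {})
    else:
        raise ValueError(f"Unknown action: {action!r}")

    await store.save(workflow_id, state)
    return state


async def _demo():
    store = InMemoryStore()
    paused = await pause_for_approval(
        store, "wf-1", {"task": "publish post", "step": 3}, "Draft ready"
    )
    resumed = await resume_after_approval(
        store, "wf-1", {"action": "approve", "notes": "LGTM"}
    )
    return paused, resumed


paused, resumed = asyncio.run(_demo())
```

Because the state round-trips through JSON, nothing about the paused workflow lives in process memory, so the resume can happen hours later on a different worker.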

## Building with LangGraph vs building custom

I've used both approaches extensively. Here's my framework for deciding:

**Use LangGraph when:**
- Your workflow has complex state transitions (many possible states, conditional edges)
- You need built-in persistence (checkpointing, save/restore)
- Your team has existing LangChain experience
- The workflow has 5+ distinct stages

**Build custom when:**
- You need per-step cost tracking (LangGraph doesn't have built-in budget management)
- Your workflow integrates with existing systems (queues, databases, monitoring)
- Error recovery requirements are specific (not just "retry 3 times")
- You want to control which model each step uses

Here's a custom state machine that I use for most production workflows:

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable

class WorkflowStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    AWAITING_HUMAN = "awaiting_human"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class WorkflowState:
    workflow_id: str
    status: WorkflowStatus
    current_step: str = ""
    step_history: list = field(default_factory=list)
    data: dict = field(default_factory=dict)
    errors: list = field(default_factory=list)
    total_cost: float = 0.0
    total_steps: int = 0

class WorkflowEngine:
    def __init__(self, persistence=None):
        self.persistence = persistence  # Optional DB/S3 persistence

    async def run(self, workflow_id: str, steps: dict, initial_data: dict):
        """steps: {"step_name": {"handler": callable, "next": str or callable}}"""
        state = WorkflowState(
            workflow_id=workflow_id,
            status=WorkflowStatus.RUNNING,
            data=initial_data
        )

        # Begin at "start" if it is defined, otherwise at the first declared step
        current = "start" if "start" in steps else next(iter(steps), None)

        while current and state.status == WorkflowStatus.RUNNING:
            step_def = steps.get(current)
            if not step_def:
                state.status = WorkflowStatus.FAILED
                # Same dict shape as step errors so the log stays uniform
                state.errors.append({"step": current, "error": f"Unknown step: {current}"})
                break

            state.current_step = current
            await self._persist(state)

            try:
                # Handlers share state.data; anything a later step needs must be written into it
                result = await step_def["handler"](state.data)

                state.step_history.append({
                    "step": current,
                    "result": result.get("summary", "completed"),
                    "cost": result.get("cost", 0),
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })
                state.total_cost += result.get("cost", 0)
                state.total_steps += 1

                # Check for human-in-the-loop checkpoint
                if result.get("awaiting_human"):
                    state.status = WorkflowStatus.AWAITING_HUMAN
                    await self._persist(state)
                    # Workflow paused — will resume when human responds
                    return {"status": "awaiting_human", "state": state}

                # Determine next step
                next_step = step_def.get("next")
                if callable(next_step):
                    current = next_step(result)
                else:
                    current = next_step

            except Exception as e:
                state.errors.append({"step": current, "error": str(e)})

                # Retry the same step until its failures exceed the retry budget
                retry = step_def.get("retry", 0)
                failures = sum(
                    1 for err in state.errors
                    if isinstance(err, dict) and err.get("step") == current
                )
                if failures <= retry:
                    continue  # Retry the same step

                if step_def.get("fallback"):
                    current = step_def["fallback"]
                else:
                    state.status = WorkflowStatus.FAILED
                    break

        if state.status == WorkflowStatus.RUNNING:
            state.status = WorkflowStatus.COMPLETED

        await self._persist(state)
        return {"status": state.status.value, "state": state}

    async def _persist(self, state: WorkflowState):
        if self.persistence:
            await self.persistence.save(state)
```

## Error handling across steps

One of the hardest problems in multi-step workflows is partial failure: what happens when step 2 fails after step 1 succeeded?

You have three options:

**1. Rollback.** Undo the effects of earlier steps. This works when each step has a clear inverse (e.g., if the email step fails, delete the draft created earlier). It's hard when steps have side effects that can't be undone, such as an email that has already been sent.

**2. Compensate.** Execute a compensating action instead of a strict undo. If a later step fails after an API call has already created a resource, archive that resource instead of deleting it.

**3. Flag for manual review.** The safest option. Save the state, mark the workflow as needing human attention, and let a human decide what to do.

I use a combination: automatic retry for transient errors (3 retries with exponential backoff), compensation for known failure modes, and manual review for everything else.

```python
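import asyncio

# Placeholder exception types: map your client library's errors onto these
class TemporaryError(Exception):
    """Transient failure worth retrying (timeouts, rate limits)."""

class PermanentError(Exception):
    """Failure that retrying will not fix (bad input, auth errors)."""

async def execute_with_recovery(step_name: str, handler, context: dict, retries=3):
    last_error = None

    for attempt in range(retries):
        try:
            return await handler(context)
        except TemporaryError as e:
            last_error = e
            if attempt < retries - 1:  # No point sleeping after the final attempt
                wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                print(f"  → Step {step_name} temporary failure (attempt {attempt+1}/{retries}), retrying in {wait}s")
                await asyncio.sleep(wait)
        except PermanentError as e:
            print(f"  → Step {step_name} permanent failure: {e}")
            raise

    # All retries exhausted: flag the step for manual review instead of raising
    print(f"  → Step {step_name} failed after {retries} attempts, flagging for manual review")
    return {
        "success": False,
        "error": str(last_error),
        "fallback": "Flagged for manual review",
        "context": context
    }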

## State management

State is the backbone of any multi-step workflow. Every step reads from it and writes to it. Getting state management right is the difference between a workflow you can debug and one you can't.

**What to include in state:**

```python
@dataclass
class WorkflowState:
    # Identity
    workflow_id: str
    workflow_type: str

    # Progress
    status: str           # pending, running, awaiting_human, completed, failed
    current_step: str
    completed_steps: list

    # Data
    input: dict           # Original input
    intermediate: dict    # Step outputs, keyed by step name
    final_output: dict    # Final result

    # Costs
    total_cost: float
    step_costs: dict      # Per-step cost breakdown

    # Errors
    errors: list          # Structured error log
    retry_count: int

    # Control
    max_steps: int = 50
    max_cost: float = 10.0
```
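
The control fields only help if something checks them before each step. A minimal sketch of such a guard, assuming a trimmed-down state that carries just the relevant fields; `ControlledState` and `limit_exceeded` are hypothetical names, not part of the engine above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ControlledState:
    """Hypothetical subset of the state above, just enough for the check."""
    completed_steps: list = field(default_factory=list)
    total_cost: float = 0.0
    max_steps: int = 50
    max_cost: float = 10.0


def limit_exceeded(state) -> Optional[str]:
    """Return a human-readable reason if a limit is hit, otherwise None."""
    steps_done = len(state.completed_steps)
    if steps_done >= state.max_steps:
        return f"step limit reached ({steps_done}/{state.max_steps})"
    if state.total_cost >= state.max_cost:
        return f"cost limit reached (${state.total_cost:.2f} of ${state.max_cost:.2f})"
    return None


within = limit_exceeded(
    ControlledState(completed_steps=["extract", "classify"], total_cost=1.40)
)
over_cost = limit_exceeded(ControlledState(total_cost=10.25))
over_steps = limit_exceeded(ControlledState(completed_steps=["rewrite"] * 50))
```

Returning a reason string rather than a bare boolean means the same value can go straight into the structured error log when the workflow is stopped.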

**Persistence:** I save state to a database (SQLite for simple workflows, Postgres for production) after every step. This means if the server crashes mid-workflow, we can resume from the last checkpoint.

```python
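# Save checkpoint after each step
# Needs `import json` and `from dataclasses import asdict`; `db` is an async DB handle.
# The `?` placeholders are SQLite-style; Postgres drivers use `$1` (asyncpg) or `%s` (psycopg).
await db.execute(
    """INSERT INTO workflow_checkpoints (workflow_id, step, state) VALUES (?, ?, ?)
       ON CONFLICT(workflow_id) DO UPDATE SET step = excluded.step, state = excluded.state""",
    [state.workflow_id, state.current_step, json.dumps(asdict(state))]
)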
```
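
For a version you can run locally, here is a minimal sketch of the same upsert using the standard-library `sqlite3` module (synchronous, unlike the async `db` handle above). `MiniState`, `open_store`, `save_checkpoint`, and `load_checkpoint` are hypothetical names, and the table schema is an assumption: the one hard requirement is a unique key on `workflow_id`, which `ON CONFLICT(workflow_id)` depends on.

```python
import json
import sqlite3
from dataclasses import asdict, dataclass, field


@dataclass
class MiniState:
    """Trimmed-down, hypothetical state; a real one carries more fields."""
    workflow_id: str
    current_step: str = ""
    intermediate: dict = field(default_factory=dict)


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    # PRIMARY KEY on workflow_id is what makes ON CONFLICT(workflow_id) valid
    conn.execute(
        """CREATE TABLE IF NOT EXISTS workflow_checkpoints (
               workflow_id TEXT PRIMARY KEY,
               step        TEXT NOT NULL,
               state       TEXT NOT NULL
           )"""
    )
    return conn


def save_checkpoint(conn, state):
    with conn:  # commits on success, rolls back on error
        conn.execute(
            """INSERT INTO workflow_checkpoints (workflow_id, step, state)
               VALUES (?, ?, ?)
               ON CONFLICT(workflow_id) DO UPDATE
               SET step = excluded.step, state = excluded.state""",
            (state.workflow_id, state.current_step, json.dumps(asdict(state))),
        )


def load_checkpoint(conn, workflow_id):
    row = conn.execute(
        "SELECT state FROM workflow_checkpoints WHERE workflow_id = ?",
        (workflow_id,),
    ).fetchone()
    return MiniState(**json.loads(row[0])) if row else None


conn = open_store()
state = MiniState("wf-42", "extract", {"extract": {"pages": 3}})
save_checkpoint(conn, state)
state.current_step = "classify"
save_checkpoint(conn, state)  # second save updates the same row
restored = load_checkpoint(conn, "wf-42")
row_count = conn.execute("SELECT COUNT(*) FROM workflow_checkpoints").fetchone()[0]
```

One caveat when adapting this to the engine's `WorkflowState`: `json.dumps` cannot serialise an `Enum` status directly, so store `status.value` or pass a `default=` function.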

## Cost implications

Multi-step workflows are expensive. Here's a real example from a content creation agent I built:

| Step | Model | Calls | Cost per call | Total |
|------|-------|-------|---------------|-------|
| Research brief | gpt-4o | 1 | $0.03 | $0.03 |
| Search execution | gpt-4o | 3 (parallel) | $0.02 | $0.06 |
| Outline generation | gpt-4o | 1 | $0.04 | $0.04 |
| Draft section 1 | gpt-4o | 1 | $0.06 | $0.06 |
| Draft section 2 | gpt-4o | 1 | $0.05 | $0.05 |
| Draft section 3 | gpt-4o | 1 | $0.07 | $0.07 |
| Review and polish | gpt-4o | 1 | $0.03 | $0.03 |
| **Total** | | **9** | | **$0.34** |

A single article costs $0.34 in API calls. Generate 100 articles: $34. That's manageable.

But add retries (each retry reruns the step), add branching (some paths are longer than others), add human review loops (resume generates more calls), and the effective cost can be 3-5x the base estimate.
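
The arithmetic behind those figures, as a quick sanity check. The rows are copied from the table above; the 3x and 5x multipliers are the rough range quoted in the text, not measured values.

```python
# (step, calls, cost per call in USD), copied from the table above
steps = [
    ("Research brief", 1, 0.03),
    ("Search execution", 3, 0.02),
    ("Outline generation", 1, 0.04),
    ("Draft section 1", 1, 0.06),
    ("Draft section 2", 1, 0.05),
    ("Draft section 3", 1, 0.07),
    ("Review and polish", 1, 0.03),
]

total_calls = sum(calls for _, calls, _ in steps)
base_cost = round(sum(calls * cost for _, calls, cost in steps), 2)

# Base cost for a batch, then the range once retries and loops are included
batch_of_100 = round(base_cost * 100, 2)
effective_low = round(base_cost * 3, 2)
effective_high = round(base_cost * 5, 2)

print(f"{total_calls} calls, ${base_cost:.2f} per article, ${batch_of_100:.2f} per 100")
print(f"Effective per article: ${effective_low:.2f} to ${effective_high:.2f}")
```

At the upper end of that range, the same 100-article batch costs $170 rather than $34, which is the number to budget against.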

I set cost limits per workflow:

```python
class BudgetAwareWorkflow:
    def __init__(self, max_cost_per_run=2.0):
        self.max_cost = max_cost_per_run
        self.running_cost = 0.0

    async def step(self, handler, context):
        if self.running_cost >= self.max_cost:
            return {"error": "Budget exceeded", "cost": self.running_cost}

        result = await handler(context)
        self.running_cost += result.get("cost", 0)
        return result
```

---

*Related: [AI agent logging and monitoring: seeing inside your agent's head](/posts/ai-agent-logging-monitoring/) — how to log, trace, and monitor multi-step workflows in production.*

*Related: [CrewAI vs LangGraph: which AI agent framework should you use?](/posts/crewai-vs-langgraph-comparison/) — how the choice between CrewAI and LangGraph affects multi-step workflow design.*

## Putting it all together

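Here's a real multi-step agent I built for content generation. It uses all four patterns together: a sequential backbone, a parallel write step, a conditional quality gate that loops back, and a HITL checkpoint before publishing: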

```python
# Content Generation Agent Workflow
# Pattern: Sequential + Parallel + Conditional + HITL

async def content_workflow(topic: str, publish: bool = False):
    workflow = WorkflowEngine(persistence=Database())

    steps = {
        "research": {
            "handler": research_topic,
            "next": "outline"
        },
        "outline": {
            "handler": generate_outline,
            "next": "write_sections"
        },
        "write_sections": {
            # Parallel fan-out: write each section independently
            "handler": parallel_write_sections,
            "next": "review"
        },
        "review": {
            # Conditional: if quality check fails, loop back
            "handler": quality_check,
            "next": lambda r: "rewrite" if r.get("quality", 0) < 0.8 else "human_review"
        },
        "rewrite": {
            "handler": rewrite_section,
            "next": "review",  # Loop back for re-check
            "retry": 2         # On exceptions: up to 3 attempts (initial + 2 retries)
        },
        "human_review": {
            # HITL checkpoint
            "handler": request_human_approval,
            "next": lambda r: "publish" if r.get("approved") else "rejected"
        },
        "publish": {
            "handler": publish_article if publish else save_draft,
            "next": None  # End
        },
        "rejected": {
            "handler": notify_rejection,
            "next": None
        }
    }

    return await workflow.run(f"content-{slugify(topic)}", steps, {"topic": topic})
```
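
One thing the workflow above leaves open is how many times `review` can send an article back to `rewrite`: the `retry` setting only covers exceptions, not quality loops. A minimal sketch of a bounded `next` callable, assuming you build a fresh one per workflow run; `bounded_router` and its parameters are hypothetical names.

```python
def bounded_router(max_rewrites: int = 3, threshold: float = 0.8):
    """Build a `next` callable for the review step that stops looping after max_rewrites."""
    rewrites = 0

    def choose_next(result: dict) -> str:
        nonlocal rewrites
        if result.get("quality", 0) >= threshold:
            return "human_review"
        if rewrites >= max_rewrites:
            return "human_review"  # stop looping and let a person decide
        rewrites += 1
        return "rewrite"

    return choose_next


# Quality never improves: two rewrites are allowed, then the human takes over
next_step = bounded_router(max_rewrites=2)
path = [next_step({"quality": 0.5}) for _ in range(4)]

# Quality is already above the threshold: go straight to review by a human
good = bounded_router()({"quality": 0.9})
```

The counter lives in a closure, so sharing one router across concurrent runs would mix their counts; keeping the count in persisted workflow state is the sturdier option if workflows pause and resume.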

The key insight: all four patterns compose naturally. A sequential workflow can have parallel steps within it. A parallel fan-out can have conditional branches per worker. A HITL checkpoint can appear at any point.

Master the patterns individually, then compose them freely. That's how you build production agent workflows that handle real complexity.
