# Python SDK

> Use SirenSpec programmatically from Python code.

## Installation

```bash theme={null}
uv add sirenspec
# or
pip install sirenspec
```

Python 3.11 or later is required.

## Public API

The core API is exported at the top level:

```python theme={null}
from sirenspec import (
    load_workflow,
    execute,
    execute_streaming,
    Workflow,
    WorkflowRegistry,
    BudgetConfig,
    HumanNode,
    NodeCompleteEvent,
    SummaryEvent,
    TokenUsage,
    LLMProvider,
    Guardrail,
    WorkflowGuardrail,
    GuardrailSpec,
    SirenSpecError,
    ProviderError,
    GuardrailError,
    GuardrailViolation,
    BudgetExceededError,
    HumanInputError,
    ValidationError,
    RetryExhaustedError,
    SwrmAgentError,
    ToolError,
    WorkflowLintError,
)
```

`PIIDetectedError`, `InterpolationError`, and `FactoryNodeError` are not re-exported at the top level — import them from `sirenspec.exceptions` when needed.

| Name                | Type                     | Description                                                                                                            |
| ------------------- | ------------------------ | ---------------------------------------------------------------------------------------------------------------------- |
| `load_workflow`     | function                 | Load and validate a YAML workflow file.                                                                                |
| `execute`           | async function           | Execute a workflow and return the full trace dict.                                                                     |
| `execute_streaming` | async generator function | Execute a workflow and yield typed events as each node completes.                                                      |
| `Workflow`          | class                    | Pydantic model representing a workflow.                                                                                |
| `WorkflowRegistry`  | class                    | Registry mapping names to `Workflow` instances for nested workflow resolution.                                         |
| `BudgetConfig`      | class                    | Pydantic model for the workflow-level `budget:` block (`max_tokens`, `max_cost_usd`, `max_duration_s`, `on_exceeded`). |
| `HumanNode`         | class                    | Pydantic model for `type: human` nodes.                                                                                |
| `NodeCompleteEvent` | dataclass                | Event emitted once per node during streaming execution.                                                                |
| `SummaryEvent`      | dataclass                | Event emitted once at the end of streaming execution.                                                                  |
| `TokenUsage`        | dataclass                | Prompt and completion token counts with a `.total` property.                                                           |
| `LLMProvider`       | Protocol                 | Implement this to add a custom LLM provider.                                                                           |
| `Guardrail`         | ABC                      | Subclass this to write a custom per-node guardrail.                                                                    |
| `WorkflowGuardrail` | Protocol                 | Implement this to write a custom workflow-level guardrail.                                                             |
| `GuardrailSpec`     | class                    | Pydantic model for inline guardrail configuration (`name` + `config`).                                                 |

***

## `load_workflow`

Load and validate a workflow file, returning a `Workflow` instance.

```python theme={null}
from sirenspec import load_workflow

workflow = load_workflow("workflow.yaml")
```

**Signature:**

```python theme={null}
def load_workflow(filepath: str | Path) -> Workflow: ...
```

**Raises:**

| Exception           | When                                                                                          |
| ------------------- | --------------------------------------------------------------------------------------------- |
| `FileNotFoundError` | The workflow file — or a declared `env_file` — does not exist.                                |
| `ValueError`        | YAML is malformed or fails schema validation.                                                 |
| `WorkflowLintError` | The load-time linter found a blocking issue, such as a `{{ working.<node_id>.* }}` reference. |

***

## `execute`

Execute a workflow asynchronously and return a structured trace dict.

```python theme={null}
import asyncio
from sirenspec import load_workflow, execute

workflow = load_workflow("workflow.yaml")
trace = asyncio.run(execute(workflow, user_input="What is the capital of France?"))

print(trace["output"])
# {"reply": "Paris is the capital of France."}

print(trace["summary"]["status"])
# "success"
```

**Signature:**

```python theme={null}
async def execute(
    workflow: Workflow,
    user_input: str,
    registry: WorkflowRegistry | None = None,
    human_input_fn: InputCoroutine | None = None,
) -> dict[str, Any]: ...
```

**Parameters:**

| Parameter        | Type                       | Description                                                                                                                                                                          |
| ---------------- | -------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `workflow`       | `Workflow`                 | Validated workflow instance from `load_workflow`.                                                                                                                                    |
| `user_input`     | `str`                      | The initial user message passed to the first (root) node.                                                                                                                            |
| `registry`       | `WorkflowRegistry \| None` | Registry of named sub-workflows for resolving `workflow` nodes by name. File-path refs (starting with `.` or `/`) are resolved from disk without a registry.                         |
| `human_input_fn` | `InputCoroutine \| None`   | Async callable supplying operator responses for `type: human` nodes. Defaults to reading a single line from stdin. Inject a mock in tests or wire in a webhook bridge in production. |

**Returns:**

A dict with the same structure as the `sirenspec run` JSON output. See the [CLI Reference](/cli-reference#json-trace-format) for field descriptions.
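
Because `human_input_fn` is injectable, tests can replace stdin with a scripted coroutine. The sketch below is a minimal illustration, assuming `InputCoroutine` accepts a single prompt string and returns the operator's reply as a string; this page does not spell out that signature, so verify it against the SDK before relying on it. `make_scripted_input` is a hypothetical helper, not part of SirenSpec.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable


def make_scripted_input(replies: Iterable[str]) -> Callable[[str], Awaitable[str]]:
    """Return an async callable that answers prompts from a fixed script.

    Assumption: the callable receives the prompt text and returns the reply.
    """
    remaining = iter(replies)

    async def scripted_input(prompt: str) -> str:
        try:
            return next(remaining)
        except StopIteration:
            # Fail loudly when the workflow asks more questions than scripted.
            raise RuntimeError(f"No scripted reply left for prompt: {prompt!r}") from None

    return scripted_input


async def demo() -> list[str]:
    ask = make_scripted_input(["approve", "reject"])
    return [await ask("Deploy to production?"), await ask("Send the email?")]


print(asyncio.run(demo()))
```

If the assumed signature holds, a test would pass the result as `human_input_fn=make_scripted_input([...])` when calling `execute`.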

***

## `execute_streaming`

Execute a workflow and yield typed events as each node completes. Ideal for building interactive CLI tools, web dashboards, or any integration that needs real-time progress.

```python theme={null}
import asyncio
from sirenspec import load_workflow, execute_streaming, NodeCompleteEvent, SummaryEvent

async def main() -> None:
    workflow = load_workflow("workflow.yaml")

    async for event in execute_streaming(workflow, user_input="What is the capital of France?"):
        if isinstance(event, NodeCompleteEvent):
            print(f"Node '{event.node_id}' ({event.node_type}): {event.status}")
            if event.output:
                print(f"  Output: {event.output}")
        elif isinstance(event, SummaryEvent):
            print(f"Workflow complete: {event.status}")
            print(f"  Nodes executed: {event.total_nodes}")
            print(f"  Total tokens: {event.total_tokens}")
            print(f"  Duration: {event.duration_ms}ms")

asyncio.run(main())
```

**Signature:**

```python theme={null}
async def execute_streaming(
    workflow: Workflow,
    user_input: str,
    registry: WorkflowRegistry | None = None,
    human_input_fn: InputCoroutine | None = None,
) -> AsyncGenerator[NodeCompleteEvent | SummaryEvent, None]: ...
```

**Parameters:**

| Parameter        | Type                       | Description                                                                                       |
| ---------------- | -------------------------- | ------------------------------------------------------------------------------------------------- |
| `workflow`       | `Workflow`                 | Validated workflow instance from `load_workflow`.                                                 |
| `user_input`     | `str`                      | The initial user message passed to the first (root) node.                                         |
| `registry`       | `WorkflowRegistry \| None` | Registry of named sub-workflows. Same semantics as `execute`.                                     |
| `human_input_fn` | `InputCoroutine \| None`   | Async callable supplying operator responses for `type: human` nodes. Same semantics as `execute`. |

**Yields:**

The generator yields two types of events:

1. **`NodeCompleteEvent`** — Emitted once per node (active or skipped):
   * `node_id: str` — The identifier of the completed node.
   * `node_type: str` — One of `"agent"`, `"tool"`, `"swrm"`, `"factory"`, or `"workflow"`.
   * `status: Literal["success", "skipped", "failed"]` — The node's execution result.
   * `output: Any` — The node's output value (string, dict, list, or None).
   * `writes: str` — The context path written by this node (only for successful agent nodes).
   * `error: str | None` — Error message when `status="failed"`.
   * `tokens: int` — Total tokens consumed by this node (0 for tool and skipped nodes).
   * `agents: list[dict[str, Any]] | None` — Per-agent execution traces (swrm nodes only). Each dict contains `id`, `prompt_sent`, `response_received`, `tokens`, `duration_ms`, and `error`. `None` for all non-swrm node types.
   * `duration_ms: float | None` — Wall-clock execution time in milliseconds for swrm nodes. `None` for all other node types.

2. **`SummaryEvent`** — Emitted once at the end of execution:
   * `total_nodes: int` — Number of nodes that ran (active nodes only, excluding skipped).
   * `total_tokens: int` — Aggregate token count across all active nodes.
   * `status: Literal["success", "failed"]` — Overall workflow status.
   * `duration_ms: float` — Wall-clock execution time in milliseconds.
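
The two event types can be folded into a per-status tally as they arrive. The sketch below uses small stand-in dataclasses (`FakeNodeEvent`, `FakeSummary`) that mirror only a subset of the fields listed above, so it runs without the SDK installed. With the real SDK you would feed it the events from `execute_streaming` and distinguish them with `isinstance`. `tally_events` is a hypothetical helper.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass
class FakeNodeEvent:
    """Stand-in exposing a subset of the NodeCompleteEvent fields listed above."""
    node_id: str
    status: str
    tokens: int = 0


@dataclass
class FakeSummary:
    """Stand-in exposing a subset of the SummaryEvent fields listed above."""
    total_nodes: int
    total_tokens: int
    status: str


def tally_events(events: Iterable[Any]) -> dict[str, Any]:
    """Count node events by status and sum their token usage."""
    by_status: Counter = Counter()
    tokens = 0
    final_status = None
    for event in events:
        if hasattr(event, "node_id"):  # node-level event
            by_status[event.status] += 1
            tokens += event.tokens
        else:  # summary event, emitted last
            final_status = event.status
    return {"by_status": dict(by_status), "tokens": tokens, "final_status": final_status}


events = [
    FakeNodeEvent("classify", "success", tokens=120),
    FakeNodeEvent("lookup", "skipped"),  # skipped nodes report 0 tokens
    FakeNodeEvent("reply", "success", tokens=80),
    FakeSummary(total_nodes=2, total_tokens=200, status="success"),
]
result = tally_events(events)
print(result)
```

The sample data follows the rules stated above: the skipped node still produces an event, but it is excluded from `total_nodes` and contributes no tokens.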

**Example: Building a Progress Bar**

```python theme={null}
import asyncio
from sirenspec import load_workflow, execute_streaming, NodeCompleteEvent, SummaryEvent

async def run_with_progress(workflow_file: str, user_input: str) -> None:
    workflow = load_workflow(workflow_file)
    node_count = len(workflow.nodes)
    completed = 0

    async for event in execute_streaming(workflow, user_input):
        if isinstance(event, NodeCompleteEvent):
            completed += 1
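            # Skipped nodes also emit an event; give them their own icon
            # rather than showing them as failures.
            status_icon = {"success": "✓", "skipped": "-", "failed": "✗"}.get(event.status, "?")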
            print(f"[{completed}/{node_count}] {status_icon} {event.node_id}")
        elif isinstance(event, SummaryEvent):
            print(f"\nWorkflow {event.status}")
            print(f"Total tokens: {event.total_tokens}, Duration: {event.duration_ms}ms")

asyncio.run(run_with_progress("workflow.yaml", "Hello"))
```

***

## `WorkflowRegistry`

A registry that maps string names to `Workflow` instances. Pass a populated registry to `execute` or `execute_streaming` when your workflow uses `workflow` nodes with named `ref` values.

File-path refs (starting with `.` or `/`) are resolved directly from disk — no registry needed for those.

```python theme={null}
import asyncio
from sirenspec import load_workflow, execute, WorkflowRegistry

summarizer = load_workflow("summarizer/workflow.yaml")
main = load_workflow("main/workflow.yaml")

registry = WorkflowRegistry()
registry.register("summarizer", summarizer)

trace = asyncio.run(execute(main, user_input="Summarize this.", registry=registry))
```

**Methods:**

| Method                     | Description                                                                  |
| -------------------------- | ---------------------------------------------------------------------------- |
| `register(name, workflow)` | Register a `Workflow` under `name`.                                          |
| `get(name)`                | Return the workflow registered under `name`. Raises `KeyError` if not found. |
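
Since `get` raises `KeyError` for unknown names, callers that treat a missing sub-workflow as optional need to catch it. The sketch below shows one way to do that. `FakeRegistry` is a stand-in that mimics only the two documented methods so the example runs without the SDK, and `get_or_default` is a hypothetical helper, not part of SirenSpec.

```python
from typing import Any


class FakeRegistry:
    """Stand-in mimicking the documented register/get behaviour (not the SDK class)."""

    def __init__(self) -> None:
        self._items: dict[str, Any] = {}

    def register(self, name: str, workflow: Any) -> None:
        self._items[name] = workflow

    def get(self, name: str) -> Any:
        return self._items[name]  # raises KeyError when the name is unknown


def get_or_default(registry: Any, name: str, default: Any = None) -> Any:
    """Look up a workflow by name, returning `default` instead of raising KeyError."""
    try:
        return registry.get(name)
    except KeyError:
        return default


registry = FakeRegistry()
registry.register("summarizer", "<summarizer workflow>")
print(get_or_default(registry, "summarizer"))
print(get_or_default(registry, "translator", default="<missing>"))
```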

***

## Error Handling

All SirenSpec-specific exceptions inherit from `SirenSpecError`. `load_workflow` can additionally raise the built-in `FileNotFoundError` and `ValueError`.

```
SirenSpecError
├── ProviderError             # LLM API call failed
│   └── RetryExhaustedError   # All retry attempts exhausted
├── GuardrailError            # Guardrail layer raised an error
│   ├── GuardrailViolation    # A guardrail policy was violated (content blocked)
│   ├── BudgetExceededError   # Workflow-level budget ceiling was exceeded
│   └── PIIDetectedError      # PII guardrail with action="block" found PII
├── HumanInputError           # Human node aborted (e.g. timeout with on_timeout="abort")
├── ValidationError           # Workflow schema is invalid
├── WorkflowLintError         # Load-time linter found a blocking issue (e.g. working.<node_id>)
├── InterpolationError        # A {{ ... }} template could not be resolved at runtime
├── FactoryNodeError          # Factory node could not spawn or resolve its instances
├── SwrmAgentError            # One or more swrm agents failed
└── ToolError                 # Tool node execution failed
```

```python theme={null}
import asyncio
from sirenspec import (
    load_workflow,
    execute,
    SirenSpecError,
    BudgetExceededError,
    HumanInputError,
    GuardrailViolation,
    RetryExhaustedError,
)

async def main() -> None:
    try:
        workflow = load_workflow("workflow.yaml")
    except FileNotFoundError as exc:
        print(f"File not found: {exc}")
        return
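    except ValueError as exc:
        print(f"Invalid workflow: {exc}")
        return
    except SirenSpecError as exc:
        # Covers WorkflowLintError raised by the load-time linter.
        print(f"Workflow rejected: {exc}")
        return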

    try:
        trace = await execute(workflow, user_input="Hello")
    except BudgetExceededError as exc:
        print(f"Workflow exceeded budget: {exc}")
    except HumanInputError as exc:
        print(f"Human node aborted: {exc}")
    except GuardrailViolation as exc:
        print(f"Guardrail blocked execution: {exc}")
    except RetryExhaustedError as exc:
        print(f"Provider failed after all retries: {exc}")
    except SirenSpecError as exc:
        print(f"Workflow error: {exc}")

asyncio.run(main())
```
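
Because the hierarchy is rooted at `SirenSpecError`, a service can map failures to coarse categories with ordered `isinstance` checks, most specific class first. The sketch below redeclares a few of the exception classes locally so that it runs without the SDK; in real code, import them from `sirenspec` instead. The category labels and the `categorize` helper are illustrative assumptions.

```python
# Local stand-ins mirroring part of the documented hierarchy.
class SirenSpecError(Exception): ...
class ProviderError(SirenSpecError): ...
class RetryExhaustedError(ProviderError): ...
class GuardrailError(SirenSpecError): ...
class GuardrailViolation(GuardrailError): ...
class BudgetExceededError(GuardrailError): ...
class ToolError(SirenSpecError): ...


# Ordered from most to least specific: the first match wins.
CATEGORIES: list[tuple[type[Exception], str]] = [
    (BudgetExceededError, "budget"),
    (GuardrailViolation, "blocked"),
    (GuardrailError, "guardrail"),
    (RetryExhaustedError, "provider_unavailable"),
    (ProviderError, "provider"),
    (SirenSpecError, "workflow"),
]


def categorize(exc: Exception) -> str:
    """Return a coarse label for an exception, or "unexpected" if it is foreign."""
    for exc_type, label in CATEGORIES:
        if isinstance(exc, exc_type):
            return label
    return "unexpected"


print(categorize(RetryExhaustedError("gave up")))
print(categorize(ToolError("tool crashed")))
print(categorize(KeyError("not ours")))
```

Listing a base class before its subclasses would shadow the more specific labels, which is the same reason the `except` clauses above put `SirenSpecError` last.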

***

## Extending SirenSpec

### Custom LLM Provider

Implement the `LLMProvider` Protocol to add support for a new backend:

```python theme={null}
from sirenspec import LLMProvider, TokenUsage

class MyProvider:
    """Example custom provider backed by a hypothetical API."""

    def __init__(self) -> None:
        self._last_usage = TokenUsage(prompt_tokens=0, completion_tokens=0)

    async def complete(self, messages: list[dict]) -> str:
        # Call your API here
        response_text = "..."
        self._last_usage = TokenUsage(prompt_tokens=10, completion_tokens=20)
        return response_text

    @property
    def last_token_usage(self) -> TokenUsage:
        return self._last_usage

    @property
    def client(self) -> object:
        return self
```
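
Note that `MyProvider` above does not inherit from anything: a Python `Protocol` is satisfied by shape. The sketch below illustrates this with a deterministic echo provider that is handy in unit tests. `ProviderLike` and `Usage` are local stand-ins for `LLMProvider` and `TokenUsage` so the example runs without the SDK. Whether the real `LLMProvider` is `runtime_checkable` is not stated on this page, and the `"content"` message key and the word-count token estimate are assumptions.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class Usage:
    """Stand-in for TokenUsage with the two documented fields."""
    prompt_tokens: int
    completion_tokens: int


@runtime_checkable
class ProviderLike(Protocol):
    """Stand-in protocol with members taken from the example above."""

    async def complete(self, messages: list[dict]) -> str: ...

    @property
    def last_token_usage(self) -> Usage: ...


class EchoProvider:
    """Deterministic provider for tests; it inherits from nothing."""

    def __init__(self) -> None:
        self._usage = Usage(0, 0)

    async def complete(self, messages: list[dict]) -> str:
        # Assumption: each message is a dict with a "content" string.
        text = messages[-1]["content"]
        words = len(text.split())  # crude token estimate, for tests only
        self._usage = Usage(prompt_tokens=words, completion_tokens=words)
        return text

    @property
    def last_token_usage(self) -> Usage:
        return self._usage


provider = EchoProvider()
reply = asyncio.run(provider.complete([{"role": "user", "content": "hello there"}]))
print(reply, provider.last_token_usage, isinstance(provider, ProviderLike))
```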

### Custom Guardrail

Subclass `Guardrail` to add a new per-node policy:

```python theme={null}
from sirenspec import Guardrail, GuardrailViolation

class ProfanityGuardrail(Guardrail):
    BLOCKED = {"badword1", "badword2"}

    def check_input(self, text: str) -> str:
        for word in self.BLOCKED:
            if word in text.lower():
                raise GuardrailViolation(f"Blocked word in input: {word!r}")
        return text

    def check_output(self, text: str) -> str:
        for word in self.BLOCKED:
            if word in text.lower():
                raise GuardrailViolation(f"Blocked word in output: {word!r}")
        return text
```

***

## Token Usage

`TokenUsage` is a frozen dataclass for tracking prompt and completion tokens:

```python theme={null}
from sirenspec import TokenUsage

usage1 = TokenUsage(prompt_tokens=100, completion_tokens=50)
usage2 = TokenUsage(prompt_tokens=80, completion_tokens=40)
combined = usage1 + usage2
print(combined.total)  # 270
```

**Fields:**

| Field               | Type  | Description           |
| ------------------- | ----- | --------------------- |
| `prompt_tokens`     | `int` | Tokens in the input.  |
| `completion_tokens` | `int` | Tokens in the output. |

**Properties / Methods:**

| Name             | Returns      | Description                                 |
| ---------------- | ------------ | ------------------------------------------- |
| `.total`         | `int`        | Sum of prompt and completion tokens.        |
| `__add__(other)` | `TokenUsage` | Combine two usages; returns a new instance. |
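
Since `__add__` returns a new instance, a list of per-node usages can be folded with the built-in `sum`, provided an explicit start value is supplied (the default start of `0` is an `int` and cannot be added to a usage object). The sketch below uses a local `Usage` stand-in that mirrors the documented fields, `.total`, and `__add__` so it runs without the SDK; it is not the library's implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Usage:
    """Stand-in mirroring the documented TokenUsage surface."""
    prompt_tokens: int
    completion_tokens: int

    @property
    def total(self) -> int:
        return self.prompt_tokens + self.completion_tokens

    def __add__(self, other: "Usage") -> "Usage":
        return Usage(
            self.prompt_tokens + other.prompt_tokens,
            self.completion_tokens + other.completion_tokens,
        )


per_node = [Usage(100, 50), Usage(80, 40), Usage(10, 5)]
# The explicit zero start value also keeps the fold valid for an empty list.
combined = sum(per_node, start=Usage(0, 0))
print(combined.total)  # 285
```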

***

## Streaming in Web Applications

The streaming API is ideal for interactive dashboards, chat interfaces, or real-time monitoring:

```python theme={null}
from typing import AsyncGenerator
from sirenspec import load_workflow, execute_streaming, NodeCompleteEvent, SummaryEvent


async def stream_to_client(
    workflow_file: str, user_input: str
) -> AsyncGenerator[dict, None]:
    """Yield JSON-serializable event dicts for streaming to a client."""
    workflow = load_workflow(workflow_file)

    async for event in execute_streaming(workflow, user_input):
        if isinstance(event, NodeCompleteEvent):
            yield {
                "type": "node_complete",
                "node_id": event.node_id,
                "node_type": event.node_type,
                "status": event.status,
                "output": event.output,
                "tokens": event.tokens,
            }
        elif isinstance(event, SummaryEvent):
            yield {
                "type": "summary",
                "total_nodes": event.total_nodes,
                "total_tokens": event.total_tokens,
                "status": event.status,
                "duration_ms": event.duration_ms,
            }


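# FastAPI integration example
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


# EventSource clients can only issue GET requests, so the route must accept GET.
@app.get("/workflow/stream")
async def stream_workflow(workflow_file: str, user_input: str) -> StreamingResponse:
    """Stream workflow execution events as Server-Sent Events."""

    async def event_generator():
        async for event_dict in stream_to_client(workflow_file, user_input):
            # Serialize as JSON so the client can call JSON.parse on each frame;
            # default=str covers node outputs that are not natively serializable.
            yield f"data: {json.dumps(event_dict, default=str)}\n\n"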

    return StreamingResponse(event_generator(), media_type="text/event-stream")
```

On the client side (JavaScript):

```javascript theme={null}
const eventSource = new EventSource(
  `/workflow/stream?workflow_file=workflow.yaml&user_input=Hello`
);

eventSource.addEventListener("message", (event) => {
  const data = JSON.parse(event.data);

  if (data.type === "node_complete") {
    console.log(`${data.node_id} (${data.status}): ${data.tokens} tokens`);
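  } else if (data.type === "summary") {
    console.log(`Workflow ${data.status} in ${data.duration_ms}ms`);
    // Close explicitly: otherwise EventSource reconnects when the server ends
    // the stream, which would start the workflow again.
    eventSource.close();
  }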
});
```
