
Installation

uv add sirenspec
# or
pip install sirenspec
Python 3.11 or later is required.

Public API

Most of the public API is exported at the top level:
from sirenspec import (
    load_workflow,
    execute,
    execute_streaming,
    Workflow,
    WorkflowRegistry,
    BudgetConfig,
    HumanNode,
    NodeCompleteEvent,
    SummaryEvent,
    TokenUsage,
    LLMProvider,
    Guardrail,
    WorkflowGuardrail,
    GuardrailSpec,
    SirenSpecError,
    ProviderError,
    GuardrailError,
    GuardrailViolation,
    BudgetExceededError,
    HumanInputError,
    ValidationError,
    RetryExhaustedError,
    SwrmAgentError,
    ToolError,
    WorkflowLintError,
)
PIIDetectedError, InterpolationError, and FactoryNodeError are not re-exported at the top level — import them from sirenspec.exceptions when needed.
Name               Type                      Description
load_workflow      function                  Load and validate a YAML workflow file.
execute            async function            Execute a workflow and return the full trace dict.
execute_streaming  async generator function  Execute a workflow and yield typed events as each node completes.
Workflow           class                     Pydantic model representing a workflow.
WorkflowRegistry   class                     Registry mapping names to Workflow instances for nested workflow resolution.
BudgetConfig       class                     Pydantic model for the workflow-level budget: block (max_tokens, max_cost_usd, max_duration_s, on_exceeded).
HumanNode          class                     Pydantic model for type: human nodes.
NodeCompleteEvent  dataclass                 Event emitted once per node during streaming execution.
SummaryEvent       dataclass                 Event emitted once at the end of streaming execution.
TokenUsage         dataclass                 Prompt and completion token counts with a .total property.
LLMProvider        Protocol                  Implement this to add a custom LLM provider.
Guardrail          ABC                       Subclass this to write a custom per-node guardrail.
WorkflowGuardrail  Protocol                  Implement this to write a custom workflow-level guardrail.
GuardrailSpec      class                     Pydantic model for inline guardrail configuration (name + config).

load_workflow

Load and validate a workflow file, returning a Workflow instance.
from sirenspec import load_workflow

workflow = load_workflow("workflow.yaml")
Signature:
def load_workflow(filepath: str | Path) -> Workflow: ...
Raises:
Exception          When
FileNotFoundError  The workflow file, or a declared env_file, does not exist.
ValueError         YAML is malformed or fails schema validation.
WorkflowLintError  The load-time linter found a blocking issue, such as a {{ working.<node_id>.* }} reference.

execute

Execute a workflow asynchronously and return a structured trace dict.
import asyncio
from sirenspec import load_workflow, execute

workflow = load_workflow("workflow.yaml")
trace = asyncio.run(execute(workflow, user_input="What is the capital of France?"))

print(trace["output"])
# {"reply": "Paris is the capital of France."}

print(trace["summary"]["status"])
# "success"
Signature:
async def execute(
    workflow: Workflow,
    user_input: str,
    registry: WorkflowRegistry | None = None,
    human_input_fn: InputCoroutine | None = None,
) -> dict[str, Any]: ...
Parameters:
Parameter       Type                     Description
workflow        Workflow                 Validated workflow instance from load_workflow.
user_input      str                      The initial user message passed to the first (root) node.
registry        WorkflowRegistry | None  Registry of named sub-workflows for resolving workflow nodes by name. File-path refs (starting with . or /) are resolved from disk without a registry.
human_input_fn  InputCoroutine | None    Async callable supplying operator responses for type: human nodes. Defaults to reading a single line from stdin. Inject a mock in tests or wire in a webhook bridge in production.
Returns: A dict with the same structure as the sirenspec run JSON output. See the CLI Reference for field descriptions.
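For tests, human_input_fn can be fed from a scripted list of answers instead of stdin. The sketch below is only an illustration: this reference does not show the exact InputCoroutine signature, so the single prompt argument and the str return type are assumptions, and make_scripted_input is a hypothetical helper name.

```python
import asyncio
from collections import deque
from collections.abc import Awaitable, Callable


def make_scripted_input(answers: list[str]) -> Callable[[str], Awaitable[str]]:
    """Return an async callable that replays canned operator answers in order.

    Assumption: the callable receives the prompt text and returns the reply.
    Check the real InputCoroutine type before relying on this shape.
    """
    queue = deque(answers)

    async def scripted_input(prompt: str) -> str:
        if not queue:
            # Fail loudly rather than hang when a test under-specifies answers.
            raise RuntimeError(f"No scripted answer left for prompt: {prompt!r}")
        return queue.popleft()

    return scripted_input


async def demo() -> list[str]:
    ask = make_scripted_input(["yes", "ship it"])
    return [await ask("Approve draft?"), await ask("Any notes?")]


replies = asyncio.run(demo())
print(replies)  # ['yes', 'ship it']
```

If the signature assumption holds, the callable would be passed as execute(workflow, user_input="...", human_input_fn=make_scripted_input(["yes"])).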

execute_streaming

Execute a workflow and yield typed events as each node completes. Ideal for building interactive CLI tools, web dashboards, or any integration that needs real-time progress.
import asyncio
from sirenspec import load_workflow, execute_streaming, NodeCompleteEvent, SummaryEvent

async def main() -> None:
    workflow = load_workflow("workflow.yaml")

    async for event in execute_streaming(workflow, user_input="What is the capital of France?"):
        if isinstance(event, NodeCompleteEvent):
            print(f"Node '{event.node_id}' ({event.node_type}): {event.status}")
            if event.output:
                print(f"  Output: {event.output}")
        elif isinstance(event, SummaryEvent):
            print(f"Workflow complete: {event.status}")
            print(f"  Nodes executed: {event.total_nodes}")
            print(f"  Total tokens: {event.total_tokens}")
            print(f"  Duration: {event.duration_ms}ms")

asyncio.run(main())
Signature:
async def execute_streaming(
    workflow: Workflow,
    user_input: str,
    registry: WorkflowRegistry | None = None,
    human_input_fn: InputCoroutine | None = None,
) -> AsyncGenerator[NodeCompleteEvent | SummaryEvent, None]: ...
Parameters:
Parameter       Type                     Description
workflow        Workflow                 Validated workflow instance from load_workflow.
user_input      str                      The initial user message passed to the first (root) node.
registry        WorkflowRegistry | None  Registry of named sub-workflows. Same semantics as execute.
human_input_fn  InputCoroutine | None    Async callable supplying operator responses for type: human nodes. Same semantics as execute.
Yields: The generator yields two types of events:
  1. NodeCompleteEvent — Emitted once per node (active or skipped):
    • node_id: str — The identifier of the completed node.
    • node_type: str — One of "agent", "tool", "swrm", "factory", or "workflow".
    • status: Literal["success", "skipped", "failed"] — The node’s execution result.
    • output: Any — The node’s output value (string, dict, list, or None).
    • writes: str — The context path written by this node (only for successful agent nodes).
    • error: str | None — Error message when status="failed".
    • tokens: int — Total tokens consumed by this node (0 for tool and skipped nodes).
    • agents: list[dict[str, Any]] | None — Per-agent execution traces (swrm nodes only). Each dict contains id, prompt_sent, response_received, tokens, duration_ms, and error. None for all non-swrm node types.
    • duration_ms: float | None — Wall-clock execution time in milliseconds for swrm nodes. None for all other node types.
  2. SummaryEvent — Emitted once at the end of execution:
    • total_nodes: int — Number of nodes that ran (active nodes only, excluding skipped).
    • total_tokens: int — Aggregate token count across all active nodes.
    • status: Literal["success", "failed"] — Overall workflow status.
    • duration_ms: float — Wall-clock execution time in milliseconds.
Example: Building a Progress Bar
import asyncio
from sirenspec import load_workflow, execute_streaming, NodeCompleteEvent, SummaryEvent

async def run_with_progress(workflow_file: str, user_input: str) -> None:
    workflow = load_workflow(workflow_file)
    node_count = len(workflow.nodes)
    completed = 0

    async for event in execute_streaming(workflow, user_input):
        if isinstance(event, NodeCompleteEvent):
            completed += 1
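            # status is "success", "skipped", or "failed"; show skipped nodes separately from failures.
            status_icon = {"success": "✓", "skipped": "-", "failed": "✗"}[event.status]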
            print(f"[{completed}/{node_count}] {status_icon} {event.node_id}")
        elif isinstance(event, SummaryEvent):
            print(f"\nWorkflow {event.status}")
            print(f"Total tokens: {event.total_tokens}, Duration: {event.duration_ms}ms")

asyncio.run(run_with_progress("workflow.yaml", "Hello"))

WorkflowRegistry

A registry that maps string names to Workflow instances. Pass a populated registry to execute or execute_streaming when your workflow uses workflow nodes with named ref values. File-path refs (starting with . or /) are resolved directly from disk — no registry needed for those.
import asyncio
from sirenspec import load_workflow, execute, WorkflowRegistry

summarizer = load_workflow("summarizer/workflow.yaml")
main = load_workflow("main/workflow.yaml")

registry = WorkflowRegistry()
registry.register("summarizer", summarizer)

trace = asyncio.run(execute(main, user_input="Summarize this.", registry=registry))
Methods:
Method                    Description
register(name, workflow)  Register a Workflow under name.
get(name)                 Return the workflow registered under name. Raises KeyError if not found.
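Because get raises KeyError for unknown names, it can be useful to check that every named ref is registered before starting a run. The sketch below assumes only the documented get(name) behaviour; missing_names is a hypothetical helper, and DictRegistry is a stand-in used solely to exercise it, not the library's implementation.

```python
from typing import Any, Protocol


class RegistryLike(Protocol):
    """Anything exposing the documented get(name) method."""

    def get(self, name: str) -> Any: ...


def missing_names(registry: RegistryLike, names: list[str]) -> list[str]:
    """Return the names whose lookup raises KeyError, preserving input order."""
    missing = []
    for name in names:
        try:
            registry.get(name)
        except KeyError:
            missing.append(name)
    return missing


class DictRegistry:
    """Hypothetical stand-in with the same register/get shape."""

    def __init__(self) -> None:
        self._items: dict[str, Any] = {}

    def register(self, name: str, workflow: Any) -> None:
        self._items[name] = workflow

    def get(self, name: str) -> Any:
        return self._items[name]  # raises KeyError when the name is absent


registry = DictRegistry()
registry.register("summarizer", object())
print(missing_names(registry, ["summarizer", "translator"]))  # ['translator']
```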

Error Handling

All SirenSpec exceptions inherit from SirenSpecError:
SirenSpecError
├── ProviderError             # LLM API call failed
│   └── RetryExhaustedError   # All retry attempts exhausted
├── GuardrailError            # Guardrail layer raised an error
│   ├── GuardrailViolation    # A guardrail policy was violated (content blocked)
│   ├── BudgetExceededError   # Workflow-level budget ceiling was exceeded
│   └── PIIDetectedError      # PII guardrail with action="block" found PII
├── HumanInputError           # Human node aborted (e.g. timeout with on_timeout="abort")
├── ValidationError           # Workflow schema is invalid
├── WorkflowLintError         # Load-time linter found a blocking issue (e.g. working.<node_id>)
├── InterpolationError        # A {{ ... }} template could not be resolved at runtime
├── FactoryNodeError          # Factory node could not spawn or resolve its instances
├── SwrmAgentError            # One or more swrm agents failed
└── ToolError                 # Tool node execution failed
import asyncio
from sirenspec import (
    load_workflow,
    execute,
    SirenSpecError,
    BudgetExceededError,
    HumanInputError,
    GuardrailViolation,
    RetryExhaustedError,
)

async def main() -> None:
    try:
        workflow = load_workflow("workflow.yaml")
    except FileNotFoundError as exc:
        print(f"File not found: {exc}")
        return
    except ValueError as exc:
        print(f"Invalid workflow: {exc}")
        return

    try:
        trace = await execute(workflow, user_input="Hello")
    except BudgetExceededError as exc:
        print(f"Workflow exceeded budget: {exc}")
    except HumanInputError as exc:
        print(f"Human node aborted: {exc}")
    except GuardrailViolation as exc:
        print(f"Guardrail blocked execution: {exc}")
    except RetryExhaustedError as exc:
        print(f"Provider failed after all retries: {exc}")
    except SirenSpecError as exc:
        print(f"Workflow error: {exc}")

asyncio.run(main())
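An except clause also matches subclasses, which is why the example lists specific exception types before SirenSpecError. The sketch below illustrates the same idea with a lookup that walks the class hierarchy. The classes are local stand-ins mirroring part of the tree shown above (import the real ones in practice), and the exit codes are hypothetical.

```python
class SirenSpecError(Exception):
    """Stand-in for the documented base class."""


class ProviderError(SirenSpecError):
    pass


class RetryExhaustedError(ProviderError):
    pass


class GuardrailError(SirenSpecError):
    pass


class BudgetExceededError(GuardrailError):
    pass


class ToolError(SirenSpecError):
    pass


# Hypothetical mapping; the most specific registered ancestor wins.
EXIT_CODES = {
    SirenSpecError: 1,
    ProviderError: 2,
    GuardrailError: 3,
    BudgetExceededError: 4,
}


def exit_code_for(exc: BaseException) -> int:
    """Walk the exception's MRO and return the first registered exit code."""
    for cls in type(exc).__mro__:
        if cls in EXIT_CODES:
            return EXIT_CODES[cls]
    return 70  # fallback for errors outside the hierarchy


print(exit_code_for(RetryExhaustedError("gave up")))  # 2
print(exit_code_for(BudgetExceededError("too costly")))  # 4
print(exit_code_for(ToolError("tool crashed")))  # 1
```

RetryExhaustedError has no entry of its own, so it resolves to its parent ProviderError, just as an except ProviderError clause would catch it.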

Extending SirenSpec

Custom LLM Provider

Implement the LLMProvider Protocol to add support for a new backend:
from sirenspec import LLMProvider, TokenUsage

class MyProvider:
    """Example custom provider backed by a hypothetical API."""

    def __init__(self) -> None:
        self._last_usage = TokenUsage(prompt_tokens=0, completion_tokens=0)

    async def complete(self, messages: list[dict]) -> str:
        # Call your API here
        response_text = "..."
        self._last_usage = TokenUsage(prompt_tokens=10, completion_tokens=20)
        return response_text

    @property
    def last_token_usage(self) -> TokenUsage:
        return self._last_usage

    @property
    def client(self) -> object:
        return self

Custom Guardrail

Subclass Guardrail to add a new per-node policy:
from sirenspec import Guardrail, GuardrailViolation

class ProfanityGuardrail(Guardrail):
    BLOCKED = {"badword1", "badword2"}

    def check_input(self, text: str) -> str:
        for word in self.BLOCKED:
            if word in text.lower():
                raise GuardrailViolation(f"Blocked word in input: {word!r}")
        return text

    def check_output(self, text: str) -> str:
        for word in self.BLOCKED:
            if word in text.lower():
                raise GuardrailViolation(f"Blocked word in output: {word!r}")
        return text
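The substring test in the example above (word in text.lower()) also fires when a blocked word appears inside a longer, harmless word. One possible refinement is whole-word matching with a regular expression. The sketch below is self-contained and independent of the Guardrail base class; find_blocked is a hypothetical helper that check_input and check_output could call.

```python
import re


def find_blocked(text: str, blocked: set[str]) -> list[str]:
    """Return blocked words that occur as whole words, sorted for stable output."""
    if not blocked:
        return []
    # \b anchors keep "ass" from matching inside "classic".
    pattern = re.compile(
        r"\b(" + "|".join(re.escape(word) for word in sorted(blocked)) + r")\b",
        re.IGNORECASE,
    )
    return sorted({match.group(1).lower() for match in pattern.finditer(text)})


hits = find_blocked("A classic BADWORD1 appears here", {"badword1", "ass"})
print(hits)  # ['badword1']
```

Word boundaries follow the regex definition of a word character, so this approach suits space-delimited languages better than scripts without explicit word separators.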

Token Usage

TokenUsage is a frozen dataclass for tracking prompt and completion tokens:
from sirenspec import TokenUsage

usage1 = TokenUsage(prompt_tokens=100, completion_tokens=50)
usage2 = TokenUsage(prompt_tokens=80, completion_tokens=40)
combined = usage1 + usage2
print(combined.total)  # 270
Fields:
Field              Type  Description
prompt_tokens      int   Tokens in the input.
completion_tokens  int   Tokens in the output.
Properties / Methods:
Name            Returns     Description
.total          int         Sum of prompt and completion tokens.
__add__(other)  TokenUsage  Combine two usages; returns a new instance.
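When more than two usages need combining, for example one per node, they can be folded with an explicit zero value. The sketch below uses a local stand-in for TokenUsage that mirrors only the documented fields, .total and __add__ so that it runs on its own; total_usage is a hypothetical helper. An explicit start value is used because the built-in sum starts from the integer 0, which would need an __radd__ method that this reference does not mention.

```python
from dataclasses import dataclass
from functools import reduce
from operator import add


@dataclass(frozen=True)
class TokenUsage:
    """Stand-in mirroring the documented fields; import the real class in practice."""
    prompt_tokens: int
    completion_tokens: int

    @property
    def total(self) -> int:
        return self.prompt_tokens + self.completion_tokens

    def __add__(self, other: "TokenUsage") -> "TokenUsage":
        return TokenUsage(
            self.prompt_tokens + other.prompt_tokens,
            self.completion_tokens + other.completion_tokens,
        )


def total_usage(usages: list[TokenUsage]) -> TokenUsage:
    """Fold any number of usages into one, starting from an explicit zero value."""
    return reduce(add, usages, TokenUsage(0, 0))


run_total = total_usage([TokenUsage(100, 50), TokenUsage(80, 40), TokenUsage(5, 5)])
print(run_total.total)  # 280
```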

Streaming in Web Applications

The streaming API is ideal for interactive dashboards, chat interfaces, or real-time monitoring:
import asyncio
from typing import AsyncGenerator
from sirenspec import load_workflow, execute_streaming, NodeCompleteEvent, SummaryEvent


async def stream_to_client(
    workflow_file: str, user_input: str
) -> AsyncGenerator[dict, None]:
    """Yield JSON-serializable event dicts for streaming to a client."""
    workflow = load_workflow(workflow_file)

    async for event in execute_streaming(workflow, user_input):
        if isinstance(event, NodeCompleteEvent):
            yield {
                "type": "node_complete",
                "node_id": event.node_id,
                "node_type": event.node_type,
                "status": event.status,
                "output": event.output,
                "tokens": event.tokens,
            }
        elif isinstance(event, SummaryEvent):
            yield {
                "type": "summary",
                "total_nodes": event.total_nodes,
                "total_tokens": event.total_tokens,
                "status": event.status,
                "duration_ms": event.duration_ms,
            }


# FastAPI integration example
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


# EventSource clients can only issue GET requests, so the route uses GET.
@app.get("/workflow/stream")
async def stream_workflow(workflow_file: str, user_input: str) -> StreamingResponse:
    """Stream workflow execution events as Server-Sent Events."""

    async def event_generator():
        async for event_dict in stream_to_client(workflow_file, user_input):
            # Serialize as JSON so the client's JSON.parse can read the payload.
            yield f"data: {json.dumps(event_dict)}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
On the client side (JavaScript):
const eventSource = new EventSource(
  `/workflow/stream?workflow_file=workflow.yaml&user_input=Hello`
);

eventSource.addEventListener("message", (event) => {
  const data = JSON.parse(event.data);

  if (data.type === "node_complete") {
    console.log(`${data.node_id} (${data.status}): ${data.tokens} tokens`);
  } else if (data.type === "summary") {
    console.log(`Workflow ${data.status} in ${data.duration_ms}ms`);
  }
});