--- url: /flowneer/core/anchors-routing.md --- # Anchors & Routing Anchors give you fine-grained control over flow execution order. Any step can jump to a named anchor by returning `"#anchorName"`, enabling loops, retries, and conditional skips without needing the `.loop()` primitive. ## Defining an Anchor ```typescript flow .anchor("retry") .then(processStep) .then(async (s) => { if (s.failed) return "#retry"; // jump back to the anchor }) .then(finalStep); ``` `anchor(name)` inserts a no-op marker. The for-loop in the executor increments the step index past the anchor, so the **next step after the anchor** is the effective entry point. ## How goto Works When a `fn` step returns a string starting with `#`, the executor: 1. Strips the `#` prefix to get the anchor name. 2. Looks up the anchor's step index. 3. Sets the iteration counter to that index — the next iteration lands on the **anchor step**, skips it (since it's a no-op), and continues with the following step. ## Branching with Anchors Anchors work in `.branch()` too: ```typescript flow.branch((s) => s.action, { retry: async (s) => { s.retries++; return "#start"; }, finish: async (s) => { s.done = true; }, }); ``` ## Forward Jumps (Skip Ahead) Anchors can also be placed *after* the jumping step for forward skips: ```typescript flow .then(async (s) => { if (s.skipProcessing) return "#save"; }) .then(expensiveProcessing) .anchor("save") .then(saveResults); ``` ## Guard Against Infinite Loops Use [`withCycles`](../plugins/resilience/cycles.md) to cap the number of anchor jumps: ```typescript const AppFlow = FlowBuilder.extend([withCycles]); const flow = new AppFlow() .withCycles(50) // max 50 total jumps per run .withCycles(5, "retry") // max 5 jumps to the "retry" anchor .anchor("retry") .then(doWork) .then((s) => { if (!s.ok) return "#retry"; }); ``` ## Anchors in Generator Steps Generator steps can also return an anchor: ```typescript .then(async function* (s) { for await (const token of stream(s.prompt)) { 
s.output += token; yield token; } if (s.output.includes("ERROR")) return "#retry"; }) ``` The generator's **return value** (not yield) is used for routing. --- --- url: /flowneer/presets/pipeline/approval-gate.md --- # approvalGate Insert a conditional or always-on human approval / review gate into a flow. On the first run the gate pauses execution, stores a prompt on shared state, and throws an `InterruptError`. When the human responds the caller resumes the flow with the response injected into shared state, and the gate processes the outcome — setting `approved`, `humanEdit`, or `humanFeedback` accordingly. ## Import ```typescript import { approvalGate } from "flowneer/presets/pipeline"; ``` ## Usage ```typescript import { FlowBuilder, InterruptError } from "flowneer"; import { resumeFlow } from "flowneer/plugins/agent"; import { approvalGate } from "flowneer/presets/pipeline"; interface DraftState { draft: string; approved?: boolean; humanEdit?: string; humanFeedback?: string; __humanPrompt?: string; } const flow = new FlowBuilder() .startWith(async (s) => { s.draft = await generateDraft(s.topic); }) .add( approvalGate({ prompt: (s) => `Please review the following draft:\n\n${s.draft}`, onReject: (s, feedback) => { console.log("Rejected:", feedback); // throw to stop, or set state to trigger a revision anchor }, }), ) .then((s) => { if (!s.approved) return; const content = s.humanEdit ?? s.draft; console.log("Publishing:", content); }); // ─── First run ──────────────────────────────────────────────────── try { await flow.run(state); } catch (e) { if (e instanceof InterruptError) { // Deliver the prompt to a human (email, Slack, UI, etc.) 
console.log("Awaiting review:", e.savedShared.__humanPrompt); // Later, when the human responds: await resumeFlow(flow, e.savedShared, { __approvalResponse: "approve", // or "edit: revised text", or "reject" }); } } ``` ## Options | Option | Type | Default | Description | | ------------- | ----------------------------------------------- | ------------------------------------------------ | ---------------------------------------------------------------------------------------------------------- | | `prompt` | `string \| (s, p) => string \| Promise` | `Approve this output?\n\n` | Prompt message stored on `shared.__humanPrompt` before interrupting | | `condition` | `(s, p) => boolean \| Promise` | Always interrupt | When provided, the gate only activates when this returns `true` | | `onReject` | `(s, feedback?) => void \| Promise` | `() => { throw new Error("Rejected by human") }` | Called when the response is not `"approve"`, `"yes"`, or `"edit: …"`. Throw to halt, or mutate to redirect | | `responseKey` | `string` | `"__approvalResponse"` | Key on `shared` where the human's response is injected during `resumeFlow` | ## State keys ### Internal (cleaned up after resume) | Key | Description | | -------------------- | ------------------------------------------------------ | | `__humanPrompt` | The resolved prompt string, set before interrupting | | `__approvalResponse` | The human's response, injected by the caller on resume | ### Written by the gate on resume | Key | Description | Condition | | --------------- | ----------------------------------------------------- | ----------------------------------------- | | `approved` | `true` when approved or edited, `false` when rejected | Always set on resume | | `humanEdit` | The revised text (everything after `"edit: "`) | Only when response starts with `"edit: "` | | `humanFeedback` | The raw rejection string | Only on rejection | ## Response format The human's response (value of `shared[responseKey]`) is parsed as follows: 
| Response                          | Outcome                                                                |
| --------------------------------- | ---------------------------------------------------------------------- |
| `"approve"` or `"yes"` (any case) | `approved = true`                                                      |
| `"edit: <text>"`                  | `humanEdit = "<text>"`, `approved = true`                              |
| Anything else                     | `approved = false`, `humanFeedback = <response>`, `onReject()` called  |

## Resume pattern

```typescript
// After catching InterruptError:
await resumeFlow(flow, e.savedShared, {
  __approvalResponse: "edit: revised version of the content",
});
// → s.approved = true, s.humanEdit = "revised version of the content"
```

`resumeFlow` merges the second argument (your partial edits) into the saved state and re-runs the flow from the beginning. The gate step detects that `__approvalResponse` is set, processes it, and falls through to the next step.

## Conditional gate

Only activate the gate when confidence is low:

```typescript
.add(
  approvalGate({
    condition: (s) => s.confidence < 0.8,
    prompt: (s) =>
      `Low-confidence output (${s.confidence}). Approve?\n\n${s.output}`,
  }),
)
```

When `condition` returns `false` the gate step is a no-op — the flow continues as if it were not there.

## Return value

`approvalGate()` returns a `FlowBuilder` with a single step.
Compose it with plugins: ```typescript const gate = approvalGate({ prompt: reviewPrompt }) .withTiming() .withCheckpoint({ save, load, key: (s) => s.jobId }); ``` Or splice it mid-flow via `.add()`: ```typescript const pipeline = new FlowBuilder() .startWith(generate) .add(approvalGate({ prompt: reviewPrompt })) .then(publish); ``` ## See Also * [`clarifyLoop`](./clarify-loop.md) — looped clarification: generate → ask when unclear → retry * [Human-in-the-loop recipe](../../recipes/human-in-the-loop.md) — full checkpoint + resume walkthrough * [`humanNode` plugin](../../plugins/agent/human-node.md) — lower-level interrupt primitive * [`resumeFlow`](../../plugins/agent/human-node.md#resumeflowflow-saved-edits-fromstep) — resume helper API --- --- url: /flowneer/recipes/batch-document-processing.md --- # Batch Document Processing Process a large collection of documents: extract structured data from each one in parallel batches, collect results with a reducer, then aggregate into a final report. Demonstrates `.batch()`, nested `.parallel()`, `withStructuredOutput`, and safe concurrent writes. 
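The safety model this recipe relies on — each parallel branch writes to an isolated draft copy, and a single reducer merges the drafts back — can be sketched without the library. This is a simplified model of the assumed semantics, not the executor's actual cloning strategy:

```typescript
// Simplified model of "isolated drafts + reducer" parallel semantics.
type Reducer<S> = (shared: S, drafts: S[]) => void;

async function runParallel<S>(
  shared: S,
  fns: Array<(draft: S) => Promise<void> | void>,
  reduce: Reducer<S>,
): Promise<void> {
  // each branch gets its own structural clone — no shared mutable state
  const drafts = fns.map(() => structuredClone(shared));
  await Promise.all(fns.map((fn, i) => fn(drafts[i]!)));
  // one synchronous reducer merges results — no concurrent writes to shared
  reduce(shared, drafts);
}
```

With this model, two writers pushing into `results` never race: each pushes into its own draft's array, and the reducer flat-maps them back into `shared.results` in branch order.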
**Plugins used:** `withStructuredOutput` (LLM), `.batch()` + `.parallel()` (core) *** ## The code ```typescript import "dotenv/config"; import { FlowBuilder } from "flowneer"; import { withStructuredOutput } from "flowneer/plugins/llm"; import { withRateLimit } from "flowneer/plugins/llm"; import { callLlm } from "./utils/callLlm"; const AppFlow = FlowBuilder.extend([withStructuredOutput, withRateLimit]); // ─── Types ─────────────────────────────────────────────────────────────────── interface Document { id: string; text: string; } interface ExtractedData { documentId: string; summary: string; sentiment: "positive" | "neutral" | "negative"; keyTopics: string[]; wordCount: number; } interface ProcessingState { documents: Document[]; results: ExtractedData[]; report: string; __batchItem?: Document; // injected by .batch() __llmOutput?: string; // used by withStructuredOutput } // ─── Extraction prompt ─────────────────────────────────────────────────────── function extractionPrompt(doc: Document) { return ( `Analyse the following document and return a JSON object with these fields:\n` + `- summary: string (1-2 sentences)\n` + `- sentiment: "positive" | "neutral" | "negative"\n` + `- keyTopics: string[] (up to 5 topics)\n\n` + `Document:\n${doc.text}\n\n` + `Return only valid JSON.` ); } // ─── Inner flow — processes one document ───────────────────────────────────── const extractFlow = new AppFlow() .withRateLimit({ requestsPerMinute: 30 }) .startWith(async (s) => { const doc = s.__batchItem!; s.__llmOutput = await callLlm(extractionPrompt(doc)); }) .withStructuredOutput({ parse: (raw) => JSON.parse(raw), retries: 2, onSuccess: (parsed, s) => { const doc = s.__batchItem!; s.results.push({ documentId: doc.id, wordCount: doc.text.split(/\s+/).length, ...parsed, }); }, }); // ─── Outer flow — batches all documents, then aggregates ───────────────────── const pipeline = new AppFlow() .startWith((s) => { s.results = []; }) // Process all documents via the inner flow, one 
per batch item
  .batch(
    (s) => s.documents,
    (b) => b.add(extractFlow),
  )
  // Aggregate results into a report
  .then(async (s) => {
    const pos = s.results.filter((r) => r.sentiment === "positive").length;
    const neg = s.results.filter((r) => r.sentiment === "negative").length;
    const neu = s.results.filter((r) => r.sentiment === "neutral").length;

    const allTopics = s.results.flatMap((r) => r.keyTopics);
    const topicFreq = allTopics.reduce<Record<string, number>>((acc, t) => {
      acc[t] = (acc[t] ?? 0) + 1;
      return acc;
    }, {});
    const topTopics = Object.entries(topicFreq)
      .sort((a, b) => b[1] - a[1])
      .slice(0, 10)
      .map(([t]) => t);

    s.report = [
      `# Document Processing Report`,
      ``,
      `**Total documents:** ${s.results.length}`,
      `**Sentiment:** ${pos} positive / ${neu} neutral / ${neg} negative`,
      `**Top topics:** ${topTopics.join(", ")}`,
      ``,
      `## Summaries`,
      ...s.results.map((r) => `- **${r.documentId}** — ${r.summary}`),
    ].join("\n");
  });

// ─── Run ───────────────────────────────────────────────────────────────────

const documents: Document[] = [
  {
    id: "doc-1",
    text: "TypeScript 5.0 was released with exciting new features...",
  },
  {
    id: "doc-2",
    text: "The new machine learning framework struggled with performance...",
  },
  {
    id: "doc-3",
    text: "Open source contributors celebrated a major milestone today...",
  },
  // Add as many as you need — .batch() processes them sequentially,
  // or switch to .parallel() for concurrent processing (see variation below)
];

const state: ProcessingState = {
  documents,
  results: [],
  report: "",
};

await pipeline.run(state);
console.log(state.report);
```

***

## Variation — parallel processing

Replace `.batch()` with `.parallel()` to process all documents concurrently.
Use a reducer to safely merge results back:

```typescript
.parallel(
  documents.map((doc) => async (s: ProcessingState) => {
    s.__batchItem = doc;
    await extractFlow.run(s);
  }),
  undefined,
  // Reducer: merge the results arrays from each isolated draft
  (shared, drafts) => {
    shared.results = drafts.flatMap((d) => d.results);
  },
)
```

::: warning Rate limits
Running many documents in parallel increases your LLM request rate significantly. Make sure `withRateLimit` is configured before enabling this.
:::

## Variation — chunked parallel batches

Process in chunks of N concurrently using nested `.batch()` + `.parallel()`:

```typescript
// chunk the documents array first
const chunkSize = 5;
const chunks: Document[][] = [];
for (let i = 0; i < documents.length; i += chunkSize) {
  chunks.push(documents.slice(i, i + chunkSize));
}

// outer batch over chunks, inner parallel over each chunk
const pipeline = new AppFlow()
  .startWith((s) => {
    s.results = [];
  })
  .batch(
    () => chunks,
    (b) =>
      b.parallel(
        // One worker per slot in a chunk. The current chunk is only known at
        // run time (via __batchItem), so each worker picks its document by
        // index rather than closing over a chunk at build time.
        Array.from({ length: chunkSize }, (_, i) => async (s: ProcessingState) => {
          const doc = (s.__batchItem as unknown as Document[])[i];
          if (!doc) return; // final chunk may hold fewer than chunkSize documents
          /* extract doc */
        }),
        undefined,
        (shared, drafts) => {
          shared.results.push(...drafts.flatMap((d) => d.results));
        },
      ),
  );
```

***

## See also

* [withStructuredOutput](../plugins/llm/structured-output.md)
* [withRateLimit](../plugins/llm/rate-limit.md)
* [Step Types — batch & parallel](../core/step-types.md)

---

--- url: /flowneer/recipes/blog-post-generator.md ---

# Blog Post Generator

A multi-step LLM pipeline that researches a topic, writes an outline, drafts each section in parallel, then assembles and reviews the final post. Demonstrates sequential flow, parallel fan-out with a reducer, structured output, and cost tracking.
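The per-step cost arithmetic the recipe's code uses can be sketched standalone. The rates mirror the example values in the recipe, not official provider pricing:

```typescript
// Illustrative cost accounting — rates are the recipe's example values.
interface Usage {
  inputTokens: number;
  outputTokens: number;
}

// Same formula as the recipe's steps: weighted token counts, scaled by 1/1000.
function stepCost(usage: Usage): number {
  return (usage.inputTokens * 0.00015 + usage.outputTokens * 0.0006) / 1000;
}

// withCostTracker-style accumulation: sum each step's cost into a running total.
function totalCost(stepCosts: number[]): number {
  return stepCosts.reduce((total, c) => total + c, 0);
}
```

For example, a step consuming 1 000 input and 1 000 output tokens costs `(0.15 + 0.6) / 1000 = $0.00075` under these example rates.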
**Plugins used:** `withCostTracker`, `withRateLimit`, `withStructuredOutput` (LLM), `parallel` (core)

***

## The code

```typescript
import "dotenv/config";
import { FlowBuilder } from "flowneer";
import {
  withCostTracker,
  withStructuredOutput,
  withRateLimit,
} from "flowneer/plugins/llm";
import { callLlm, callLlmWithUsage } from "./utils/callLlm"; // your LLM helper

const AppFlow = FlowBuilder.extend([
  withCostTracker,
  withRateLimit,
  withStructuredOutput,
]);

// ─── State ───────────────────────────────────────────────────────────────────

interface BlogState {
  topic: string;
  audience: string;
  research: string;
  outline: string[];
  sections: Record<string, string>;
  draft: string;
  finalPost: string;
  __cost?: number;
  __stepCost?: number; // set per step, read by withCostTracker
  __llmOutput?: string; // read by withStructuredOutput
}

// ─── Flow ────────────────────────────────────────────────────────────────────

const blogFlow = new AppFlow()
  .withCostTracker()
  .withRateLimit({ requestsPerMinute: 60 })

  // Step 1 — Research the topic
  .startWith(async (s) => {
    const { text, usage } = await callLlmWithUsage(
      `Research the topic "${s.topic}" for a blog post targeting ${s.audience}. ` +
        `Summarise the 5 most interesting angles in bullet points.`,
    );
    s.research = text;
    s.__stepCost =
      (usage.inputTokens * 0.00015 + usage.outputTokens * 0.0006) / 1000;
  })

  // Step 2 — Generate a structured outline
  .then(async (s) => {
    const { text, usage } = await callLlmWithUsage(
      `Based on this research:\n${s.research}\n\n` +
        `Write a JSON array of 4 section titles for a blog post about "${s.topic}". ` +
        `Return only valid JSON, e.g.
["Introduction", "Section 2", "Section 3", "Conclusion"]`, ); s.__llmOutput = text; s.__stepCost = (usage.inputTokens * 0.00015 + usage.outputTokens * 0.0006) / 1000; }) .withStructuredOutput({ parse: (raw) => JSON.parse(raw) as string[], onSuccess: (parsed, s) => { s.outline = parsed; }, }) // Step 3 — Write all sections in parallel .parallel( [0, 1, 2, 3].map((i) => async (s: BlogState) => { const title = s.outline[i]!; const text = await callLlm( `Write the "${title}" section of a blog post about "${s.topic}" ` + `for ${s.audience}. 2–3 paragraphs. Research context:\n${s.research}`, ); s.sections[title] = text; }), undefined, // Reducer: merge sections from each draft back into shared (shared, drafts) => { shared.sections = Object.assign({}, ...drafts.map((d) => d.sections)); }, ) // Step 4 — Assemble the draft .then(async (s) => { s.draft = s.outline .map((title) => `## ${title}\n\n${s.sections[title] ?? ""}`) .join("\n\n"); }) // Step 5 — Editorial pass .then(async (s) => { s.finalPost = await callLlm( `You are a senior editor. Improve this blog post draft for clarity, ` + `flow, and engagement. Keep it under 1000 words.\n\n${s.draft}`, ); }); // ─── Run ───────────────────────────────────────────────────────────────────── const state: BlogState = { topic: "The rise of AI coding assistants", audience: "senior software engineers", research: "", outline: [], sections: {}, draft: "", finalPost: "", }; await blogFlow.run(state); console.log("=== FINAL POST ==="); console.log(state.finalPost); console.log(`\nTotal LLM cost: $${(state.__cost ?? 0).toFixed(4)}`); ``` *** ## Key patterns ### Parallel fan-out with a reducer The parallel step runs one writer per section concurrently. Without a reducer, all four writers would race to write `s.sections` on the same object — unsafe. 
The reducer receives an array of isolated draft copies and merges them: ```typescript .parallel(writerFns, undefined, (shared, drafts) => { shared.sections = Object.assign({}, ...drafts.map((d) => d.sections)); }) ``` ### Structured output for the outline `withStructuredOutput` reads `s.__llmOutput`, parses it with your `parse` function, and calls `onSuccess` only when parsing succeeds. If it fails (malformed JSON) it retries the previous step automatically. ### Cost tracking Every step sets `s.__stepCost` (in USD). `withCostTracker` accumulates these into `s.__cost` so you have a total at the end. *** ## See also * [withStructuredOutput](../plugins/llm/structured-output.md) * [withCostTracker](../plugins/llm/cost-tracker.md) * [withRateLimit](../plugins/llm/rate-limit.md) * [Multi-agent Patterns](../presets/agent/patterns.md) — `supervisorCrew` is a clean alternative for this topology --- --- url: /flowneer/plugins/memory/buffer-window.md --- # BufferWindowMemory A sliding-window message buffer. Keeps the last `k` messages and discards older ones. The simplest and most commonly used memory implementation. ## Usage ```typescript import { BufferWindowMemory } from "flowneer/plugins/memory"; const memory = new BufferWindowMemory({ maxMessages: 20 }); memory.add({ role: "user", content: "Hello!" }); memory.add({ role: "assistant", content: "Hi there!" }); const context = memory.toContext(); // "user: Hello!\nassistant: Hi there!" const messages = memory.get(); // [{ role: "user", content: "Hello!" }, ...] 
``` ## Constructor Options | Option | Type | Default | Description | | ------------- | -------- | ------- | ------------------------------------ | | `maxMessages` | `number` | `20` | Maximum number of messages to retain | ## Methods | Method | Signature | Description | | ----------- | ------------------------------ | ---------------------------------------- | | `add` | `(msg: MemoryMessage) => void` | Append a message; prunes when over limit | | `get` | `() => MemoryMessage[]` | Return a copy of current messages | | `clear` | `() => void` | Remove all messages | | `toContext` | `() => string` | Format as `"role: content\n..."` string | ## With `withMemory` ```typescript import { FlowBuilder } from "flowneer"; import { withMemory, BufferWindowMemory } from "flowneer/plugins/memory"; const AppFlow = FlowBuilder.extend([withMemory]); const memory = new BufferWindowMemory({ maxMessages: 10 }); const flow = new AppFlow() .withMemory(memory) .startWith(async (s) => { s.__memory!.add({ role: "user", content: s.input }); const ctx = s.__memory!.toContext(); s.response = await callLlm(ctx); s.__memory!.add({ role: "assistant", content: s.response }); }); ``` ## Pruning Behaviour When `messages.length > maxMessages`, the **oldest** messages are dropped so only the most recent `maxMessages` remain: ``` [msg1, msg2, ..., msg20, msg21] → [msg2, msg3, ..., msg21] ``` This preserves the most recent context without unbounded growth. --- --- url: /flowneer/presets/pipeline/clarify-loop.md --- # clarifyLoop A refinement loop that generates an output, evaluates whether it needs human clarification, and — if so — pauses to ask, then regenerates with the clarification incorporated. Repeats up to `maxRounds` times. Common use cases: ambiguous user queries, low-confidence LLM responses, outputs that reference unresolved terms. 
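Stripped of the flow machinery, the loop reduces to control logic like the following. This is a conceptual sketch with the interrupt modelled as a plain exception — not the preset's source:

```typescript
// Conceptual sketch of the clarify loop's control flow (assumed semantics).
interface ClarifyState {
  output: string;
  confidence: number;
  humanClarification?: string;
  __clarifyRounds?: number;
}

// Stand-in for the library's InterruptError in this sketch.
class NeedsClarification extends Error {
  constructor(public prompt: string) {
    super(prompt);
  }
}

function clarifyStep(
  s: ClarifyState,
  generate: (s: ClarifyState) => void,
  needsClarification: (s: ClarifyState) => boolean,
  maxRounds = 3,
): void {
  s.__clarifyRounds ??= 0; // preserved across resumes
  generate(s); // sees humanClarification on clarification rounds
  if (needsClarification(s) && s.__clarifyRounds < maxRounds) {
    s.__clarifyRounds++;
    // in the real preset this throws InterruptError carrying saved state
    throw new NeedsClarification(`Please clarify:\n\n${s.output}`);
  }
  delete s.__clarifyRounds; // cleanup on normal completion
}
```

Calling `clarifyStep` again after injecting `humanClarification` models a resume: the `??=` keeps the round counter, and once the evaluator passes (or rounds are exhausted) the step falls through and cleans up.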
## Import ```typescript import { clarifyLoop } from "flowneer/presets/pipeline"; ``` ## Usage ```typescript import { InterruptError } from "flowneer"; import { resumeFlow } from "flowneer/plugins/agent"; import { clarifyLoop } from "flowneer/presets/pipeline"; interface QueryState { query: string; output: string; confidence: number; humanClarification?: string; } const flow = clarifyLoop({ generateStep: async (s) => { const prompt = s.humanClarification ? `${s.query}\nAdditional context: ${s.humanClarification}` : s.query; const result = await llm(prompt); s.output = result.text; s.confidence = result.confidence; }, evaluateFn: (s) => s.confidence < 0.8, clarifyPrompt: (s) => `The response has low confidence (${s.confidence}). ` + `Please clarify your question or add more context:\n\n${s.output}`, maxRounds: 2, }); // ─── First run ──────────────────────────────────────────────────── try { await flow.run(state); // flow completed — output is either satisfactory or maxRounds exhausted } catch (e) { if (e instanceof InterruptError) { // Deliver the prompt to the user console.log("Clarification needed:", e.savedShared.__humanPrompt); // When the user replies: await resumeFlow(flow, e.savedShared, { humanClarification: "I meant the 2025 fiscal year totals", }); } } ``` ## Options | Option | Type | Default | Description | | --------------- | ----------------------------------------------- | ------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------- | | `generateStep` | `NodeFn` | — | The generation step. On clarification rounds, `shared.humanClarification` holds the human's last input | | `maxRounds` | `number` | `3` | Maximum clarification rounds. 
After this the flow falls through even if `evaluateFn` still returns true | | `evaluateFn` | `(s, p) => boolean \| Promise` | `s.confidence < 0.7 \|\| output.includes("unclear")` | Returns `true` when the output needs clarification | | `clarifyPrompt` | `string \| (s, p) => string \| Promise` | `"The output is unclear or low-confidence. Please clarify:\n"` | Prompt stored on `shared.__humanPrompt` before interrupting | ## State keys ### Internal (deleted on normal completion) | Key | Description | | ----------------- | ------------------------------------------------------------------------------------- | | `__clarifyRounds` | Counter of clarification rounds completed so far (preserved across resumes via `??=`) | | `__humanPrompt` | The resolved clarification prompt, set before each interrupt | ### User-facing (read / written by consumer) | Key | Direction | Description | | -------------------- | ------------------------- | ------------------------------------------------------------------------------------ | | `humanClarification` | Written by caller | Inject via `resumeFlow` edits — passed through to `generateStep` on the next attempt | | `output` | Written by `generateStep` | The generated output (used by the default `evaluateFn` and prompt) | | `confidence` | Written by `generateStep` | Numeric confidence (used by the default `evaluateFn`) | ## How It Works ``` startWith: __clarifyRounds ??= 0 ↓ generateStep (user-provided) ↓ evaluateFn? false or rounds ≥ maxRounds → cleanup → done true → __clarifyRounds++, store __humanPrompt, throw InterruptError ``` On resume, `resumeFlow(flow, saved, { humanClarification: "…" })` re-runs the preset from the top. The `??=` in the init step preserves `__clarifyRounds`, so the counter accumulates correctly across multiple resume cycles. ## Resume pattern ```typescript // Catch and resume in a simple loop (e.g. 
CLI / test harness):
let current: any = initialState;

while (true) {
  try {
    await flow.run(current);
    break; // completed
  } catch (e) {
    if (!(e instanceof InterruptError)) throw e;
    const clarification = await askUser(e.savedShared.__humanPrompt);
    current = { ...e.savedShared, humanClarification: clarification };
  }
}
```

## Default `evaluateFn`

When `evaluateFn` is omitted the preset triggers clarification when either:

* `shared.confidence` is a number less than `0.7`, or
* `String(shared.output)` contains the substring `"unclear"`.

Supply your own `evaluateFn` for domain-specific quality checks.

## Return value

`clarifyLoop()` returns a `FlowBuilder`. Compose it with plugins:

```typescript
const flow = clarifyLoop({ generateStep, evaluateFn })
  .withTiming()
  .withCostTracker();
```

## See Also

* [`approvalGate`](./approval-gate.md) — single yes/no/edit approval gate
* [`generateUntilValid`](./generate-until-valid.md) — automated retry without human input
* [Human-in-the-loop recipe](../../recipes/human-in-the-loop.md) — checkpoint + resume walkthrough
* [`humanNode` plugin](../../plugins/agent/human-node.md) — lower-level interrupt primitive

---

--- url: /flowneer/presets/agent/create-agent.md ---

# createAgent & tool()

High-level LangChain-style factory functions for building tool-calling agents in a single line of setup. `tool()` defines individual tools and `createAgent()` wires them into a ready-to-run `FlowBuilder`.

## Setup

```typescript
import { FlowBuilder } from "flowneer";
import { tool, createAgent } from "flowneer/presets/agent";
```

No `FlowBuilder.extend()` calls are needed — `createAgent` registers `withTools` and `withReActLoop` internally.

***

## `tool()`

Create a `Tool` from an execute function and a config object.

```typescript
function tool<TArgs>(
  execute: (args: TArgs) => unknown | Promise<unknown>,
  config: ToolConfigSchema | ToolConfigParams,
): Tool;
```

### With a Zod schema

The preferred style — pass `schema: z.object(...)`. Types are inferred automatically.
```typescript import { z } from "zod"; const getWeather = tool( ({ city }) => `Sunny in ${city}!`, // execute — fully typed from schema { name: "get_weather", description: "Get the current weather for a given city", schema: z.object({ city: z.string().describe("The name of the city"), }), }, ); ``` Zod schemas are duck-typed — no direct Zod import is required at the Flowneer package level. ### With plain `params` Use the existing Flowneer `ToolParam` shape when you don't want a Zod dependency. ```typescript const getTime = tool(() => new Date().toUTCString(), { name: "get_time", description: "Get the current UTC date and time", params: {}, // no arguments needed }); const search = tool( async ({ query }: { query: string }) => fetchResults(query), { name: "web_search", description: "Search the web", params: { query: { type: "string", description: "Search query", required: true }, }, }, ); ``` ### Zod → `ToolParam` type mapping | Zod type | `ToolParam.type` | | ------------- | ---------------- | | `ZodString` | `"string"` | | `ZodNumber` | `"number"` | | `ZodBoolean` | `"boolean"` | | `ZodObject` | `"object"` | | `ZodArray` | `"array"` | | anything else | `"string"` | Optional fields (`z.string().optional()`) are mapped to `required: false`. *** ## `createAgent()` Build a reusable agent flow. ```typescript function createAgent(options: CreateAgentOptions): FlowBuilder; ``` Returns a `FlowBuilder`. Call `.run(state)` to execute. 
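The Zod → `ToolParam` mapping shown earlier can be sketched with a duck-typed check on the schema's `_def.typeName` — an assumption about Zod's internal shape (Zod v3 exposes it this way), not necessarily how Flowneer implements it:

```typescript
// Sketch of the Zod → ToolParam type mapping table, duck-typed on the
// `_def.typeName` convention used by Zod v3 (an assumption).
type ToolParamType = "string" | "number" | "boolean" | "object" | "array";

function zodToToolParamType(schema: { _def?: { typeName?: string } }): ToolParamType {
  switch (schema._def?.typeName) {
    case "ZodString":  return "string";
    case "ZodNumber":  return "number";
    case "ZodBoolean": return "boolean";
    case "ZodObject":  return "object";
    case "ZodArray":   return "array";
    default:           return "string"; // anything else falls back to "string"
  }
}
```

Because the check is purely structural, no Zod import is needed — which matches the "duck-typed, no direct Zod dependency" note above.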
### Options | Option | Type | Default | Description | | --------------- | ------------ | ------- | -------------------------------------------- | | `tools` | `Tool[]` | — | Tools the agent can invoke | | `callLlm` | `LlmAdapter` | — | Vendor-agnostic LLM adapter (see below) | | `systemPrompt` | `string` | — | System message prepended to the conversation | | `maxIterations` | `number` | `10` | Maximum think → act cycles before exhaustion | ### `AgentState` Initialise a state object and pass it to `.run()`: ```typescript interface AgentState { input: string; // user prompt — required output?: string; // final agent answer — set after run completes messages: ChatMessage[]; // conversation history (start as empty array) systemPrompt?: string; // alternative to the createAgent option } ``` ### `LlmAdapter` Supply your own LLM integration. The adapter receives the current conversation history and available tool schemas, and returns either a final text answer or a list of tool calls. ```typescript type LlmAdapter = ( messages: ChatMessage[], tools: LlmToolDef[], ) => Promise<{ text?: string; toolCalls?: ToolCall[] }>; ``` *** ## Full example ```typescript import { z } from "zod"; import { OpenAI } from "openai"; import { tool, createAgent } from "flowneer/presets/agent"; import type { LlmAdapter, AgentState } from "flowneer/presets/agent"; // 1. Define tools const getWeather = tool( ({ city }: { city: string }) => `Always sunny in ${city}!`, { name: "get_weather", description: "Get the weather for a given city", schema: z.object({ city: z.string().describe("City name") }), }, ); const getTime = tool(() => new Date().toUTCString(), { name: "get_time", description: "Get the current UTC time", params: {}, }); // 2. 
Build an OpenAI adapter const openai = new OpenAI(); const callLlm: LlmAdapter = async (messages, toolDefs) => { const res = await openai.chat.completions.create({ model: "gpt-4o-mini", messages: messages as any, tools: toolDefs.map((t) => ({ type: "function", function: t })), tool_choice: "auto", }); const msg = res.choices[0]!.message; if (msg.tool_calls?.length) { return { toolCalls: msg.tool_calls.map((tc) => ({ id: tc.id, name: (tc as any).function.name, args: JSON.parse((tc as any).function.arguments), })), }; } return { text: msg.content ?? "" }; }; // 3. Create the agent (once, reuse for all calls) const agent = createAgent({ tools: [getWeather, getTime], callLlm, systemPrompt: "You are a helpful assistant. Use tools when needed.", }); // 4. Run it const state: AgentState = { input: "What's the weather in Paris and what time is it?", messages: [], }; await agent.run(state); console.log(state.output); ``` *** ## How it works `createAgent` composes these Flowneer building blocks internally: 1. **`.withTools(tools)`** — registers the `ToolRegistry` on `shared.__tools` 2. **`.startWith(init)`** — seeds `shared.messages` with the system message + user input 3. **`.withReActLoop({ think, onObservation })`** — calls `callLlm` each iteration; on tool calls appends the assistant turn and dispatches tools; on finish stores the answer 4. **`.then(finalise)`** — copies `shared.__reactOutput` to `shared.output` The agent is **reusable** — each `.run(state)` call gets its own fresh message history. 
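The think → act cycle those four building blocks compose can be sketched as a bare loop. This is a toy model with simplified message and tool types, not the plugin's implementation:

```typescript
// Toy model of the ReAct loop that createAgent wires up (assumed semantics).
type ToyToolCall = { name: string; args: Record<string, unknown> };
type ToyReply = { text?: string; toolCalls?: ToyToolCall[] };

async function reactLoop(
  input: string,
  callLlm: (messages: string[]) => Promise<ToyReply>,
  tools: Record<string, (args: Record<string, unknown>) => string>,
  maxIterations = 10,
): Promise<{ output?: string; exhausted: boolean }> {
  const messages = [`user: ${input}`];
  for (let i = 0; i < maxIterations; i++) {
    const reply = await callLlm(messages); // think
    if (reply.text !== undefined) {
      return { output: reply.text, exhausted: false }; // finish
    }
    for (const call of reply.toolCalls ?? []) {
      // act: dispatch each requested tool, feed the observation back
      messages.push(`tool(${call.name}): ${tools[call.name]!(call.args)}`);
    }
  }
  return { exhausted: true }; // maxIterations reached without a final answer
}
```

The `exhausted` flag plays the role of `__reactExhausted` below: if the LLM never returns a final text within `maxIterations`, the loop gives up rather than spinning forever.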
*** ## Shared state fields set by the agent | Field | Type | Description | | ------------------ | --------------- | ------------------------------------------------------ | | `output` | `string` | The agent's final answer | | `messages` | `ChatMessage[]` | Full conversation history after the run | | `__reactExhausted` | `boolean` | `true` when `maxIterations` was reached without finish | | `__toolResults` | `ToolResult[]` | Results from the last tool-execution round | *** ## See also * [withReActLoop](./react-loop.md) — low-level ReAct loop primitive * [withTools](../../plugins/tools/overview.md) — tool registry API * [Multi-agent Patterns](./patterns.md) — supervisor, sequential, hierarchical crews --- --- url: /flowneer/recipes/edge-runtime.md --- # Edge Runtime Run Flowneer flows on Cloudflare Workers, Vercel Edge Runtime, and Deno Deploy — no configuration, no shims, no changes to your flow code. **Plugins used:** any — the core and all bundled plugins are edge-compatible by default *** ## Why it just works Flowneer has zero runtime dependencies. The core (`FlowBuilder`, `.run()`, `.stream()`, every built-in plugin) uses only: * **ECMAScript built-ins** — `Promise`, `AsyncGenerator`, `Array`, `Map`, `Set` * **Web-standard globals** — `setTimeout`/`clearTimeout` (timers), `fetch`, `globalThis.crypto` * **Nothing Node-specific** — no `fs`, `path`, `Buffer`, `node:*` imports, `require()`, or Node streams `.stream()` returns a plain `AsyncGenerator`, not a Node.js `Readable`. You can pipe it straight into a `ReadableStream` response — the same pattern works on every edge runtime. 
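That wrapping pattern can be sketched generically with standard Web APIs only (`ReadableStream` and `TextEncoder` are global on all three edge runtimes and on Node 18+); the adapter below is a generic sketch, not part of the library:

```typescript
// Generic AsyncGenerator → ReadableStream adapter using only Web APIs.
function generatorToStream<T>(
  gen: AsyncGenerator<T>,
  encode: (value: T) => Uint8Array,
): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    // pull() is called whenever the consumer wants the next chunk
    async pull(controller) {
      const { value, done } = await gen.next();
      if (done) controller.close();
      else controller.enqueue(encode(value));
    },
    // stop the generator early if the client disconnects
    cancel() {
      void gen.return(undefined);
    },
  });
}
```

Because the stream is pull-based, backpressure propagates naturally: the generator is only advanced as fast as the client reads the response.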
*** ## Cloudflare Workers ```typescript import { FlowBuilder } from "flowneer"; import { withTiming } from "flowneer/plugins/observability"; const AppFlow = FlowBuilder.extend([withTiming]); interface SummariseState { url: string; content: string; summary: string; } const summariseFlow = new AppFlow<SummariseState>() .withTiming() .startWith(async (s) => { const res = await fetch(s.url); s.content = await res.text(); }) .then(async (s) => { const res = await fetch("https://api.openai.com/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${(globalThis as any).OPENAI_API_KEY}`, }, body: JSON.stringify({ model: "gpt-4o-mini", messages: [ { role: "user", content: `Summarise in one paragraph:\n${s.content.slice(0, 4000)}`, }, ], }), }); const data = (await res.json()) as any; s.summary = data.choices[0].message.content; }); export default { async fetch( request: Request, env: Record<string, string>, ctx: ExecutionContext, ) { const url = new URL(request.url).searchParams.get("url"); if (!url) return new Response("Missing ?url=", { status: 400 }); const state = await summariseFlow.run({ url, content: "", summary: "" }); return Response.json({ summary: state.summary }); }, } satisfies ExportedHandler; ``` ### Streaming SSE on Cloudflare Workers `.stream()` produces an `AsyncGenerator`. Wrap it in a `ReadableStream` response — Cloudflare Workers supports chunked `ReadableStream` responses natively. ```typescript import type { StreamEvent } from "flowneer"; const encoder = new TextEncoder(); export default { async fetch(request: Request) { const topic = new URL(request.url).searchParams.get("topic") ??
"AI"; const readable = new ReadableStream({ async start(controller) { try { for await (const event of myFlow.stream({ topic })) { if (event.type === "chunk") { controller.enqueue( encoder.encode(`data: ${JSON.stringify(event.data)}\n\n`), ); } if (event.type === "done") break; } } finally { controller.close(); } }, }); return new Response(readable, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, }); }, } satisfies ExportedHandler; ``` ### ⚠️ Telemetry daemon on Cloudflare Workers The `TelemetryDaemon` plugin uses a `setInterval`-based background flush loop. Cloudflare Workers execute inside V8 isolates — timers registered after a `Response` is returned **do not fire**. The auto-flush will be silently skipped. **Fix:** disable auto-flush and call `telemetry.flush()` inside `ctx.waitUntil()` so Cloudflare keeps the isolate alive until the export completes: ```typescript import { TelemetryDaemon } from "flowneer/plugins/telemetry"; // flushIntervalMs: 0 disables the setInterval background loop const telemetry = new TelemetryDaemon({ flushIntervalMs: 0, exporter: otlpExporter("https://otel.example.com/v1/traces"), }); const AppFlow = FlowBuilder.extend([telemetry.plugin()]); export default { async fetch(request: Request, env: unknown, ctx: ExecutionContext) { const state = await myFlow.run(initialState(request)); // waitUntil keeps the isolate alive while telemetry drains ctx.waitUntil(telemetry.flush()); return Response.json(state); }, } satisfies ExportedHandler; ``` All other plugins — resilience, persistence, observability, memory, messaging, tools, agent, dev, output — are fully compatible with Cloudflare Workers without any changes. *** ## Vercel Edge Runtime Add `export const runtime = "edge"` to a Next.js Route Handler. Everything works, including `TelemetryDaemon`'s auto-flush (the Vercel Edge Runtime doesn't share the CF Workers restriction on timers firing after the response is returned).
```typescript // app/api/summarise/route.ts import { FlowBuilder } from "flowneer"; import { withTiming } from "flowneer/plugins/observability"; import type { NextRequest } from "next/server"; export const runtime = "edge"; const AppFlow = FlowBuilder.extend([withTiming]); interface State { prompt: string; result: string; } const flow = new AppFlow<State>().withTiming().startWith(async (s) => { const res = await fetch("https://api.openai.com/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${process.env.OPENAI_API_KEY}`, }, body: JSON.stringify({ model: "gpt-4o-mini", messages: [{ role: "user", content: s.prompt }], }), }); const data = (await res.json()) as any; s.result = data.choices[0].message.content; }); export async function GET(req: NextRequest) { const prompt = req.nextUrl.searchParams.get("prompt") ?? "Hello"; const state = await flow.run({ prompt, result: "" }); return Response.json({ result: state.result }); } ``` ### Streaming on Vercel Edge ```typescript // app/api/stream/route.ts import type { NextRequest } from "next/server"; export const runtime = "edge"; const encoder = new TextEncoder(); export async function GET(req: NextRequest) { const prompt = req.nextUrl.searchParams.get("prompt") ??
"Hello"; const readable = new ReadableStream({ async start(controller) { try { for await (const event of myFlow.stream({ prompt })) { if (event.type === "chunk") { controller.enqueue( encoder.encode(`data: ${JSON.stringify(event.data)}\n\n`), ); } } } finally { controller.close(); } }, }); return new Response(readable, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", }, }); } ``` *** ## Deno Deploy ```typescript import { FlowBuilder } from "npm:flowneer"; interface State { prompt: string; result: string; } const flow = new FlowBuilder<State>().startWith(async (s) => { const res = await fetch("https://api.openai.com/v1/chat/completions", { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${Deno.env.get("OPENAI_API_KEY")}`, }, body: JSON.stringify({ model: "gpt-4o-mini", messages: [{ role: "user", content: s.prompt }], }), }); const data = (await res.json()) as any; s.result = data.choices[0].message.content; }); Deno.serve(async (req) => { const prompt = new URL(req.url).searchParams.get("prompt") ??
"Hello"; const state = await flow.run({ prompt, result: "" }); return Response.json({ result: state.result }); }); ``` *** ## Compatibility table | Feature | CF Workers | Vercel Edge | Deno Deploy | | ----------------------------------------------------------- | :----------------: | :---------: | :---------: | | `FlowBuilder.run()` | ✅ | ✅ | ✅ | | `FlowBuilder.stream()` | ✅ | ✅ | ✅ | | `withTryCatch`, `withFallback`, `withCircuitBreaker` | ✅ | ✅ | ✅ | | `withTimeout` | ✅ | ✅ | ✅ | | `withRateLimit` | ✅ | ✅ | ✅ | | `withTiming`, `withHistory`, `withCallbacks`, `withVerbose` | ✅ | ✅ | ✅ | | `withMemory`, `withCheckpoint`, `withAuditLog` | ✅ | ✅ | ✅ | | `withStream`, `emit()` | ✅ | ✅ | ✅ | | `createAgent`, `withReActLoop` | ✅ | ✅ | ✅ | | `withStructuredOutput`, `withTokenBudget` | ✅ | ✅ | ✅ | | `TelemetryDaemon` (auto-flush) | ⚠️ use `waitUntil` | ✅ | ✅ | | `.batch()`, `.loop()`, `.parallel()` | ✅ | ✅ | ✅ | --- --- url: /flowneer/core/errors.md --- # Errors Flowneer wraps step failures in structured error types so you can distinguish flow errors from unexpected runtime errors and handle them appropriately. ## `FlowError` Thrown when a step (or sub-flow) fails after all retries are exhausted. ```typescript import { FlowError } from "flowneer"; try { await flow.run(shared); } catch (err) { if (err instanceof FlowError) { console.error(`Flow failed at: ${err.step}`); // e.g. "step 2" or "batch (step 1)" console.error(`Caused by:`, err.cause); // the original error } } ``` ### Properties | Property | Type | Description | | --------- | --------- | --------------------------------------------------------------- | | `step` | `string` | Human-readable step label, e.g. 
`"step 2"`, `"branch (step 0)"` | | `cause` | `unknown` | The original error that caused the failure | | `message` | `string` | `"Flow failed at {step}: {cause.message}"` | ### Step Labels | Step type | Label format | | ---------- | --------------------- | | `fn` | `"step N"` | | `branch` | `"branch (step N)"` | | `loop` | `"loop (step N)"` | | `batch` | `"batch (step N)"` | | `parallel` | `"parallel (step N)"` | *** ## `InterruptError` Thrown intentionally to **pause** a flow mid-execution — not a failure. Used by `interruptIf` and `humanNode` to implement human-in-the-loop and approval gates. ```typescript import { InterruptError } from "flowneer"; try { await flow.run(shared); } catch (err) { if (err instanceof InterruptError) { const savedState = err.savedShared; // deep clone of shared at interrupt time // save to DB, prompt user, etc. const userInput = await promptUser(savedState.__humanPrompt); // resume the flow await resumeFlow(flow, savedState, { feedback: userInput }, resumeFromStep); } } ``` ### Properties | Property | Type | Description | | ------------- | --------- | --------------------------------------------------------------------- | | `savedShared` | `unknown` | Deep clone (`JSON.parse/stringify`) of shared state at interrupt time | | `message` | `string` | `"Flow interrupted"` | `InterruptError` is **never wrapped** in a `FlowError` — it propagates directly so callers can catch it cleanly. *** ## `onError` Hook Plugins and callbacks can listen for step errors without stopping the flow via the `onError` hook: ```typescript this._setHooks({ onError: (meta, error, shared) => { console.error(`Step ${meta.index} failed:`, error); shared.lastError = error instanceof Error ? error.message : String(error); }, }); ``` `onError` fires **before** the error is propagated — it's informational, not a recovery mechanism. To recover, use [`withFallback`](../plugins/resilience/fallback.md). 
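To make the wrapping rules concrete, here is a minimal re-implementation sketch of the semantics described on this page (illustrative only, not the library source): ordinary step failures are wrapped in a `FlowError` carrying `step` and `cause`, while an `InterruptError` propagates untouched.

```typescript
// Illustrative re-implementation of the documented error semantics.
class InterruptError extends Error {
  constructor(public savedShared: unknown) {
    super("Flow interrupted");
  }
}

class FlowError extends Error {
  constructor(public step: string, public cause: unknown) {
    const msg = cause instanceof Error ? cause.message : String(cause);
    super(`Flow failed at ${step}: ${msg}`);
  }
}

// Run one step: wrap ordinary failures, let interrupts propagate untouched.
async function runStep(label: string, fn: () => Promise<void>): Promise<void> {
  try {
    await fn();
  } catch (err) {
    if (err instanceof InterruptError) throw err; // never wrapped
    throw new FlowError(label, err);
  }
}
```

The `instanceof` check before wrapping is what guarantees the documented "never wrapped" behaviour for interrupts.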
*** ## Error Handling Patterns ### Retry on failure ```typescript .then(riskyStep, { retries: 3, delaySec: 2 }) ``` ### Fallback on failure ```typescript const AppFlow = FlowBuilder.extend([withFallback]); const flow = new AppFlow(); flow.withFallback(async (s) => { s.result = s.__fallbackError.message; // flow continues normally after this }); ``` ### Circuit breaker ```typescript const AppFlow = FlowBuilder.extend([withCircuitBreaker]); const flow = new AppFlow(); flow.withCircuitBreaker({ maxFailures: 3, resetMs: 30_000 }); ``` ### Timeout ```typescript const AppFlow = FlowBuilder.extend([withTimeout]); const flow = new AppFlow(); flow.withTimeout(5000); // 5 s per step // or per-step: .then(slowStep, { timeoutMs: 10_000 }) ``` --- --- url: /flowneer/plugins/eval/overview.md --- # Eval Zero-dependency evaluation primitives for testing flows against datasets. Includes pure scoring functions and a dataset runner that executes a `FlowBuilder` over an array of inputs and aggregates metric averages. ## Import ```typescript import { exactMatch, containsMatch, f1Score, retrievalPrecision, retrievalRecall, answerRelevance, runEvalSuite, } from "flowneer/plugins/eval"; import type { ScoreFn, EvalResult, EvalSummary } from "flowneer/plugins/eval"; ``` > No `FlowBuilder.extend()` call needed — this plugin exports standalone functions only. ## Scoring Functions All scoring functions are pure and synchronous, returning a `number` between `0.0` and `1.0`. ### `exactMatch(predicted, expected)` Case-insensitive exact string comparison. ```typescript exactMatch("Paris", "paris"); // 1.0 exactMatch("London", "paris"); // 0.0 ``` ### `containsMatch(predicted, expected)` Returns `1.0` if `expected` is a substring of `predicted`. ```typescript containsMatch("The capital is Paris, France", "paris"); // 1.0 containsMatch("The capital is London", "paris"); // 0.0 ``` ### `f1Score(predicted, expected)` Token-level F1 score — harmonic mean of token precision and token recall. 
```typescript f1Score("the quick brown fox", "the quick fox"); // ~0.857 f1Score("completely wrong answer", "the quick fox"); // 0.0 ``` ### `retrievalPrecision(retrieved, relevant)` Fraction of retrieved items that are in the relevant set. ```typescript retrievalPrecision(["a", "b", "c"], ["a", "c", "d"]); // 0.667 ``` ### `retrievalRecall(retrieved, relevant)` Fraction of relevant items that were retrieved. ```typescript retrievalRecall(["a", "b"], ["a", "b", "c", "d"]); // 0.5 ``` ### `answerRelevance(answer, keywords)` Fraction of `keywords` that appear in `answer`. Returns `1.0` if `keywords` is empty. ```typescript answerRelevance("Paris is the capital of France", [ "paris", "capital", "france", ]); // 1.0 answerRelevance("London is a city", ["paris", "capital", "france"]); // 0.0 ``` ## Dataset Runner ### `runEvalSuite(dataset, flow, scoreFns)` Runs a `FlowBuilder` over each item in `dataset` and collects named scores. ```typescript const { results, summary } = await runEvalSuite(dataset, flow, scoreFns); ``` | Parameter | Type | Description | | ---------- | ---------------------------- | ---------------------------------------------------------- | | `dataset` | `S[]` | Array of initial shared-state objects (one per test case) | | `flow` | `FlowBuilder<S>` | The flow to execute for each item | | `scoreFns` | `Record<string, ScoreFn<S>>` | Named functions `(shared: S) => number \| Promise<number>` | Each dataset item is deep-cloned before the flow runs to prevent cross-contamination between test cases. Items that throw are counted as `failed` and excluded from metric averages. ### `ScoreFn` ```typescript type ScoreFn<S> = (shared: S) => number | Promise<number>; ``` Receives the final shared state after the flow completes, returns a score between `0.0` and `1.0`.
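As a sanity check on the definitions above, the token-level F1 and the retrieval metrics can be re-derived in a few lines. This is an illustrative re-implementation that reproduces the documented example values, not the plugin's source:

```typescript
// Token-level F1: harmonic mean of token precision and token recall.
function f1(predicted: string, expected: string): number {
  const p = predicted.toLowerCase().split(/\s+/).filter(Boolean);
  const e = expected.toLowerCase().split(/\s+/).filter(Boolean);
  const pool = [...e];
  let overlap = 0;
  for (const tok of p) {
    const i = pool.indexOf(tok); // multiset overlap: consume each match once
    if (i !== -1) {
      overlap++;
      pool.splice(i, 1);
    }
  }
  if (overlap === 0) return 0;
  const precision = overlap / p.length;
  const recall = overlap / e.length;
  return (2 * precision * recall) / (precision + recall);
}

// Fraction of retrieved items that are relevant, and of relevant items retrieved.
const precisionAt = (retrieved: string[], relevant: string[]) =>
  retrieved.filter((r) => relevant.includes(r)).length / retrieved.length;
const recallAt = (retrieved: string[], relevant: string[]) =>
  relevant.filter((r) => retrieved.includes(r)).length / relevant.length;
```

Running these against the documented examples ("the quick brown fox" vs. "the quick fox" and the `["a", "b", "c"]` retrieval sets) yields the same ~0.857, 0.667, and 0.5 values shown above.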
### `EvalResult` ```typescript interface EvalResult<S> { index: number; // Position in the dataset array shared: S; // Final shared state after the flow ran scores: Record<string, number>; // Score per named metric error?: unknown; // Set if the flow threw } ``` ### `EvalSummary` ```typescript interface EvalSummary { total: number; // Total items passed: number; // Items that ran without error failed: number; // Items that threw averages: Record<string, number>; // Per-metric average (error items excluded) } ``` ## Full Example ```typescript import { FlowBuilder } from "flowneer"; import { exactMatch, f1Score, answerRelevance, runEvalSuite, } from "flowneer/plugins/eval"; interface QAState { question: string; expectedAnswer: string; output?: string; } const qaFlow = new FlowBuilder<QAState>().then(async (s) => { s.output = await callLlm(`Answer this question: ${s.question}`); }); const testCases: QAState[] = [ { question: "What is the capital of France?", expectedAnswer: "Paris" }, { question: "What is 2 + 2?", expectedAnswer: "4" }, { question: "Who wrote Hamlet?", expectedAnswer: "Shakespeare" }, ]; const { results, summary } = await runEvalSuite(testCases, qaFlow, { exact: (s) => exactMatch(s.output ?? "", s.expectedAnswer), f1: (s) => f1Score(s.output ?? "", s.expectedAnswer), relevance: (s) => answerRelevance( s.output ??
"", s.expectedAnswer.toLowerCase().split(/\s+/), ), }); console.log(summary); // { // total: 3, // passed: 3, // failed: 0, // averages: { exact: 0.67, f1: 0.78, relevance: 0.89 } // } // Inspect individual results for (const r of results) { if (r.error) { console.error(`Item ${r.index} failed:`, r.error); } else { console.log(`Item ${r.index}:`, r.scores); } } ``` ## Combining with Other Plugins Eval works with any flow, including flows that use `withMocks` to replace LLM calls with deterministic outputs during testing: ```typescript import { withMocks } from "flowneer/plugins/dev"; const AppFlow = FlowBuilder.extend([withMocks]); const testFlow = new AppFlow() .then(async (s) => { s.output = await callLlm(s.question); }) .withMocks([ { stepIndex: 0, mockFn: (s) => { s.output = s.expectedAnswer; }, }, ]); const { summary } = await runEvalSuite(testCases, testFlow, scoreFns); // Deterministic: exact average will be 1.0 ``` --- --- url: /flowneer/core/extending.md --- # Extending Flowneer Flowneer has four distinct extension points. Choosing the right one depends on what you're building and how broadly it should apply. | Mechanism | Scope | Use for | | --------------------------------------- | ----------------------- | ------------------------------------------------ | | `FlowBuilder.extend([plugins])` | Subclass, not global | Adding new builder methods (e.g. `withTiming()`) | | `CoreFlowBuilder.registerStepType(...)` | All instances, globally | New first-class step types | | `flow.add(fragment)` | One specific flow | Composing reusable partial flows | *** ## `FlowBuilder.extend([plugins])` — subclass plugin Creates an isolated **subclass** of `FlowBuilder` with new methods mixed in. This is the standard mechanism for publishable plugins — it never mutates the base class. 
```typescript import { FlowBuilder } from "flowneer"; import type { FlowneerPlugin } from "flowneer"; export const withTiming: FlowneerPlugin = { withTiming(this: FlowBuilder) { const starts = new Map(); (this as any)._setHooks({ beforeStep: (meta) => { starts.set(meta.index, performance.now()); }, afterStep: (meta) => { console.log( `step ${meta.index} took ${performance.now() - starts.get(meta.index)!}ms`, ); }, }); return this; }, }; // Create a subclass — never touches FlowBuilder.prototype const AppFlow = FlowBuilder.extend([withTiming]); ``` Add TypeScript types via declaration merging: ```typescript declare module "flowneer" { interface FlowBuilder { withTiming(): this; } } ``` Then use it on any `AppFlow` instance: ```typescript new AppFlow().withTiming().then(step).run(shared); ``` Chain `extend()` to layer plugins: ```typescript const BaseFlow = FlowBuilder.extend([withTiming]); const AppFlow = BaseFlow.extend([withRateLimit]); // has both ``` ::: tip When to use this Publish as an npm package, or create your project's `AppFlow` once and import it everywhere. Methods become available on all instances of the subclass without affecting the base `FlowBuilder` or other subclasses. ::: *** ## `CoreFlowBuilder.registerStepType()` — custom step type Registers a completely new step type into the global dispatch table. The handler receives the step descriptor and a `StepContext` with `shared`, `params`, `signal`, `hooks`, and `builder`. Return `undefined` to continue, or an anchor name (without `#`) to goto. ```typescript import { CoreFlowBuilder, FlowBuilder } from "flowneer"; import type { StepHandler, StepContext } from "flowneer"; // 1. Define the handler const sleepHandler: StepHandler = async (step, ctx) => { await new Promise((r) => setTimeout(r, step.ms)); return undefined; }; // 2. Register it (once at startup) CoreFlowBuilder.registerStepType("sleep", sleepHandler); // 3. 
Add a builder method that pushes the step descriptor const AppFlow = FlowBuilder.extend([ { sleep(this: any, ms: number) { this.steps.push({ type: "sleep", ms }); return this; }, }, ]); ``` TypeScript types: ```typescript declare module "flowneer" { interface FlowBuilder { sleep(ms: number): this; } } ``` Usage: ```typescript new AppFlow() .then(fetchData) .sleep(500) // ← new step type .then(processData) .run(shared); ``` ::: tip Accessing `builder` in the handler `ctx.builder` is the `CoreFlowBuilder` instance. Use `ctx.builder._runSub()` to run a nested flow, or `ctx.builder._execute()` if you have a sub-flow built from a step descriptor (as `loop` and `batch` do internally). ::: *** ## `flow.add(fragment)` — composing fragments Fragments are reusable partial flows. Build one with the `fragment()` factory and splice it into any flow with `.add()`. ```typescript import { fragment, FlowBuilder } from "flowneer"; const enrich = fragment().then(fetchUser).then(enrichProfile); const summarise = fragment().loop( (s) => !s.done, (b) => b.then(summarize), ); new FlowBuilder() .then(init) .add(enrich) // splices enrich's steps in-place .add(summarise) .then(finalize) .run(shared); ``` Fragments cannot be `.run()` or `.stream()` directly — they are composable building blocks only. ::: tip When to use this Use fragments to share **step sequences** between flows. Use `FlowBuilder.extend()` to share **builder methods**. ::: --- --- url: /flowneer/core/flow-builder.md --- # FlowBuilder API `FlowBuilder` is the central class in Flowneer. All flow construction happens through its fluent, chainable API. ## Constructor ```typescript const flow = new FlowBuilder<State>(); // With params type: const flow = new FlowBuilder<State, Params>(); ``` *** ## `.startWith(fn, options?)` Resets any previously chained steps and sets the first step of the flow.
```typescript flow.startWith(async (s) => { s.initialized = true; }); ``` | Parameter | Type | Description | | --------- | ------------------- | -------------------------------- | | `fn` | `NodeFn` | The step function | | `options` | `NodeOptions` | Optional retries, delay, timeout | *** ## `.then(fn, options?)` Appends a sequential step after the current chain. ```typescript flow.startWith(fetchData).then(processData).then(saveData); ``` *** ## `.branch(router, branches, options?)` Routes execution to one of several branches based on the return value of `router`. ```typescript flow.branch( (s) => s.sentiment, // returns a key { positive: async (s) => { s.reply = "Great!"; }, negative: async (s) => { s.reply = "Sorry to hear that."; }, default: async (s) => { s.reply = "Thanks!"; }, }, ); ``` * `router` returns a string key. If the key is not found, the `"default"` branch runs. * Both `router` and the selected branch function are retried together according to `options`. * A branch function can return an `"#anchorName"` string to jump to an anchor like any other step. *** ## `.loop(condition, body)` Repeatedly executes `body` while `condition` returns `true`. ```typescript flow.loop( (s) => s.retries < 3, (b) => b.startWith(async (s) => { s.result = await tryOperation(s); s.retries++; }), ); ``` The `body` callback receives an inner `FlowBuilder` — chain steps on it exactly as you would on the outer flow. *** ## `.batch(items, processor, options?)` Runs `processor` once for each item returned by `items`, setting `shared[key]` to the current item before each run. 
```typescript flow.batch( (s) => s.documents, // extract the list (b) => b.startWith(async (s) => { const doc = s.__batchItem; // current item s.results.push(await summarize(doc)); }), ); ``` **Options:** | Option | Default | Description | | ------ | --------------- | ------------------------------------------------ | | `key` | `"__batchItem"` | Key on `shared` where the current item is stored | **Nesting batches:** use a unique `key` per level to prevent key collisions: ```typescript flow.batch( (s) => s.users, (b) => b .startWith((s) => console.log(s.__user)) .batch( (s) => s.__user.posts, (p) => p.startWith((s) => console.log(s.__post)), { key: "__post" }, ), { key: "__user" }, ); ``` *** ## `.parallel(fns, options?, reducer?)` Runs all functions concurrently against the same `shared` state. ```typescript flow.parallel([ async (s) => { s.weatherData = await fetchWeather(); }, async (s) => { s.newsData = await fetchNews(); }, async (s) => { s.stockData = await fetchStocks(); }, ]); ``` **With a `reducer` (safe mode):** each function receives its own shallow draft of `shared`. After all complete, the reducer merges results back — preventing concurrent write conflicts: ```typescript flow.parallel([workerA, workerB, workerC], undefined, (shared, drafts) => { shared.results = drafts.map((d) => d.output); }); ``` *** ## `.anchor(name)` Inserts a named no-op marker. Any step can jump to an anchor by returning `"#anchorName"`. ```typescript flow .anchor("retry") .then(async (s) => { s.attempts++; if (s.attempts < 3) return "#retry"; // jump back }) .then(finalize); ``` See [Anchors & Routing](./anchors-routing.md) for the full guide. *** ## `.add(fragment)` Splices all steps from a `Fragment` into the flow at the current position. Fragments are reusable partial flows created with the `fragment()` factory — see [Fragments](./fragments.md) for the full guide. 
```typescript import { FlowBuilder, fragment } from "flowneer"; const enrich = fragment().then(fetchUser).then(enrichProfile); flow .startWith(init) .add(enrich) // enrich's steps are inlined here .then(finalize); ``` | Parameter | Type | Description | | ---------- | ------------------- | ------------------------------------- | | `fragment` | `FlowBuilder` | A fragment (or FlowBuilder) to inline | Steps are copied by reference (same semantics as `loop` / `batch` inner builders). The same fragment can be `.add()`-ed into multiple flows. *** ## `.run(shared, params?, options?)` Executes the flow. ```typescript await flow.run(initialState); await flow.run(initialState, { userId: "u1" }); await flow.run(initialState, {}, { signal: abortController.signal }); ``` *** ## `.stream(shared, params?, options?)` Executes the flow and yields `StreamEvent` objects as an async generator. ```typescript for await (const event of flow.stream(shared)) { if (event.type === "chunk") process.stdout.write(String(event.data)); if (event.type === "done") console.log("finished"); } ``` Event types: | Type | Payload | Description | | ------------- | ---------------- | ----------------------------------------- | | `step:before` | `meta: StepMeta` | Fired before each step | | `step:after` | `meta, shared` | Fired after each step | | `chunk` | `data: unknown` | Yielded from a generator step or `emit()` | | `error` | `error: unknown` | Unhandled error | | `done` | — | Flow completed | See [Streaming](./streaming.md) for details. *** ## `FlowBuilder.extend(plugins)` — static Create a subclass of `FlowBuilder` with the given plugins mixed in. 
```typescript import { withTiming } from "flowneer/plugins/observability"; import { withRateLimit } from "flowneer/plugins/llm"; const AppFlow = FlowBuilder.extend([withTiming, withRateLimit]); const flow = new AppFlow() .withTiming() .withRateLimit({ intervalMs: 500 }) .startWith(step); ``` Chain `extend()` calls to layer plugins on top of a base subclass: ```typescript const BaseFlow = FlowBuilder.extend([withTiming]); const TracedFlow = BaseFlow.extend([withTrace]); // has both plugins ``` *** ## `StepMeta` Exposed to hooks and callbacks: ```typescript interface StepMeta { index: number; // 0-based step index type: "fn" | "branch" | "loop" | "batch" | "parallel" | "anchor"; label?: string; // optional label set via NodeOptions } ``` --- --- url: /flowneer/core/fragments.md --- # Fragments Fragments are reusable, composable partial flows — the Flowneer equivalent of Zod partials. Define a step chain once, then splice it into any number of flows with `.add()`. ## Creating a Fragment Use the `fragment()` factory function. It returns a `Fragment` instance that supports the full fluent API: `.then()`, `.loop()`, `.batch()`, `.branch()`, `.parallel()`, `.anchor()`. ```typescript import { fragment } from "flowneer"; const enrich = fragment().then(fetchUser).then(enrichProfile); const summarise = fragment().loop( (s) => !s.done, (b) => b.then(summarize), ); ``` Fragments are typed — `Fragment` carries the same shared-state and params types as `FlowBuilder`, giving you full type safety when composing flows. 
*** ## Embedding with `.add()` Call `.add(fragment)` on any `FlowBuilder` to splice the fragment's steps inline at that position: ```typescript import { FlowBuilder, fragment } from "flowneer"; const flow = new FlowBuilder() .then(init) .add(enrich) // enrich's steps are inlined here .add(summarise) // summarise's steps follow .then(finalize); await flow.run(shared); ``` This is equivalent to manually chaining every step from the fragment — `.add()` just copies them in order. *** ## Reuse Across Flows The same fragment instance can be `.add()`-ed into multiple flows without conflict. Steps are copied by reference (same semantics as `loop` / `batch` inner builders). ```typescript const validate = fragment().then(checkInput).then(sanitize); const flowA = new FlowBuilder().add(validate).then(handleA); const flowB = new FlowBuilder().add(validate).then(handleB); ``` *** ## Fragments Cannot Run Directly Calling `.run()` or `.stream()` on a `Fragment` throws an error: ```typescript const frag = fragment().then(myStep); await frag.run({}); // ❌ Error: Fragment cannot be run directly — use .add() ``` This is by design — fragments are building blocks, not standalone flows. Always embed them via `.add()`. *** ## All Step Types Supported Fragments can contain any step type: ```typescript const complex = fragment() .then(stepA) .branch((s) => s.mode, { fast: stepFast, slow: stepSlow, }) .loop( (s) => !s.done, (b) => b.then(iterate), ) .batch( (s) => s.items, (b) => b.then(processItem), ) .parallel([workerA, workerB]) .anchor("checkpoint"); ``` When `.add(complex)` is called, all of these steps are inlined into the parent flow and execute as if they were defined directly on it. 
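The splice semantics are easy to model in miniature. The `MiniBuilder` below is a hypothetical toy, not Flowneer's implementation; it shows why `.add()` is equivalent to manually chaining the fragment's steps, and why one fragment can feed several flows:

```typescript
// Toy model of fragment splicing (illustrative only, not Flowneer code).
type Step<S> = (s: S) => void | Promise<void>;

class MiniBuilder<S> {
  steps: Step<S>[] = [];
  then(fn: Step<S>): this {
    this.steps.push(fn);
    return this;
  }
  // .add() copies the fragment's step references, in order, into this list
  add(frag: MiniBuilder<S>): this {
    this.steps.push(...frag.steps);
    return this;
  }
  async run(s: S): Promise<S> {
    for (const step of this.steps) await step(s);
    return s;
  }
}

// One fragment, spliced into two independent flows
type S = { log: string[] };
const validate = new MiniBuilder<S>()
  .then((s) => { s.log.push("check"); })
  .then((s) => { s.log.push("sanitize"); });

const flowA = new MiniBuilder<S>().add(validate).then((s) => { s.log.push("A"); });
const flowB = new MiniBuilder<S>().add(validate).then((s) => { s.log.push("B"); });
```

Since `.add()` only copies references into the parent's own list, each flow keeps an independent step sequence even though the fragment's step functions themselves are shared.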
*** ## Composing Fragments Fragments can `.add()` other fragments, enabling layered composition: ```typescript const auth = fragment().then(authenticate).then(authorize); const enrich = fragment().then(fetchProfile).then(loadPrefs); const setup = fragment().add(auth).add(enrich); const flow = new FlowBuilder().add(setup).then(handleRequest); ``` *** ## API Reference ### `fragment()` Factory function that creates a new `Fragment`. ```typescript import { fragment } from "flowneer"; const frag = fragment(); ``` ### `Fragment` Class extending `FlowBuilder`. Inherits all builder methods. Overrides `.run()` and `.stream()` to throw. ### `.add(frag)` Method on `FlowBuilder` that splices all steps from `frag` into the current flow. | Parameter | Type | Description | | --------- | ------------------- | ------------------------------------- | | `frag` | `FlowBuilder` | A fragment (or FlowBuilder) to inline | Returns `this` for chaining. --- --- url: /flowneer/presets/pipeline/generate-until-valid.md --- # generateUntilValid Generate → validate → retry loop. If the generated output fails validation the error message is placed on shared state so the generator can correct itself on the next attempt. Distinct from the [`withStructuredOutput`](../../plugins/llm/structured-output.md) plugin (which wraps every step) — this is a self-contained flow for a single generation step with retry logic. ## Import ```typescript import { generateUntilValid } from "flowneer/presets/pipeline"; ``` ## Usage ```typescript interface CodeState { prompt: string; code: string; __validationError?: string; } const flow = generateUntilValid({ generate: async (s) => { const hint = s.__validationError ? 
`\nPrevious error: ${s.__validationError}` : ""; s.code = await llm(`Write a TypeScript function.${hint}\n${s.prompt}`); }, validate: (s) => { try { new Function(s.code); return true; } catch (e) { return (e as Error).message; } }, maxAttempts: 3, }); await flow.run({ prompt: "Sort an array of numbers", code: "" }); ``` ## Options | Option | Type | Default | Description | | ------------- | --------------------------------------------------------------- | ------- | ---------------------------------------------------------------------------------------------------- | | `generate` | `NodeFn` | — | Produces the output. On retries `(shared as any).__validationError` holds the previous error message | | `validate` | `(shared, params) => true \| string \| Promise<true \| string>` | — | Return `true` when valid, or an error string to retry | | `maxAttempts` | `number` | `3` | Maximum generation attempts | ## State keys | Key | Description | | ------------------- | ---------------------------------------------------------------------------------- | | `__validationError` | `undefined` on success, or the last validation error string if all attempts failed | ## Return value Returns a `FlowBuilder`: ```typescript const flow = generateUntilValid({ generate, validate }) .withTiming() .withCostTracker(); ``` ## See Also * [`mapReduceLlm`](./map-reduce-llm.md) — batch LLM calls across N items * [`withStructuredOutput`](../../plugins/llm/structured-output.md) — step-level output parsing and retry --- --- url: /flowneer/core/getting-started.md --- # Getting Started ## Installation ```bash bun add flowneer # or npm install flowneer # or pnpm add flowneer ``` ## Your First Flow Every Flowneer flow starts with a `FlowBuilder`. You define a **shared state** type, chain steps, and call `.run()`.
```typescript import { FlowBuilder } from "flowneer"; interface State { input: string; result: string; } const flow = new FlowBuilder() .startWith(async (s) => { s.result = s.input.toUpperCase(); }) .then(async (s) => { console.log(s.result); // "HELLO WORLD" }); await flow.run({ input: "hello world", result: "" }); ``` ### The Shared State Model All steps operate on the **same object** — `s` in every step is the same reference. Mutate it directly; never replace it with a spread (`s = { ...s }`), as that would break the reference shared between steps. ```typescript // ✅ Correct — mutate in place async (s) => { s.count += 1; }; // ❌ Incorrect — replaces the reference, upstream steps see the old object async (s) => { s = { ...s, count: s.count + 1 }; }; ``` ## Registering Plugins Plugins extend `FlowBuilder` with new methods. Use `FlowBuilder.extend([...plugins])` to create a subclass that has those methods available. ```typescript import { FlowBuilder } from "flowneer"; import { withTiming } from "flowneer/plugins/observability"; import { withCostTracker } from "flowneer/plugins/llm"; const AppFlow = FlowBuilder.extend([withTiming, withCostTracker]); // Now all AppFlow instances have .withTiming() and .withCostTracker() const flow = new AppFlow() .withTiming() .withCostTracker() .startWith(myStep); ``` See [Writing Plugins](./plugins.md) for how to create your own. 
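The `extend` mechanism above can be understood as ordinary TypeScript class-mixin composition. The sketch below is a self-contained model of that pattern — `MiniBuilder`, `Plugin`, and this `withTiming` are illustrative stand-ins, not Flowneer's actual source:

```typescript
// Illustrative sketch only — NOT Flowneer's implementation of extend().
// It models the plugin pattern as standard TypeScript class mixins.
type Ctor<T = object> = new (...args: any[]) => T;
type Plugin = (Base: Ctor) => Ctor;

// A plugin is just a function from class to subclass with extra chainable methods.
const withTiming: Plugin = (Base) =>
  class extends Base {
    withTiming() {
      // a real plugin would wrap each step with timers here
      return this;
    }
  };

class MiniBuilder {
  steps: Array<(s: unknown) => void> = [];
  then(fn: (s: unknown) => void) {
    this.steps.push(fn);
    return this;
  }
  static extend(plugins: Plugin[]): any {
    // fold each plugin over the base class, producing one combined subclass
    return plugins.reduce((cls, plugin) => plugin(cls), this as Ctor);
  }
}

const AppFlow = MiniBuilder.extend([withTiming]);
const flow = new AppFlow().withTiming().then(() => {});
```

Because each plugin returns a subclass of its input, `extend` can simply fold the plugin list over the base class with `reduce` — the result is one class carrying every plugin's methods.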
## Step Options Every step (`.startWith`, `.then`, `.parallel`) accepts an optional `NodeOptions` object: | Option | Type | Default | Description | | ----------- | ---------------------------- | ------- | ------------------------------------------ | | `retries` | `number \| (s, p) => number` | `1` | How many total attempts (1 = no retry) | | `delaySec` | `number \| (s, p) => number` | `0` | Seconds between retry attempts | | `timeoutMs` | `number \| (s, p) => number` | `0` | Per-step wall-clock timeout (0 = disabled) | ```typescript const flow = new FlowBuilder().startWith(fetchData, { retries: 3, delaySec: 1, timeoutMs: 5000, }); ``` `retries` and `delaySec` can be functions for dynamic per-step behaviour: ```typescript .then(myStep, { retries: (s) => (s.isImportant ? 5 : 1), }) ``` ## Aborting a Flow Pass an `AbortSignal` to `.run()` to cancel mid-flow: ```typescript const controller = new AbortController(); setTimeout(() => controller.abort(), 3000); await flow.run(shared, {}, { signal: controller.signal }); ``` ## TypeScript Generics `FlowBuilder` has two type parameters: * `S` — the shared state type (required) * `P` — the optional `params` type (defaults to `Record`) `params` are read-only contextual values injected at `.run()` time — useful for request-scoped data like user IDs or request metadata. ```typescript interface Params { userId: string; requestId: string; } const flow = new FlowBuilder().startWith(async (s, params) => { s.userId = params.userId; }); await flow.run(initialState, { userId: "u123", requestId: "r456" }); ``` --- --- url: /flowneer/plugins/graph/export.md --- ````markdown # Graph & Flow Export Two plugins let you serialise a flow's structure to JSON for debugging, visualisation, or documentation generation. 
| Plugin | Works on | Returns | | ----------------- | ----------------- | ------------------------------------ | | `withExportGraph` | Graph flows only | `GraphExport` (nodes + edges) | | `withExportFlow` | Any `FlowBuilder` | `FlowExport` (flow + optional graph) | Load `withExportFlow` last — it overrides `.exportGraph()` with the richer unified shape. --- ## `withExportGraph` Exports the raw nodes and edges declared via `addNode` / `addEdge` **before** `.compile()` is called. The call is non-destructive — `.compile()` can still be chained after. ### Setup ```typescript import { FlowBuilder } from "flowneer"; import { withGraph, withExportGraph } from "flowneer/plugins/graph"; const AppFlow = FlowBuilder.extend([withGraph, withExportGraph]); ``` ### Usage ```typescript const result = new AppFlow() .addNode("fetch", fetchData, { retries: 3 }) .addNode("transform", transformData) .addNode("save", saveData) .addEdge("fetch", "transform") .addEdge("transform", "save") .addEdge("transform", "fetch", (s) => s.needsRetry) // conditional back-edge .exportGraph(); // ← non-destructive console.log(JSON.stringify(result, null, 2)); ``` ```json { "format": "json", "nodes": [ { "name": "fetch", "options": { "retries": 3 } }, { "name": "transform" }, { "name": "save" } ], "edges": [ { "from": "fetch", "to": "transform", "conditional": false }, { "from": "transform", "to": "save", "conditional": false }, { "from": "transform", "to": "fetch", "conditional": true } ] } ``` ### `GraphExport` type ```typescript interface GraphExport { format: "json"; nodes: GraphNodeExport[]; edges: GraphEdgeExport[]; } interface GraphNodeExport { name: string; options?: { retries?: number | string; // "" when a function delaySec?: number | string; timeoutMs?: number | string; }; } interface GraphEdgeExport { from: string; to: string; conditional: boolean; // true when the edge has a runtime guard } ``` ### Notes - Options with value `0` (the default) are omitted from the output. 
- Dynamic option values (functions) are serialised as `""`. - Calling `.exportGraph()` on a builder with no nodes throws an error. - The `"mermaid"` format is reserved for a future release. --- ## `withExportFlow` Exports **any** `FlowBuilder` — sequential, loop, batch, parallel, branch — into a structured node/edge graph. When loaded alongside `withGraph`, also includes the raw graph store in a separate `graph` section. Loading `withExportFlow` after `withExportGraph` is fine — it overrides `.exportGraph()` at runtime. ### Setup ```typescript import { FlowBuilder } from "flowneer"; import { withExportFlow } from "flowneer/plugins/graph"; // Optionally also load withGraph + withExportGraph for the combined output: // const AppFlow = FlowBuilder.extend([withGraph, withExportGraph, withExportFlow]); const AppFlow = FlowBuilder.extend([withExportFlow]); // load last ``` ### Usage — sequential flow ```typescript const result = new AppFlow() .startWith(loadData) .then(validate) .then(save) .exportGraph(); ``` ```json { "format": "json", "flow": { "nodes": [ { "id": "fn_0", "type": "fn", "label": "loadData" }, { "id": "fn_1", "type": "fn", "label": "validate" }, { "id": "fn_2", "type": "fn", "label": "save" } ], "edges": [ { "from": "fn_0", "to": "fn_1", "kind": "sequential" }, { "from": "fn_1", "to": "fn_2", "kind": "sequential" } ] } } ``` ### Usage — flow with complex steps ```typescript const result = new AppFlow() .startWith(init) .loop( (s) => !s.done, (b) => b.startWith(process).then(check), ) .parallel([workerA, workerB, workerC]) .exportGraph(); ``` Loop and parallel bodies get nested `id` paths (e.g. `"loop_1:body:fn_0"`) so the full structure is unambiguous. 
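Because each nested `id` embeds its full path, tooling can reconstruct the hierarchy from the id strings alone. A minimal sketch, assuming only the documented `FlowNodeExport` fields (the `exported` array here is hypothetical example data):

```typescript
// Sketch: rebuild an indented tree from exported node ids (e.g. "loop_1:body:fn_0").
interface FlowNodeExport {
  id: string;
  type: string;
  label: string;
}

function renderTree(nodes: FlowNodeExport[]): string {
  return nodes
    .map((n) => {
      // every ":"-separated segment after the first is one nesting level deeper
      const depth = n.id.split(":").length - 1;
      return "  ".repeat(depth) + `${n.id} (${n.label})`;
    })
    .join("\n");
}

const exported: FlowNodeExport[] = [
  { id: "fn_0", type: "fn", label: "init" },
  { id: "loop_1", type: "loop", label: "anonymous" },
  { id: "loop_1:body:fn_0", type: "fn", label: "process" },
  { id: "loop_1:body:fn_1", type: "fn", label: "check" },
];
console.log(renderTree(exported));
```

This kind of traversal is all a visualiser needs — no access to the original builder required.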
### Usage — graph + flow combined When `withGraph`, `withExportGraph`, and `withExportFlow` are all loaded: ```typescript const result = new AppFlow() .addNode("a", stepA) .addNode("b", stepB) .addEdge("a", "b") .exportGraph(); // before compile() — includes both sections ``` ```json { "format": "json", "flow": { "nodes": [], "edges": [] }, "graph": { "nodes": [{ "name": "a" }, { "name": "b" }], "edges": [{ "from": "a", "to": "b", "conditional": false }] } } ``` After `.compile()` the flow section is populated; the graph section is still present. ### `FlowExport` type ```typescript interface FlowExport { format: "json"; flow: { nodes: FlowNodeExport[]; edges: FlowEdgeExport[]; }; graph?: { // present only when a graph store is attached nodes: GraphNodeExport[]; edges: GraphEdgeExport[]; }; } interface FlowNodeExport { id: string; // e.g. "fn_0", "loop_2", "loop_2:body:fn_1" type: "fn" | "branch" | "loop" | "batch" | "parallel" | "anchor"; label: string; // function name, anchor name, or "anonymous" options?: { retries?: number | string; delaySec?: number | string; timeoutMs?: number | string; }; meta?: Record; // branch keys, parallel count, etc. 
} interface FlowEdgeExport { from: string; to: string; kind: | "sequential" | "branch-arm" | "loop-body" | "loop-back" | "parallel-fan-out" | "batch-body"; label?: string; // branch key or parallel index } ``` ### Node `id` conventions | Step type | `id` pattern | | ---------- | ------------------------------- | | `fn` | `fn_<index>` | | `branch` | `branch_<index>` | | branch arm | `branch_<index>:arm:<key>` | | `loop` | `loop_<index>` | | loop body | `loop_<index>:body:<child-id>` | | `batch` | `batch_<index>` | | batch body | `batch_<index>:body:<child-id>` | | `parallel` | `parallel_<index>` | | `anchor` | `anchor:<name>` | --- ## See also - [Graph Composition](./overview.md) — `addNode`, `addEdge`, `compile()` ```` --- --- url: /flowneer/plugins/graph/overview.md --- # Graph Composition Declare flows as directed acyclic graphs (DAGs) with `addNode` and `addEdge`, then call `compile()` to produce an executable `FlowBuilder`. Cycles are supported via conditional back-edges, which compile into anchor-based goto jumps. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withGraph } from "flowneer/plugins/graph"; const AppFlow = FlowBuilder.extend([withGraph]); ``` ## Usage ```typescript const flow = new AppFlow() .addNode("fetch", fetchData) .addNode("validate", validateData) .addNode("transform", transformData) .addNode("save", saveData) .addEdge("fetch", "validate") .addEdge("validate", "transform") .addEdge("transform", "save") // Conditional back-edge: loop back to fetch if retry needed .addEdge("validate", "fetch", (s) => s.needsRetry) .compile(); await flow.run({ url: "https://api.example.com/data", needsRetry: false }); ``` ## API ### `.addNode(name, fn, options?)` Register a named node. Nodes are not executed at registration time.
| Parameter | Type | Description | | --------- | ------------------- | -------------------------------- | | `name` | `string` | Unique node name | | `fn` | `NodeFn` | Step function | | `options` | `NodeOptions` | Optional retries, delay, timeout | ### `.addEdge(from, to, condition?)` Add a directed edge between nodes. | Parameter | Type | Description | | ----------- | ------------------------------------------------- | ---------------- | | `from` | `string` | Source node name | | `to` | `string` | Target node name | | `condition` | `(shared, params) => boolean \| Promise` | Optional guard | Conditional edges enable **cycles** (back-edges). Unconditional edges form the DAG skeleton. ### `.compile()` Topologically sorts the unconditional edges and compiles the graph into an executable flow: 1. Runs Kahn's algorithm to find the topological order. 2. Detects conditional back-edges. 3. Inserts an `.anchor()` plus a conditional goto routing step for each back-edge. 4. Returns `this` for chaining. **Throws** if unconditional edges form a cycle — use conditional edges to break any cycle.
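For reference, the topological-sort step can be sketched in isolation. This is an illustrative implementation of Kahn's algorithm over the unconditional edges — not Flowneer's source — and its final check mirrors the documented throw-on-cycle behaviour:

```typescript
// Kahn's algorithm over the unconditional edges (illustrative sketch).
function topoSort(nodes: string[], edges: Array<[string, string]>): string[] {
  const inDegree = new Map<string, number>();
  for (const n of nodes) inDegree.set(n, 0);
  for (const [, to] of edges) inDegree.set(to, (inDegree.get(to) ?? 0) + 1);

  // Start from every in-degree-0 node and peel nodes off one at a time.
  const queue = nodes.filter((n) => inDegree.get(n) === 0);
  const order: string[] = [];
  while (queue.length) {
    const n = queue.shift()!;
    order.push(n);
    for (const [from, to] of edges) {
      if (from !== n) continue;
      inDegree.set(to, inDegree.get(to)! - 1);
      if (inDegree.get(to) === 0) queue.push(to);
    }
  }

  // Leftover nodes mean the unconditional edges form a cycle — this is where
  // compile() throws.
  if (order.length !== nodes.length) throw new Error("Cycle in unconditional edges");
  return order;
}
```

Conditional edges are ignored by the sort, which is why back-edges must carry a condition: they never participate in the DAG skeleton.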
## How Compilation Works Given a graph `A → B → C` with a back-edge `C → A (conditional)`: ``` compile() produces: .anchor("A") .then(nodeA_fn) .then(nodeB_fn) .then(nodeC_fn) .then(async (s, p) => { if (condition(s, p)) return "#A"; }) ``` ## Full Example ```typescript interface ProcessState { data: any[]; valid: boolean; needsRetry: boolean; retries: number; } const flow = new AppFlow() .addNode("load", (s) => { s.data = loadData(); }) .addNode("validate", (s) => { s.valid = s.data.every(isValid); s.needsRetry = !s.valid; }) .addNode("process", async (s) => { s.data = await process(s.data); }) .addNode("save", async (s) => { await saveData(s.data); }) .addEdge("load", "validate") .addEdge("validate", "process") .addEdge("process", "save") // Retry up to 3 times if validation fails .addEdge("validate", "load", (s) => s.needsRetry && ++s.retries < 3) .compile() .withCycles(3, "load"); // guard against infinite retry loops (requires the withCycles plugin) ``` ## Notes * Every node except the starting node (in-degree 0) must have at least one incoming edge. * Unreachable nodes — registered via `addNode()` but not reachable from a root — are still included in the compiled flow. * `compile()` mutates the builder in place and returns `this`. ## See also * [Graph & Flow Export](./export.md) — `withExportGraph` and `withExportFlow` --- --- url: /flowneer/recipes/human-in-the-loop.md --- # Human-in-the-loop Pause a flow mid-execution to ask a human for input or approval, then resume with their response. Useful for content moderation gates, approval workflows, or any pipeline where human judgment is required at a specific step.
**Plugins used:** `withHumanNode`, `resumeFlow` (agent), `withCheckpoint` (persistence) *** ## The code ### The flow ```typescript import { FlowBuilder } from "flowneer"; import { withHumanNode, resumeFlow } from "flowneer/plugins/agent"; import { withCheckpoint } from "flowneer/plugins/persistence"; import { InterruptError } from "flowneer"; const AppFlow = FlowBuilder.extend([withHumanNode, withCheckpoint]); // ─── State ─────────────────────────────────────────────────────────────────── interface ContentState { jobId: string; rawContent: string; humanFeedback?: string; approved?: boolean; finalContent: string; checkpointData?: unknown; // used by withCheckpoint } // ─── Simulated checkpoint store (use Redis / DB in production) ─────────────── const checkpointStore = new Map(); // ─── Flow ──────────────────────────────────────────────────────────────────── const contentFlow = new AppFlow() .withCheckpoint({ // Persist state so it survives the process restart between pause and resume save: async (id, data) => { checkpointStore.set(id, data); }, load: async (id) => checkpointStore.get(id) ?? null, key: (s) => s.jobId, }) // Step 1 — Generate content .startWith(async (s) => { // Replace with your LLM call s.rawContent = `Draft article about ${s.jobId}... [generated content here]`; console.log("Content generated. Awaiting human review."); }) // Step 2 — Pause and ask a human for approval .withHumanNode({ prompt: (s) => `Please review the following content and reply with "approve", "reject", ` + `or "edit: ":\n\n${s.rawContent}`, onResponse: (response, s) => { if (response.startsWith("edit: ")) { s.humanFeedback = response.slice(6); s.approved = true; } else { s.approved = response.toLowerCase() === "approve"; s.humanFeedback = response; } }, timeoutMs: 24 * 60 * 60 * 1000, // 24 hours }) // Step 3 — Act on the review .then((s) => { if (!s.approved) { console.log("Content rejected. 
Stopping pipeline."); return; } // onResponse strips the "edit: " prefix, so edited content is any feedback other than "approve" s.finalContent = s.humanFeedback && s.humanFeedback.toLowerCase() !== "approve" ? s.humanFeedback : s.rawContent; console.log("Content approved! Publishing:", s.finalContent.slice(0, 80)); }); ``` ### Starting the flow (initial run) ```typescript async function startJob(jobId: string) { const state: ContentState = { jobId, rawContent: "", finalContent: "", }; try { await contentFlow.run(state); console.log("Flow completed without interruption."); } catch (err) { if (err instanceof InterruptError) { // The humanNode paused execution — state is checkpointed. // Send the prompt to the reviewer (email, Slack, webhook, etc.) console.log("\n⏸ Flow paused — awaiting human review."); console.log("Prompt sent to reviewer:", err.prompt); console.log(`Resume with: resumeJob('${jobId}', '')`); } else { throw err; } } } ``` ### Resuming the flow (after human responds) ```typescript async function resumeJob(jobId: string, humanResponse: string) { // Load the checkpointed state from your store const savedState = checkpointStore.get(jobId) as ContentState | undefined; if (!savedState) throw new Error(`No checkpoint found for job ${jobId}`); // resumeFlow injects the human response and re-runs from the interrupt point await resumeFlow(contentFlow, savedState, humanResponse); console.log("Flow resumed and completed."); } // Simulate the round-trip await startJob("article-42"); await resumeJob("article-42", "approve"); ``` *** ## How `withHumanNode` works 1. When the flow reaches `.withHumanNode()`, it throws an `InterruptError` containing the prompt string. 2. Your `catch` block receives the error and is responsible for delivering the prompt to a human (email, Slack message, HTTP webhook, etc.). 3. When the human responds, call `resumeFlow(flow, savedState, response)`. This injects the response via `onResponse`, then continues execution from the step immediately after the interrupt. 4.
`withCheckpoint` persists state between the throw and the resume — essential when your process may restart in the meantime. ## Variation — sequential approval gates Chain multiple approval steps for a multi-stage review pipeline: ```typescript const publishFlow = new AppFlow() .startWith(generateDraft) .withHumanNode({ prompt: (s) => `Review draft:\n${s.draft}`, onResponse: setEditorFeedback, }) .then(incorporateFeedback) .withHumanNode({ prompt: (s) => `Legal review:\n${s.revised}`, onResponse: setLegalApproval, }) .then(publishContent); ``` ## Variation — Slack / webhook delivery ```typescript catch (err) { if (err instanceof InterruptError) { await slack.chat.postMessage({ channel: "#content-review", text: err.prompt, metadata: { jobId }, }); } } ``` In your Slack event handler, call `resumeJob(jobId, slackResponse.text)`. *** ## See also * [humanNode reference](../plugins/agent/human-node.md) * [withCheckpoint](../plugins/persistence/checkpoint.md) * [Anchors & Routing](../core/anchors-routing.md) --- --- url: /flowneer/plugins/agent/human-node.md --- # humanNode & resumeFlow Insert a human-in-the-loop pause point into a flow. When the step executes it throws an `InterruptError` carrying a deep snapshot of the current state. A `resumeFlow` helper makes resuming ergonomic. 
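Before the API details, the pause/resume mechanic can be modelled in a few self-contained lines. This is a conceptual sketch only — `MiniInterrupt`, `runFrom`, and the step list are stand-ins, not Flowneer APIs:

```typescript
// Conceptual model of the interrupt → resume round-trip: a step throws an
// error carrying a state snapshot, and resuming re-runs from a step index.
class MiniInterrupt extends Error {
  constructor(public savedShared: unknown, public stepIndex: number) {
    super("interrupted");
  }
}

interface S { draft: string; approved: boolean }
type Step = (s: S) => void;

function runFrom(steps: Step[], shared: S, fromStep = 0): void {
  for (let i = fromStep; i < steps.length; i++) steps[i](shared);
}

const steps: Step[] = [
  (s) => { s.draft = "v1"; },
  (s) => {
    // the "human gate": pause unless already approved
    if (!s.approved) throw new MiniInterrupt(JSON.parse(JSON.stringify(s)), 1);
  },
  (s) => { s.draft += " (published)"; },
];

let result = "";
try {
  runFrom(steps, { draft: "", approved: false });
} catch (e) {
  if (e instanceof MiniInterrupt) {
    const saved = e.savedShared as S;
    saved.approved = true;              // merge the human's response
    runFrom(steps, saved, e.stepIndex); // resume at the gate, which now passes
    result = saved.draft;
  } else throw e;
}
```

The real plugin adds a deep snapshot, prompt storage, and `withReplay`-based step skipping on top of this basic shape.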
## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withHumanNode } from "flowneer/plugins/agent"; const AppFlow = FlowBuilder.extend([withHumanNode]); ``` ## Usage ```typescript import { InterruptError } from "flowneer"; import { withHumanNode, resumeFlow } from "flowneer/plugins/agent"; interface State { topic: string; draft: string; approved: boolean; feedback: string; __humanPrompt?: string; } const flow = new AppFlow() .startWith(async (s) => { s.draft = await generateDraft(s.topic); }) .humanNode({ prompt: "Please review the draft and provide feedback.", }) .then(async (s) => { if (!s.approved) { s.draft = await reviseDraft(s.draft, s.feedback); } }); // --- Runner --- const shared: State = { topic: "", draft: "", approved: false, feedback: "" }; try { await flow.run(shared); } catch (e) { if (e instanceof InterruptError) { const saved = e.savedShared as State; console.log("Prompt:", saved.__humanPrompt); console.log("Draft:", saved.draft); // Get human input... const feedback = await getHumanInput(); const approved = await getApproval(); // Resume from step 2 (skip steps 0 and 1 which already ran) await resumeFlow(flow, saved, { feedback, approved }, 2); } } ``` ## `.humanNode(options?)` | Option | Type | Default | Description | | ----------- | ----------------------------------------------- | ----------------- | -------------------------------------------- | | `promptKey` | `string` | `"__humanPrompt"` | Key on `shared` where the prompt is stored | | `prompt` | `string \| (s, p) => string \| Promise` | — | Prompt message to store before interrupting | | `condition` | `(s, p) => boolean \| Promise` | Always interrupt | Only interrupt when condition returns `true` | ## `resumeFlow(flow, saved, edits?, fromStep?)` Helper that merges `edits` into `savedShared`, optionally applies `withReplay(fromStep)` to skip already-completed steps, and calls `flow.run(merged)`.
```typescript import { resumeFlow } from "flowneer/plugins/agent"; await resumeFlow( flow, e.savedShared as State, { feedback: "Looks good", approved: true }, resumeFromStep, // skip steps 0..resumeFromStep-1 ); ``` | Parameter | Type | Description | | ---------- | ------------- | ------------------------------------------------------- | | `flow` | `FlowBuilder` | The same instance that was interrupted | | `saved` | `S` | `e.savedShared` from the `InterruptError` | | `edits` | `Partial` | Human input / corrections to merge | | `fromStep` | `number` | If provided, skips steps 0..fromStep-1 via `withReplay` | ## Conditional Interrupts Only pause when the draft needs review: ```typescript .humanNode({ prompt: (s) => `Review this draft:\n\n${s.draft}`, condition: (s) => s.draft.includes("TODO") || s.confidence < 0.8, }) ``` ## See Also * [`interruptIf`](../observability/interrupts.md) — lower-level interrupt primitive * [`withReplay`](../persistence/replay.md) — skip completed steps on resume --- --- url: /flowneer/presets/rag/iterative-rag.md --- # iterativeRag Multi-pass RAG loop: **retrieve → generate → if still unsatisfied, retrieve again**. Useful when a single retrieval isn't enough and the model needs to issue follow-up searches with a refined query. The current pass number is available as `(shared as any).__ragIter` (0-based) so `retrieve` can adapt its strategy. ## Import ```typescript import { iterativeRag } from "flowneer/presets/rag"; ``` ## Usage ```typescript interface RagState { question: string; context: string[]; answer: string; followUpQuery?: string; } const flow = iterativeRag({ retrieve: async (s) => { const query = (s as any).__ragIter === 0 ? s.question : (s.followUpQuery ?? 
s.question); s.context = await vectorSearch(query); }, generate: async (s) => { const result = await llm(buildPrompt(s)); s.answer = result.answer; s.followUpQuery = result.followUpQuery; // set when more info is needed }, needsMoreInfo: (s) => Boolean(s.followUpQuery), maxIterations: 3, }); await flow.run({ question: "Explain Flowneer presets", context: [], answer: "", }); ``` ## Options | Option | Type | Default | Description | | --------------- | ------------------------------------------------- | ------- | ----------------------------------------------------------------------------------- | | `retrieve` | `NodeFn` | — | Fetches documents; `(shared as any).__ragIter` (0-based) indicates the current pass | | `generate` | `NodeFn` | — | Generates a (potentially partial) answer; can signal more info is needed | | `needsMoreInfo` | `(shared, params) => boolean \| Promise` | — | Return `true` to trigger another retrieve → generate pass | | `maxIterations` | `number` | `3` | Maximum passes before the loop exits regardless | ## How It Works ``` initialize(__ragIter = 0, __ragDone = false) loop(while !__ragDone && __ragIter < maxIterations): retrieve generate check needsMoreInfo → set __ragDone or increment __ragIter cleanup(__ragIter, __ragDone) ``` ## Return value Returns a `FlowBuilder` — composable with all Flowneer plugins: ```typescript const flow = iterativeRag({ retrieve, generate, needsMoreInfo }) .withCostTracker() .withTiming(); ``` ## See Also * [`ragPipeline`](./rag-pipeline.md) — single-pass RAG --- --- url: /flowneer/presets/config/overview.md --- # JsonFlowBuilder Build and validate `FlowBuilder` instances from a plain JSON (or TypeScript object) configuration. Useful for dynamic flows, low-code editors, database-driven pipelines, or any scenario where the flow structure is determined at runtime rather than compile time. 
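At its core the builder resolves string keys from the config against a function registry. A self-contained sketch of that idea — `buildFromConfig` and the simplified step shape here are illustrative stand-ins, not `JsonFlowBuilder` itself:

```typescript
// Minimal sketch of the registry-resolution idea: step configs name functions
// by string key, and build time resolves each key, failing fast on unknowns.
type Fn<S> = (s: S) => void;
interface FnStep { type: "fn"; fn: string }

function buildFromConfig<S>(steps: FnStep[], registry: Record<string, Fn<S>>): Fn<S>[] {
  return steps.map((step, i) => {
    const fn = registry[step.fn];
    if (!fn) throw new Error(`$.steps[${i}].fn: "${step.fn}" not found in registry`);
    return fn;
  });
}

interface S { value: number }
const compiled = buildFromConfig<S>(
  [{ type: "fn", fn: "double" }, { type: "fn", fn: "inc" }],
  { double: (s) => { s.value *= 2; }, inc: (s) => { s.value += 1; } },
);
const state: S = { value: 3 };
for (const step of compiled) step(state); // 3 → double → 6 → inc → 7
```

The real class layers structural validation, nested step types, and plugin-aware class construction on top of this resolution step.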
## Setup ```typescript import { JsonFlowBuilder } from "flowneer/presets/config"; ``` `JsonFlowBuilder` is a standalone class — no `FlowBuilder.extend()` call needed. *** ## Quick start ```typescript import { JsonFlowBuilder } from "flowneer/presets/config"; const config = { steps: [ { type: "fn", fn: "fetchUser", label: "pii:user", retries: 2 }, { type: "fn", fn: "saveResult", label: "save" }, ], }; const registry = { fetchUser: async (s) => { s.user = await db.get(s.userId); }, saveResult: async (s) => { await db.save(s.result); }, }; const flow = JsonFlowBuilder.build(config, registry); await flow.run(shared); ``` *** ## `FlowConfig` — supported step types All `FlowBuilder` step types are supported: ```typescript type StepConfig = | { type: "fn"; fn: string; label?: string; retries?: number; delaySec?: number; timeoutMs?: number; } | { type: "branch"; router: string; branches: Record; label?: string; retries?: number; delaySec?: number; timeoutMs?: number; } | { type: "loop"; condition: string; body: StepConfig[]; label?: string } | { type: "batch"; items: string; processor: StepConfig[]; key?: string; label?: string; } | { type: "parallel"; fns: string[]; label?: string; retries?: number; delaySec?: number; timeoutMs?: number; } | { type: "anchor"; name: string; maxVisits?: number }; interface FlowConfig { steps: StepConfig[]; } ``` All string values (`fn`, `router`, `condition`, `items`, `fns`, etc.) are registry keys — resolved to real functions at build time. *** ## API ### `JsonFlowBuilder.build(config, registry, FlowClass?)` Validates and compiles a `FlowConfig` into a runnable `FlowBuilder`. Calls `validate()` first. Throws `ConfigValidationError` if the config is invalid. ```typescript const flow = JsonFlowBuilder.build(config, registry); await flow.run(shared); ``` The optional third argument `FlowClass` controls which class is instantiated. 
Pass a `FlowBuilder.extend([...plugins])` subclass to get plugin methods on the returned flow (see [Using plugins](#using-plugins)). ### `JsonFlowBuilder.validate(config, registry)` Validates structure and registry references without building. Returns **all** errors found — does not short-circuit on the first error. ```typescript const result = JsonFlowBuilder.validate(config, registry); if (!result.valid) { for (const err of result.errors) { console.error(`${err.path}: ${err.message}`); } } ``` `validate()` checks: 1. Structural shape of every step (correct `type`, required fields present) 2. All function references exist in the registry 3. Duplicate anchor names 4. Recursive validation of nested `body` and `processor` arrays Custom types registered via `registerStepBuilder()` are accepted without an "unknown step type" error. Built-in types still undergo full structural checks regardless. ### `JsonFlowBuilder.registerStepBuilder(type, builder)` Register a config-level builder for a custom step type. Mirrors `CoreFlowBuilder.registerStepType()` — the dispatch table follows the same pattern. 
```typescript import type { StepConfigBuilder } from "flowneer/presets/config"; const sleepBuilder: StepConfigBuilder = (step, flow) => { flow.then(async () => new Promise((r) => setTimeout(r, (step as any).ms))); }; JsonFlowBuilder.registerStepBuilder("sleep", sleepBuilder); ``` The builder receives four arguments: | Argument | Type | Description | | ---------- | ------------------------------- | ----------------------------------------------------------------------------- | | `step` | `StepConfig & { type: string }` | Raw config object for this step | | `flow` | `FlowBuilder` | The flow being assembled — call builder methods on it | | `registry` | `FnRegistry` | Map of all registered functions | | `recurse` | `ApplyFn` | Helper to compile nested sub-steps (for container step types like loop/batch) | Use `recurse` when your custom step type contains nested `StepConfig[]` arrays: ```typescript JsonFlowBuilder.registerStepBuilder( "retryLoop", (step: any, flow, registry, recurse) => { flow.loop( registry[step.condition], (inner) => recurse(step.body, inner, registry), { label: step.label }, ); }, ); ``` `registerStepBuilder` is global — registered builders apply to all `JsonFlowBuilder.build()` calls in the process. *** ## Using plugins `build()` accepts an optional `FlowClass` constructor. 
Pass a subclass produced by `FlowBuilder.extend([...plugins])` to get plugin methods on the returned flow: ```typescript import { FlowBuilder } from "flowneer"; import { withTiming } from "flowneer/plugins/observability"; import { withRateLimit } from "flowneer/plugins/llm"; import { JsonFlowBuilder } from "flowneer/presets/config"; // Create your project's AppFlow once const AppFlow = FlowBuilder.extend([withTiming, withRateLimit]); // Pass it to every build() call const flow = JsonFlowBuilder.build(config, registry, AppFlow); // Plugin methods are available on the result flow.withTiming().withRateLimit(10); await flow.run(shared); ``` Config-driven flows get the same plugin surface as hand-written ones. A common pattern is to define a single project-wide `AppFlow` and always pass it to `build()`: ```typescript // appFlow.ts export const AppFlow = FlowBuilder.extend([ withTelemetry, withAuditLog, withCircuitBreaker, ]); // usage const flow = JsonFlowBuilder.build(config, registry, AppFlow); ``` ### Using plugins for compliance auditing ```typescript import { withAuditFlow, withRuntimeCompliance, } from "flowneer/plugins/compliance"; const CA = FlowBuilder.extend([withAuditFlow, withRuntimeCompliance]); const flow = JsonFlowBuilder.build( { steps: [ { type: "fn", fn: "fetchUser", label: "pii:user" }, { type: "fn", fn: "sendEmail", label: "external:send" }, ], }, registry, CA, ); // Static taint analysis — no run needed const report = flow.auditFlow([ { source: ["pii:*"], sink: ["external:*"], message: "PII must not leave the system", }, ]); console.log(report.passed, report.violations); ``` *** ## Examples ### Branch ```typescript const config = { steps: [ { type: "branch", router: "routeByScore", branches: { pass: "publishResult", retry: "refineResult", fail: "logFailure", }, }, ], }; ``` ### Loop (condition-based) ```typescript const config = { steps: [ { type: "loop", condition: "notDone", // registry key — returns boolean body: [ { type: "fn", fn: 
"generateDraft" }, { type: "fn", fn: "scoreResult" }, ], label: "refine-loop", }, ], }; const registry = { notDone: (s) => s.score < 0.9, generateDraft: async (s) => { /* ... */ }, scoreResult: async (s) => { /* ... */ }, }; ``` ### Anchor + goto (jump-based loop) Use `anchor` steps together with a step that returns `"#anchorName"` to implement arbitrary goto-style loops with an optional cycle guard: ```typescript const config = { steps: [ { type: "anchor", name: "refine", maxVisits: 5 }, { type: "fn", fn: "generateDraft" }, { type: "fn", fn: "checkScore" }, // returns "#refine" or undefined { type: "fn", fn: "publish" }, ], }; ``` ### Batch ```typescript const config = { steps: [ { type: "batch", items: "getDocumentList", processor: [ { type: "fn", fn: "summarizeDocument" }, { type: "fn", fn: "embedDocument" }, ], key: "__currentDoc", // defaults to "__batchItem" if omitted }, ], }; ``` Each processor step receives the current item on `shared[key]`. The key is restored to its previous value after the batch completes. Use distinct keys when batches are nested. ### Parallel ```typescript const config = { steps: [ { type: "parallel", fns: ["fetchProfile", "fetchOrders", "fetchPreferences"], label: "parallel:fetch", }, ], }; ``` All functions run concurrently against the same shared state. Results are merged when all fns complete. For conflict-safe merging use a reducer — this requires a hand-written `parallel` step or a custom step builder. *** ## Validation errors ```typescript export interface ValidationError { path: string; // dot-path to the problem, e.g. "$.steps[1].branches.fail" message: string; // human-readable description } export interface ValidationResult { valid: boolean; errors: ValidationError[]; } ``` ### `ConfigValidationError` Thrown by `build()` when validation fails. Contains the full error list. 
```typescript import { ConfigValidationError } from "flowneer/presets/config"; try { const flow = JsonFlowBuilder.build(config, registry); } catch (err) { if (err instanceof ConfigValidationError) { console.error(err.message); // "FlowConfig validation failed:\n $.steps[0].fn: \"missingFn\" not found in registry" console.error(err.errors); // Array } } ``` *** ## Types ```typescript /** Recursive applicator passed to nested step builders. */ export type ApplyFn = ( steps: StepConfig[], flow: FlowBuilder, registry: FnRegistry, ) => void; /** A step config builder registered via registerStepBuilder(). */ export type StepConfigBuilder = ( step: StepConfig & { type: string }, flow: FlowBuilder, registry: FnRegistry, recurse: ApplyFn, ) => void; ``` `CustomStepBuilder` is kept as an alias for `StepConfigBuilder` for backwards compatibility. *** ## Notes * `build()` always produces a fresh flow instance — calling it twice with the same config produces two independent flows. * The registry is **not** validated for unused entries — only referenced keys must exist. * `registerStepBuilder()` is global — registered builders apply to all `build()` calls in the process. Built-in step types (`fn`, `branch`, `loop`, `batch`, `parallel`, `anchor`) can be overridden this way. * `validate()` is called automatically inside `build()`. Call it separately when you want to surface errors without constructing a flow (e.g. in a config editor UI). --- --- url: /flowneer/plugins/memory/kv-memory.md --- # KVMemory A key-value store for episodic or entity-level memory. Unlike `BufferWindowMemory` (which records a message log), `KVMemory` stores discrete named facts — user preferences, extracted entities, episodic knowledge — that persist across turns and can be serialised. 
## Usage ```typescript import { KVMemory } from "flowneer/plugins/memory"; const kv = new KVMemory(); kv.set("user.name", "Alice"); kv.set("user.preference", "concise answers"); kv.set("last.topic", "machine learning"); console.log(kv.getValue("user.name")); // "Alice" console.log(kv.keys()); // ["user.name", "user.preference", "last.topic"] console.log(kv.size); // 3 console.log(kv.toContext()); // "- user.name: Alice\n- user.preference: concise answers\n- last.topic: machine learning" kv.delete("last.topic"); ``` ## Methods | Method | Signature | Description | | ------------------- | ------------------------------ | ------------------------------------------------------------ | | `set` | `(key, value: string) => void` | Store or overwrite a key-value pair | | `getValue` | `(key) => string \| undefined` | Retrieve a value by key | | `delete` | `(key) => boolean` | Delete a key; returns `true` if it existed | | `keys` | `() => string[]` | List all stored keys | | `size` | `number` (getter) | Number of entries | | `add` | `(msg: MemoryMessage) => void` | Memory interface compat — stores as `msg_N: content` | | `get` | `() => MemoryMessage[]` | Memory interface compat — returns entries as system messages | | `clear` | `() => void` | Remove all entries | | `toContext` | `() => string` | Bullet-style `"- key: value"` string | | `toJSON` | `() => string` | Serialise entire store to JSON | | `KVMemory.fromJSON` | `(json: string) => KVMemory` | Restore from JSON | ## Persistence Pattern Because `toJSON` / `fromJSON` are synchronous, KVMemory is easy to persist between sessions: ```typescript import fs from "fs"; import { KVMemory } from "flowneer/plugins/memory"; // Restore from disk const raw = fs.existsSync("memory.json") ? 
fs.readFileSync("memory.json", "utf8") : "{}";
const kv = KVMemory.fromJSON(raw);

// … run flow …

// Save back
fs.writeFileSync("memory.json", kv.toJSON());
```

## With `withMemory`

```typescript
import { FlowBuilder } from "flowneer";
import { withMemory, KVMemory } from "flowneer/plugins/memory";

const AppFlow = FlowBuilder.extend([withMemory]);
const kv = new KVMemory();

const flow = new AppFlow().withMemory(kv).startWith(async (s) => {
  (s.__memory as KVMemory).set("last.intent", s.intent);
  const ctx = await s.__memory!.toContext();
  s.response = await callLlm(buildPrompt(ctx, s.userInput));
});
```

---

---
url: /flowneer/presets/pipeline/map-reduce-llm.md
---

# mapReduceLlm

Map-reduce over LLM calls: run `map` once per item from an `items` list, then run `reduce` once to aggregate all results. Core pattern for batch document processing, multi-source summarisation, and any workload that fans out across a list then fans back in.

## Import

```typescript
import { mapReduceLlm } from "flowneer/presets/pipeline";
```

## Usage

```typescript
interface SummariseState {
  documents: string[];
  summaries: string[];
  finalSummary: string;
}

const flow = mapReduceLlm({
  items: (s) => s.documents,
  map: async (s) => {
    s.summaries ??= [];
    s.summaries.push(await llm(`Summarise: ${(s as any).__mapItem}`));
  },
  reduce: async (s) => {
    s.finalSummary = await llm(
      `Combine these summaries:\n${s.summaries.join("\n")}`,
    );
  },
});

await flow.run({
  documents: ["doc1...", "doc2..."],
  summaries: [],
  finalSummary: "",
});
```

## Options

| Option | Type | Default | Description |
| --------- | --------------------------------------------- | ------------- | ----------------------------------------------------------- |
| `items` | `(shared, params) => any[] \| Promise<any[]>` | — | Returns the array of items to process |
| `map` | `NodeFn` | — | Per-item step — `shared[itemKey]` holds the current item |
| `reduce` | `NodeFn` | — | Aggregation step — runs once after all items are processed |
| `itemKey` |
`string` | `"__mapItem"` | Key under which the current item is exposed on shared state |

## How It Works

Internally uses `.batch()` to iterate over `items`, placing each item on `shared[itemKey]` before calling `map`, then calls `reduce` once:

```
batch(items → shared[itemKey]) → map
then → reduce
```

## Return value

Returns a `FlowBuilder`:

```typescript
const flow = mapReduceLlm({ items, map, reduce })
  .withCostTracker()
  .withTiming();
```

## See Also

* [`generateUntilValid`](./generate-until-valid.md) — generate with retry on validation failure
* [Batch Document Processing](../../recipes/batch-document-processing.md) — recipe using this pattern

---

---
url: /flowneer/plugins/memory/overview.md
---

# Memory — Overview

The memory plugin family provides conversational and episodic memory for LLM-powered flows. All memory implementations share the `Memory` interface so they're interchangeable.

## The `Memory` Interface

```typescript
interface Memory {
  add(message: MemoryMessage): void | Promise<void>;
  get(): MemoryMessage[] | Promise<MemoryMessage[]>;
  clear(): void | Promise<void>;
  toContext(): string | Promise<string>;
}

interface MemoryMessage {
  role: "system" | "user" | "assistant" | "tool";
  content: string;
  meta?: Record<string, unknown>;
}
```

## Available Implementations

| Class | Description |
| ------------------------------------------ | --------------------------------------------- |
| [`BufferWindowMemory`](./buffer-window.md) | Keeps the last `k` messages |
| [`KVMemory`](./kv-memory.md) | Key-value episodic / entity store |
| [`SummaryMemory`](./summary-memory.md) | Compresses old messages via an LLM summarizer |

## Attaching Memory to a Flow

Use `withMemory` to attach any Memory instance to `shared.__memory` before the flow starts:

```typescript
import { FlowBuilder } from "flowneer";
import { withMemory, BufferWindowMemory } from "flowneer/plugins/memory";

const AppFlow = FlowBuilder.extend([withMemory]);
const memory = new BufferWindowMemory({ maxMessages: 20 });

const flow = new AppFlow()
  .withMemory(memory)
  .startWith(async (s) => {
    s.__memory!.add({ role: "user", content: s.userInput });
    const context = await s.__memory!.toContext();
    s.response = await callLlm(context);
    s.__memory!.add({ role: "assistant", content: s.response });
  });
```

## Multi-turn Conversation Pattern

Memory instances are **stateful** and live outside the flow. Create them once and reuse across turns:

```typescript
const memory = new BufferWindowMemory({ maxMessages: 30 });

async function chat(userInput: string): Promise<string> {
  const state = { userInput, response: "" };
  await flow.run(state); // flow uses the same memory instance each call
  return state.response;
}
```

---

---
url: /flowneer/presets/agent/patterns.md
---

# Multi-agent Helpers

Factory functions that return pre-configured `FlowBuilder` instances implementing common multi-agent orchestration topologies.

## Import

```typescript
import {
  supervisorCrew,
  sequentialCrew,
  hierarchicalCrew,
  roundRobinDebate,
  swarm,
  handoffTo,
} from "flowneer/presets/agent";
```

***

## `supervisorCrew`

A supervisor runs first to plan/set up context, workers execute in parallel, then the supervisor runs again to aggregate results.

```typescript
const flow = supervisorCrew(
  // 1. Supervisor: plan
  async (s) => {
    s.outline = await planContent(s.topic);
    s.sections = {};
  },
  // 2. Workers: write sections in parallel
  [
    async (s) => { s.sections.intro = await writeSection("Introduction", s.topic); },
    async (s) => { s.sections.body = await writeSection("Main Body", s.topic); },
    async (s) => { s.sections.outro = await writeSection("Conclusion", s.topic); },
  ],
  // 3.
  // Post step: aggregate
  {
    post: async (s) => {
      s.draft = [s.sections.intro, s.sections.body, s.sections.outro].join(
        "\n\n",
      );
    },
    // Optional: use reducer for safe parallel writes
    reducer: (shared, drafts) => {
      shared.sections = Object.assign({}, ...drafts.map((d) => d.sections));
    },
  },
);

await flow.run({ topic: "TypeScript", outline: [], sections: {}, draft: "" });
```

### Signature

```typescript
function supervisorCrew<S>(
  supervisor: NodeFn<S>,
  workers: NodeFn<S>[],
  options?: {
    post?: NodeFn<S>;
    reducer?: (shared: S, drafts: S[]) => void;
  },
): FlowBuilder<S>;
```

***

## `sequentialCrew`

A strict pipeline: each step runs in order and passes its results to the next via shared state.

```typescript
const flow = sequentialCrew([
  async (s) => { s.research = await research(s.query); },
  async (s) => { s.draft = await writeDraft(s.research); },
  async (s) => { s.final = await editDraft(s.draft); },
]);
```

### Signature

```typescript
function sequentialCrew(steps: NodeFn[]): FlowBuilder;
```

***

## `hierarchicalCrew`

A top-level manager delegates to sub-team flows (each is its own `FlowBuilder`). Teams run sequentially after the manager, then an optional aggregation step runs.

```typescript
const researchTeam = supervisorCrew(researchSupervisor, researchWorkers);
const writingTeam = sequentialCrew([drafter, editor, seoOptimizer]);

const flow = hierarchicalCrew(
  async (s) => { s.plan = planTasks(s.input); }, // manager
  [researchTeam, writingTeam], // sub-teams
  async (s) => { s.output = mergeResults(s); }, // aggregation
);
```

### Signature

```typescript
function hierarchicalCrew(
  manager: NodeFn,
  teams: FlowBuilder[],
  aggregate?: NodeFn,
): FlowBuilder;
```

***

## `roundRobinDebate`

Each agent runs in sequence, repeated `rounds` times. Agents append their perspectives to shared state — producing a multi-turn collaborative output.
```typescript const flow = roundRobinDebate( [ async (s) => { s.debate.push({ agent: "optimist", text: await optimist(s) }); }, async (s) => { s.debate.push({ agent: "critic", text: await critic(s) }); }, async (s) => { s.debate.push({ agent: "synthesiser", text: await synth(s) }); }, ], 3, // 3 full rounds → 9 total turns ); await flow.run({ topic: "AI safety", debate: [] }); ``` The current round is tracked in `shared.__debateRound`. ### Signature ```typescript function roundRobinDebate( agents: NodeFn[], rounds: number, ): FlowBuilder; ``` *** ## `swarm` Decentralized peer-to-peer handoff: any agent can pass control to any other agent at runtime. No central manager. Agents call `handoffTo(shared, targetName, reason?)` inside their `fn` to request a handoff; the loop repeats until an agent finishes without handing off or `maxHandoffs` is reached. See **[swarm.md](./swarm.md)** for the full guide. ```typescript const flow = swarm([triageAgent, billingAgent, supportAgent], { defaultAgent: "triage", maxHandoffs: 5, }); await flow.run({ messages: [{ role: "user", content: "I need a refund" }] }); ``` ### Signature ```typescript function swarm( agents: SwarmAgent[], options: SwarmOptions, ): FlowBuilder; ``` *** ## Composition All pattern functions return a `FlowBuilder`, so they can be extended with additional plugins: ```typescript const flow = supervisorCrew(supervisor, workers, { post }) .withTiming() .withCostTracker(); ``` And nested inside larger flows: ```typescript const mainFlow = new FlowBuilder() .startWith(initialize) .then(async (s, p) => { await researchFlow.run(s, p); // inline sub-flow }) .then(finalize); ``` --- --- url: /flowneer/plugins/dev/atomic-updates.md --- # parallelAtomic A convenience wrapper around `.parallel()` with a required reducer. Enforces the safe draft-based parallel execution pattern where each function gets its own shallow clone of `shared`, preventing concurrent write conflicts. 
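The pattern the plugin enforces can be modeled in a few lines of plain TypeScript. The sketch below shows the assumed mechanics (all names hypothetical, not the plugin's source): one shallow clone per function, then a single merge point.

```typescript
// Run each fn against its own shallow clone of shared, then merge via reducer.
async function parallelWithDrafts<S extends object>(
  shared: S,
  fns: Array<(s: S) => Promise<void>>,
  reducer: (shared: S, drafts: S[]) => void,
): Promise<void> {
  // Each fn mutates only its private draft, so concurrent writes cannot collide.
  const drafts = fns.map(() => ({ ...shared }));
  await Promise.all(fns.map((fn, i) => fn(drafts[i])));
  reducer(shared, drafts); // single, ordered merge point
}

// Usage with two hypothetical workers:
const state = { input: 2, a: 0, b: 0 };
await parallelWithDrafts(
  state,
  [
    async (s) => { s.a = s.input * 10; },
    async (s) => { s.b = s.input + 1; },
  ],
  (shared, [dA, dB]) => {
    shared.a = dA.a;
    shared.b = dB.b;
  },
);
```

Because the reducer runs once, after every draft has settled, the merge is deterministic regardless of which worker finished first.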
## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withAtomicUpdates } from "flowneer/plugins/dev"; const AppFlow = FlowBuilder.extend([withAtomicUpdates]); ``` ## Usage ```typescript const flow = new AppFlow() .startWith(prepareData) .parallelAtomic( [ async (s) => { s.resultA = await processA(s.input); }, async (s) => { s.resultB = await processB(s.input); }, async (s) => { s.resultC = await processC(s.input); }, ], (shared, [draftA, draftB, draftC]) => { shared.resultA = draftA.resultA; shared.resultB = draftB.resultB; shared.resultC = draftC.resultC; }, ) .then(combineResults); ``` ## API ### `.parallelAtomic(fns, reducer, options?)` | Parameter | Type | Description | | --------- | ---------------------------------- | ------------------------------------- | | `fns` | `NodeFn[]` | Functions to execute concurrently | | `reducer` | `(shared: S, drafts: S[]) => void` | Merges draft results back into shared | | `options` | `NodeOptions` | Optional retries, delay, timeout | This is a thin alias for `.parallel(fns, options, reducer)` — the reducer parameter is positionally moved to the second argument to make its presence mandatory. ## Why Atomic Parallel? In plain `.parallel()`, all functions share the same `shared` reference — concurrent mutations can silently overwrite each other: ```typescript // ❌ Race condition: both fns might clobber shared.results flow.parallel([ async (s) => { s.results = await processA(); }, async (s) => { s.results = await processB(); }, ]); ``` With `parallelAtomic`, each function receives its own shallow draft. 
The reducer runs **after** all functions complete, providing a safe merge point:

```typescript
// ✅ Safe: drafts are isolated
flow.parallelAtomic(
  [
    async (s) => { s.aResult = await processA(); },
    async (s) => { s.bResult = await processB(); },
  ],
  (shared, [dA, dB]) => {
    shared.aResult = dA.aResult;
    shared.bResult = dB.bResult;
  },
);
```

---

---
url: /flowneer/plugins/output/parse-json.md
---

# parseJsonOutput

Extracts and parses JSON from LLM output, handling common formatting artifacts like markdown code fences and surrounding prose.

## Import

```typescript
import { parseJsonOutput } from "flowneer/plugins/output";
```

## Usage

````typescript
// Raw JSON
const data = parseJsonOutput<{ name: string }>('{"name": "Alice"}');
// → { name: "Alice" }

// Markdown-fenced JSON
const data2 = parseJsonOutput(
  'Sure! Here is the data:\n```json\n{"score": 8}\n```',
);
// → { score: 8 }

// JSON embedded in prose
const data3 = parseJsonOutput(
  'The result is {"status": "ok", "value": 42} as requested.',
);
// → { status: "ok", value: 42 }

// With a Zod validator
import { z } from "zod";
const schema = z.object({ score: z.number(), label: z.string() });
const validated = parseJsonOutput(llmOutput, schema);
// → type-safe: { score: number, label: string }
````

## Signature

```typescript
function parseJsonOutput<T = unknown>(
  text: string,
  validator?: Validator<T>,
): T;
```

## Extraction Strategy

1. **Direct parse** — tries `JSON.parse(text)` first.
2. **Code fence stripping** — if that fails, looks for ` ```json ... ``` ` or ` ``` ... ``` ` blocks.
3. **Embedded JSON extraction** — finds the first `{` or `[` and last matching `}` or `]`, then parses what's between them.
4. **Throws** `parseJsonOutput: could not extract valid JSON from input` if all strategies fail.
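The fallback chain can be sketched as a standalone function. This is an illustration of the strategy, not the plugin's actual implementation:

````typescript
// Illustrative extractor: direct parse → fenced block → embedded slice.
function extractJson(text: string): unknown {
  // 1. Direct parse.
  try { return JSON.parse(text); } catch { /* fall through */ }
  // 2. Look for a ```json … ``` (or bare ``` … ```) fence.
  const fence = text.match(/```(?:json)?\s*([\s\S]*?)```/);
  if (fence) {
    try { return JSON.parse(fence[1]); } catch { /* fall through */ }
  }
  // 3. Slice from the first { or [ to the last } or ].
  const start = text.search(/[{[]/);
  const end = Math.max(text.lastIndexOf("}"), text.lastIndexOf("]"));
  if (start !== -1 && end > start) {
    try { return JSON.parse(text.slice(start, end + 1)); } catch { /* fall through */ }
  }
  throw new Error("could not extract valid JSON from input");
}

extractJson('Here you go:\n```json\n{"score": 8}\n```'); // → { score: 8 }
````

Each strategy only runs when the previous one fails, so well-formed input pays no extra cost.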
## With `withStructuredOutput` ```typescript import { withStructuredOutput } from "flowneer/plugins/llm"; import { parseJsonOutput } from "flowneer/plugins/output"; .withStructuredOutput(MySchema, { parse: (raw) => parseJsonOutput(raw), // use the robust extractor as the parse fn }) ``` --- --- url: /flowneer/plugins/output/parse-list.md --- # parseListOutput Parses a bulleted, numbered, or newline-separated list from LLM output into a clean string array. ## Import ```typescript import { parseListOutput } from "flowneer/plugins/output"; ``` ## Usage ```typescript const items = parseListOutput("- apples\n- bananas\n- oranges"); // ["apples", "bananas", "oranges"] const items2 = parseListOutput("1. First item\n2. Second item\n3. Third item"); // ["First item", "Second item", "Third item"] const items3 = parseListOutput("• item one\n• item two"); // ["item one", "item two"] const items4 = parseListOutput("alpha\nbeta\ngamma"); // ["alpha", "beta", "gamma"] ``` ## Signature ```typescript function parseListOutput(text: string): string[]; ``` ## Supported Formats | Format | Example | | ---------------- | --------------------- | | Dash bullets | `- item` | | Asterisk bullets | `* item` | | Bullet character | `• item` | | Numbered (dot) | `1. item` | | Numbered (paren) | `1) item` | | Plain newlines | `item` (one per line) | Empty lines and whitespace-only entries are stripped from the result. ## In a Flow ```typescript .then(async (s) => { const raw = await callLlm( `List 5 keywords for: "${s.topic}". One per line, no numbering.` ); s.keywords = parseListOutput(raw); // s.keywords: ["AI", "machine learning", "deep learning", "NLP", "GPT"] }) ``` --- --- url: /flowneer/plugins/output/parse-table.md --- # parseMarkdownTable Parses a GitHub-Flavored Markdown table from LLM output into an array of objects, one per data row. 
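The row-to-object transformation such a parser performs can be sketched standalone. `parseTable` below is a simplified, hypothetical version written for this page, not the plugin's source:

```typescript
// Simplified GFM table parser: header row → keys, data rows → objects.
function parseTable(text: string): Record<string, string>[] {
  const lines = text
    .split("\n")
    .map((l) => l.trim())
    .filter((l) => l.startsWith("|"));
  if (lines.length < 2) return [];

  // Split a row on "|", dropping the empty edges, trimming each cell.
  const cells = (line: string) =>
    line.split("|").slice(1, -1).map((c) => c.trim());

  const headers = cells(lines[0]);
  return lines
    .slice(1)
    .filter((l) => !/^\|[\s:|-]+\|$/.test(l)) // skip the |---|---| separator
    .map((l) => {
      const row = cells(l);
      return Object.fromEntries(headers.map((h, i) => [h, row[i] ?? ""]));
    });
}
```

The `?? ""` default mirrors the graceful handling of partial rows described below.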
## Import

```typescript
import { parseMarkdownTable } from "flowneer/plugins/output";
```

## Usage

```typescript
const rows = parseMarkdownTable(`
| Name  | Age | Role       |
|-------|-----|------------|
| Alice | 30  | Engineer   |
| Bob   | 25  | Designer   |
| Carol | 35  | Manager    |
`);
// [
//   { Name: "Alice", Age: "30", Role: "Engineer" },
//   { Name: "Bob", Age: "25", Role: "Designer" },
//   { Name: "Carol", Age: "35", Role: "Manager" },
// ]
```

## Signature

```typescript
function parseMarkdownTable(text: string): Record<string, string>[];
```

Returns an empty array `[]` if the text contains no valid table.

## Notes

* The first row is treated as the header.
* The separator row (`|---|---|`) is automatically skipped.
* Cell values are trimmed of whitespace.
* All values are strings — apply type conversions (`Number()`, etc.) as needed.
* Partial rows (fewer cells than headers) are handled gracefully with empty string defaults.

## In a Flow

```typescript
.then(async (s) => {
  s.__llmOutput = await callLlm(
    `Compare these products in a markdown table with columns: Name, Price, Rating:\n${s.products}`
  );
})
.then((s) => {
  const rows = parseMarkdownTable(s.__llmOutput!);
  s.comparison = rows.map((r) => ({
    name: r.Name,
    price: Number(r.Price.replace(/[^0-9.]/g, "")),
    rating: Number(r.Rating),
  }));
})
```

---

---
url: /flowneer/plugins/output/parse-regex.md
---

# parseRegexOutput

Extracts structured data from LLM text using regular expression capture groups. Supports both named groups and positional groups with explicit names.
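Both group-naming modes reduce to a small mapping step over a regex match. The sketch below is illustrative (a hypothetical `matchToObject`, not the plugin's source):

```typescript
// Map a regex match to an object: prefer named groups, else positional names.
function matchToObject(
  text: string,
  pattern: RegExp,
  names?: string[],
): Record<string, string> | null {
  const m = pattern.exec(text);
  if (!m) return null;
  // Named groups win when the pattern declares any.
  if (m.groups && Object.keys(m.groups).length > 0) return { ...m.groups };
  // Otherwise map positional captures to explicit names or group_N fallbacks.
  const out: Record<string, string> = {};
  m.slice(1).forEach((val, i) => {
    out[names?.[i] ?? `group_${i + 1}`] = val;
  });
  return out;
}

matchToObject("SCORE: 8/10", /SCORE:\s*(\d+)\/(\d+)/, ["score", "total"]);
// → { score: "8", total: "10" }
```

The `group_N` fallback corresponds to the indexing behaviour documented below.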
## Import

```typescript
import { parseRegexOutput } from "flowneer/plugins/output";
```

## Usage

### Named groups (recommended)

```typescript
const result = parseRegexOutput(
  "Action: search Query: quantum computing",
  /Action:\s*(?<action>\w+)\s+Query:\s*(?<query>.+)/,
);
// { action: "search", query: "quantum computing" }
```

### Positional groups with names

```typescript
const result = parseRegexOutput(
  "SCORE: 8/10",
  "SCORE:\\s*(\\d+)/(\\d+)",
  ["score", "total"], // maps group 1 → "score", group 2 → "total"
);
// { score: "8", total: "10" }
```

### No match

```typescript
const result = parseRegexOutput("no match here", /Action:\s*(\w+)/);
// null
```

## Signature

```typescript
function parseRegexOutput(
  text: string,
  pattern: RegExp | string,
  groups?: string[],
): Record<string, string> | null;
```

## Parameters

| Parameter | Type | Description |
| --------- | ------------------ | --------------------------------------------------------------- |
| `text` | `string` | LLM output to parse |
| `pattern` | `RegExp \| string` | Pattern with capture groups |
| `groups` | `string[]` | Names for positional captures (used when no named groups exist) |

Returns `null` if the pattern does not match. All captured values are strings — apply `Number()`, `parseFloat()`, etc. as needed.

## Fallback Indexing

If neither named groups nor `groups` array is provided, positional captures are returned as `group_1`, `group_2`, etc.:

```typescript
parseRegexOutput("foo 42 bar", /(\w+)\s+(\d+)/);
// { group_1: "foo", group_2: "42" }
```

## In a Flow

```typescript
.then(async (s) => {
  s.__llmOutput = await callLlm(`Rate the sentiment (1-10): "${s.text}"`);
})
.then((s) => {
  const match = parseRegexOutput(s.__llmOutput!, /Score:\s*(?<score>\d+)/);
  s.score = match ? Number(match.score) : 5; // default 5 if no match
})
```

---

---
url: /flowneer/presets/pipeline.md
---

# Pipeline Presets

General-purpose LLM workflow patterns. Import from `flowneer/presets/pipeline`.
| Preset | Description | | ------------------------------------------------- | -------------------------------------------------------------------- | | [`generateUntilValid`](./generate-until-valid.md) | Generate → validate → retry with error context | | [`mapReduceLlm`](./map-reduce-llm.md) | Fan out LLM calls across N items, then aggregate | | [`approvalGate`](./approval-gate.md) | Human approval / review gate with approve, reject, and edit outcomes | | [`clarifyLoop`](./clarify-loop.md) | Generate → evaluate → ask for clarification → retry (up to N rounds) | --- --- url: /flowneer/plugins/overview.md --- # Plugins Overview Flowneer ships with a focused set of plugin methods plus a smaller set of helper modules. Plugin methods follow the same pattern: create a subclass with `FlowBuilder.extend([...plugins])`, then call the added methods on any instance of that subclass. Helper utilities such as `createAgent`, crew patterns, and output parsers are imported directly and do not participate in plugin registration. 
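The `extend` pattern is a classic mixin fold: each plugin maps a base class to a subclass with extra methods. The toy `Builder` below models the assumed mechanics; it is not Flowneer's actual implementation:

```typescript
// Toy model: plugins are class transformers folded over the base builder.
type AnyCtor = new (...args: any[]) => any;

class Builder {
  steps: string[] = [];
  startWith(name: string) { this.steps.push(name); return this; }
  static extend(plugins: Array<(Base: AnyCtor) => AnyCtor>): AnyCtor {
    // Fold each plugin over the base class, producing one combined subclass.
    return plugins.reduce((Base, plugin) => plugin(Base), this as AnyCtor);
  }
}

// A toy withTiming plugin: subclass the base and add one chainable method.
const withTiming = (Base: AnyCtor) =>
  class extends Base {
    withTiming() { this.steps.push("timing:enabled"); return this; }
  };

const AppFlow = Builder.extend([withTiming]);
const flow = new AppFlow().withTiming().startWith("myStep");
```

Because each plugin returns a subclass, chaining several plugins in `extend([...])` simply stacks one subclass per plugin, which is why the added methods compose freely.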
```typescript import { FlowBuilder } from "flowneer"; import { withTiming } from "flowneer/plugins/observability"; const AppFlow = FlowBuilder.extend([withTiming]); const flow = new AppFlow().withTiming().startWith(myStep); ``` ## Plugin Categories | Category | Surface | | ------------------ | ------------------------------------------------------------------------------------------------------- | | **LLM** | `withCostTracker`, `withRateLimit`, `withStructuredOutput`, `withTokenBudget` | | **Memory** | `withMemory`, `BufferWindowMemory`, `KVMemory`, `SummaryMemory` | | **Observability** | `withCallbacks`, `withHistory`, `withInterrupts`, `withTiming`, `withVerbose` | | **Persistence** | `withCheckpoint`, `withAuditLog`, `withReplay`, `withVersionedCheckpoint`, `withManualStepping` | | **Resilience** | `withCircuitBreaker`, `withTimeout`, `withFallback`, `withCycles` | | **Dev / Testing** | `withDryRun`, `withMocks`, `withStepLimit`, `withAtomicUpdates`, `withFlowAnalyzer`, `withPerfAnalyzer` | | **Agent plugins** | `withReActLoop`, `withHumanNode`, `resumeFlow` | | **Agent helpers** | `tool`, `createAgent`, `supervisorCrew`, `sequentialCrew`, `hierarchicalCrew`, `roundRobinDebate` | | **Tools** | `withTools`, `ToolRegistry`, `executeTool`, `executeTools` | | **Messaging** | `withChannels`, `withStream`, `emit`, `sendTo`, `receiveFrom` | | **Output helpers** | `parseJsonOutput`, `parseListOutput`, `parseRegexOutput`, `parseMarkdownTable` | | **Telemetry** | `withTelemetry`, `TelemetryDaemon`, `consoleExporter`, `otlpExporter` | | **Graph** | `withGraph`, `withExportGraph`, `withExportFlow` | | **Eval** | `runEvalSuite`, `exactMatch`, `containsMatch`, `f1Score`, `retrievalPrecision`, `retrievalRecall` | | **Compliance** | `withAuditFlow`, `withRuntimeCompliance`, `scanShared` | | **Config** | `JsonFlowBuilder`, `validate` | ## Import Paths Plugins are bundled under `flowneer/plugins/*`: ```typescript // LLM import { withCostTracker } from "flowneer/plugins/llm"; 
import { withRateLimit } from "flowneer/plugins/llm"; // Memory import { withMemory, BufferWindowMemory, KVMemory, SummaryMemory, } from "flowneer/plugins/memory"; // Observability import { withCallbacks, withHistory, withTiming, withVerbose, } from "flowneer/plugins/observability"; import { withInterrupts } from "flowneer/plugins/observability"; // Persistence import { withCheckpoint, withAuditLog, withReplay, withVersionedCheckpoint, withManualStepping, } from "flowneer/plugins/persistence"; // Resilience import { withCircuitBreaker, withTimeout, withFallback, withCycles, } from "flowneer/plugins/resilience"; // Dev import { withDryRun, withMocks, withStepLimit } from "flowneer/plugins/dev"; import { withAtomicUpdates } from "flowneer/plugins/dev"; // Agent plugins import { withReActLoop } from "flowneer/plugins/agent"; import { withHumanNode, resumeFlow } from "flowneer/plugins/agent"; // Agent helpers import { tool, createAgent } from "flowneer/plugins/agent"; import { supervisorCrew, sequentialCrew, hierarchicalCrew, roundRobinDebate, } from "flowneer/plugins/agent"; // Tools import { withTools, ToolRegistry, executeTool, executeTools, } from "flowneer/plugins/tools"; // Messaging import { withChannels, sendTo, receiveFrom } from "flowneer/plugins/messaging"; import { withStream, emit } from "flowneer/plugins/messaging"; // Output helpers import { parseJsonOutput } from "flowneer/plugins/output"; import { parseListOutput } from "flowneer/plugins/output"; import { parseRegexOutput } from "flowneer/plugins/output"; import { parseMarkdownTable } from "flowneer/plugins/output"; // Telemetry import { TelemetryDaemon, consoleExporter } from "flowneer/plugins/telemetry"; // Graph import { withGraph } from "flowneer/plugins/graph"; import { withExportGraph, withExportFlow } from "flowneer/plugins/graph"; // Eval import { runEvalSuite, exactMatch, f1Score } from "flowneer/plugins/eval"; // Compliance import { withAuditFlow, withRuntimeCompliance, scanShared, } from 
"flowneer/plugins/compliance"; // Dev — analysis import { withFlowAnalyzer } from "flowneer/plugins/dev"; import { withPerfAnalyzer } from "flowneer/plugins/dev"; import type { StepPerfStats, PerfReport, PerfAnalyzerOptions, } from "flowneer/plugins/dev"; // Config import { JsonFlowBuilder } from "flowneer/plugins/config"; ``` ## Shared State Conventions Many plugins use reserved keys on the shared state object. By convention, built-in plugin keys are prefixed with `__` to avoid collisions with application data. ### `AugmentedState` — automatic typing All plugin-provided `__*` keys are declared on the exported `AugmentedState` interface via TypeScript declaration merging. Extend your state with it to get every key typed and documented automatically: ```typescript import type { AugmentedState } from "flowneer"; interface MyState extends AugmentedState { topic: string; // your domain fields results: string[]; // __cost, __history, __tools … all typed automatically } ``` No additional imports or setup are needed. Augmentations are applied per-plugin when the plugin module is imported. 
### Key reference | Key | Set by | | -------------------- | ---------------------------------------------- | | `__cost` | `withCostTracker` | | `__stepCost` | Your step (consumed by `withCostTracker`) | | `__llmOutput` | Your step (consumed by `withStructuredOutput`) | | `__structuredOutput` | `withStructuredOutput` | | `__validationError` | `withStructuredOutput` (on failure) | | `__memory` | `withMemory` | | `__history` | `withHistory` | | `__timings` | `withTiming` | | `__fallbackError` | `withFallback` | | `__tryError` | `withTryCatch` | | `__tools` | `withTools` | | `__channels` | `withChannels` | | `__stream` | `withStream` / `.stream()` | | `__humanPrompt` | `withHumanNode` | | `__humanFeedback` | Written by caller before `resumeFlow()` | | `__toolResults` | `withReActLoop` | | `__reactExhausted` | `withReActLoop` | | `__batchItem` | `.batch()` (configurable via `key`) | | `__perfStats` | `withPerfAnalyzer` (per-step stats array) | | `__perfReport` | `withPerfAnalyzer` (flow-level summary) | --- --- url: /flowneer/presets.md --- # Presets Presets are higher-level, opinionated building blocks that compose Flowneer's core primitives into common patterns. Unlike plugins (which add capabilities to a `FlowBuilder`), presets return a fully wired `FlowBuilder` ready to run. ## When to use presets vs plugins | | Plugins | Presets | | --------------- | -------------------------------- | --------------------------------------- | | What they are | Capabilities added to a flow | Pre-built flow templates | | Usage | `.withTiming()`, `.withMemory()` | `createAgent(...)`, `ragPipeline(...)` | | Extension point | `FlowBuilder.extend([...])` | Call the function, get a `FlowBuilder` | | Composable? | Yes — chain on any flow | Yes — extend the returned `FlowBuilder` | All presets return a `FlowBuilder`, so they compose freely with plugins and each other. ## Categories ### Agent High-level agent patterns — tool-calling agents and multi-agent orchestration topologies. 
```typescript import { createAgent, tool, withReActLoop } from "flowneer/presets/agent"; import { supervisorCrew, sequentialCrew, hierarchicalCrew, } from "flowneer/presets/agent"; import { roundRobinDebate, reflexionAgent, planAndExecute, } from "flowneer/presets/agent"; ``` * [createAgent & tool()](./agent/create-agent.md) * [withReActLoop](./agent/react-loop.md) * [Multi-agent Patterns](./agent/patterns.md) ### Config Declarative JSON → FlowBuilder compiler. ```typescript import { JsonFlowBuilder } from "flowneer/presets/config"; ``` * [JsonFlowBuilder](./config/overview.md) ### RAG Retrieval-augmented generation flows. ```typescript import { ragPipeline, iterativeRag } from "flowneer/presets/rag"; ``` * [ragPipeline](./rag/rag-pipeline.md) * [iterativeRag](./rag/iterative-rag.md) ### Pipeline General-purpose LLM workflow patterns. ```typescript import { generateUntilValid, mapReduceLlm, approvalGate, clarifyLoop, } from "flowneer/presets/pipeline"; ``` * [generateUntilValid](./pipeline/generate-until-valid.md) * [mapReduceLlm](./pipeline/map-reduce-llm.md) * [approvalGate](./pipeline/approval-gate.md) * [clarifyLoop](./pipeline/clarify-loop.md) --- --- url: /flowneer/presets/rag.md --- # RAG Presets Ready-made retrieval-augmented generation flows. Import from `flowneer/presets/rag`. | Preset | Description | | ------------------------------------ | ------------------------------------------------------ | | [`ragPipeline`](./rag-pipeline.md) | Standard retrieve → \[augment] → generate pipeline | | [`iterativeRag`](./iterative-rag.md) | Multi-pass retrieval loop when one search isn't enough | --- --- url: /flowneer/presets/rag/rag-pipeline.md --- # ragPipeline Standard RAG (Retrieval-Augmented Generation) pipeline: **retrieve → \[augment] → generate**. The most common LLM pattern. `retrieve` writes context to shared state; `generate` reads it to produce the answer. Use the optional `augment` step to rerank or filter before generation. 
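Stripped of the builder, the shape is three sequenced steps over one shared state object. The sketch below is dependency-free, with stub search and LLM helpers standing in for real ones (all names hypothetical):

```typescript
interface RagState { query: string; context: string[]; answer: string; }
type Step = (s: RagState) => Promise<void>;

// Sequence retrieve → optional augment → generate over one shared state object.
async function runRag(s: RagState, retrieve: Step, generate: Step, augment?: Step) {
  await retrieve(s);
  if (augment) await augment(s);
  await generate(s);
}

// Stubs standing in for a vector store and an LLM:
const fakeSearch = async (q: string) => [`doc about ${q}`, "unrelated doc"];
const fakeLlm = async (prompt: string) =>
  `answer using ${prompt.split("\n").length} context lines`;

const state: RagState = { query: "flow engines", context: [], answer: "" };
await runRag(
  state,
  async (s) => { s.context = await fakeSearch(s.query); },
  async (s) => { s.answer = await fakeLlm(s.context.join("\n")); },
  async (s) => { s.context = s.context.filter((d) => d.includes(s.query)); }, // rerank/filter
);
```

The optional third step is exactly the `augment` slot: it sees the retrieved context and may shrink or reorder it before generation reads it.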
## Import ```typescript import { ragPipeline } from "flowneer/presets/rag"; ``` ## Usage ```typescript interface RagState { query: string; context: string[]; answer: string; } const flow = ragPipeline({ retrieve: async (s) => { s.context = await vectorSearch(s.query); }, generate: async (s) => { s.answer = await llm(buildPrompt(s.query, s.context)); }, }); await flow.run({ query: "What is Flowneer?", context: [], answer: "" }); ``` ### With an augmentation step (reranking) ```typescript const flow = ragPipeline({ retrieve: async (s) => { s.context = await vectorSearch(s.query, { topK: 20 }); }, augment: async (s) => { s.context = await rerank(s.query, s.context, { topK: 5 }); }, generate: async (s) => { s.answer = await llm(buildPrompt(s.query, s.context)); }, }); ``` ## Options | Option | Type | Required | Description | | ---------- | -------------- | -------- | ---------------------------------------------------------- | | `retrieve` | `NodeFn` | ✓ | Fetches relevant documents and writes them to shared state | | `augment` | `NodeFn` | — | Optional reranking / filtering step | | `generate` | `NodeFn` | ✓ | Generates the final answer from the retrieved context | ## Return value Returns a `FlowBuilder` — all plugins and methods work normally on the result: ```typescript const flow = ragPipeline({ retrieve, generate }).withTiming().withCostTracker(); ``` ## See Also * [`iterativeRag`](./iterative-rag.md) — multi-pass retrieval when one search isn't enough --- --- url: /flowneer/recipes.md --- # Recipes End-to-end examples for common Flowneer patterns. Each recipe is self-contained and runnable — copy it, swap in your API keys, and go. 
## Available recipes | Recipe | What it shows | | ----------------------------------------------------------- | ------------------------------------------------------------------------------- | | [Tool-calling Agent](./tool-calling-agent.md) | `createAgent` + `tool()`, OpenAI function calling, multi-turn loop | | [Blog Post Generator](./blog-post-generator.md) | Sequential LLM pipeline, structured output, cost tracking | | [Resilient API Pipeline](./resilient-api-pipeline.md) | Retry, timeout, circuit breaker, fallback | | [Streaming Chat Server](./streaming-chat-server.md) | `.stream()`, server-sent events, Bun HTTP | | [Batch Document Processing](./batch-document-processing.md) | `.batch()`, `.parallel()`, structured output, shared reducer | | [Human-in-the-loop](./human-in-the-loop.md) | `humanNode`, interrupt + resume, approval gates | | [Edge Runtime](./edge-runtime.md) | CF Workers, Vercel Edge, Deno Deploy — zero-config, streaming, telemetry caveat | --- --- url: /flowneer/recipes/resilient-api-pipeline.md --- # Resilient API Pipeline Fetch data from a flaky external API with automatic retries, a per-step timeout, a circuit breaker to stop hammering a dead endpoint, and a fallback to cached data when everything fails. 
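The breaker logic at the heart of this recipe is small enough to show on its own. The `Breaker` class below is a generic sketch of the closed → open → half-open cycle, not the plugin's implementation:

```typescript
// Minimal circuit breaker: opens after N consecutive failures,
// rejects fast while open, and retries after a cooldown.
class Breaker {
  private failures = 0;
  private openedAt = 0;

  constructor(private threshold: number, private resetMs: number) {}

  async call<T>(fn: () => Promise<T>): Promise<T> {
    const open = this.failures >= this.threshold;
    if (open && Date.now() - this.openedAt < this.resetMs) {
      throw new Error("circuit open"); // fail fast, don't hit the endpoint
    }
    try {
      const result = await fn(); // half-open probe once the cooldown elapses
      this.failures = 0; // success closes the circuit
      return result;
    } catch (err) {
      this.failures++;
      if (this.failures >= this.threshold) this.openedAt = Date.now();
      throw err;
    }
  }
}
```

The recipe's `failureThreshold` and `resetTimeoutMs` options play the same two roles as the constructor parameters here.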
**Plugins used:** `withCircuitBreaker`, `withTimeout`, `withFallback` (resilience), step-level `retries` + `delaySec`

***

## The code

```typescript
import "dotenv/config";
import { FlowBuilder } from "flowneer";
import {
  withCircuitBreaker,
  withTimeout,
  withFallback,
} from "flowneer/plugins/resilience";
import { withTiming } from "flowneer/plugins/observability";

const AppFlow = FlowBuilder.extend([
  withCircuitBreaker,
  withFallback,
  withTiming,
]);

// ─── State ───────────────────────────────────────────────────────────────────

interface PipelineState {
  productId: string;
  productData: Record<string, unknown> | null;
  enrichedData: Record<string, unknown> | null;
  finalRecord: Record<string, unknown> | null;
  fromCache: boolean;
  __timings?: Record<string, number>;
}

// ─── Helpers ─────────────────────────────────────────────────────────────────

async function fetchProduct(id: string) {
  const res = await fetch(`https://api.example.com/products/${id}`, {
    signal: AbortSignal.timeout(3000),
  });
  if (!res.ok) throw new Error(`HTTP ${res.status}`);
  return res.json();
}

async function fetchCachedProduct(id: string) {
  // Fall back to a local cache / DB
  return { id, name: "Cached Product", price: 0, fromCache: true };
}

async function enrichProduct(product: Record<string, unknown>) {
  const res = await fetch(
    `https://api.enrich.example.com/enrich?id=${product.id}`,
  );
  if (!res.ok) throw new Error(`Enrich HTTP ${res.status}`);
  return { ...product, ...(await res.json()) };
}

// ─── Flow ────────────────────────────────────────────────────────────────────

const pipeline = new AppFlow<PipelineState>()
  .withTiming()
  // Step 1 — Fetch product, retry up to 3×, 1 s delay, 5 s timeout
  // Circuit breaker trips after 5 consecutive failures across all runs
  .withCircuitBreaker({ failureThreshold: 5, resetTimeoutMs: 60_000 })
  .withFallback(async (s) => {
    // Circuit open or all retries exhausted — use cache
    s.productData = (await fetchCachedProduct(s.productId)) as any;
    s.fromCache = true;
  })
  .startWith(
    async (s) => {
      s.productData = await fetchProduct(s.productId);
      s.fromCache = false;
    },
    { retries: 3, delaySec: 1, timeoutMs: 5_000 },
  )
  // Step 2 — Enrich (skip if we're working from cache)
  .then(
    async (s) => {
      if (s.fromCache) return; // no point enriching stale data
      s.enrichedData = await enrichProduct(s.productData!);
    },
    { retries: 2, delaySec: 0.5, timeoutMs: 4_000 },
  )
  // Step 3 — Normalise into a final record
  .then(async (s) => {
    const base = s.enrichedData ?? s.productData ?? {};
    s.finalRecord = {
      ...base,
      processedAt: new Date().toISOString(),
      source: s.fromCache ? "cache" : "live",
    };
  });

// ─── Run ─────────────────────────────────────────────────────────────────────

const state: PipelineState = {
  productId: "prod_123",
  productData: null,
  enrichedData: null,
  finalRecord: null,
  fromCache: false,
};

await pipeline.run(state);

console.log("Final record:", state.finalRecord);
console.log("Step timings (ms):", state.__timings);
if (state.fromCache) console.warn("Warning: data served from cache.");
```

***

## Resilience layers explained

### Step-level retries + delay

Pass `{ retries: 3, delaySec: 1 }` as the second argument to `.startWith()` or `.then()`. Flowneer attempts the step up to 3 times total (not 3 additional — 1 attempt + 2 retries) with a 1 s gap.

```typescript
.startWith(fetchFn, { retries: 3, delaySec: 1 })
```

### Per-step timeout

`timeoutMs` wraps the step in a `Promise.race` against a rejection timer. If the step exceeds the limit, Flowneer throws `StepTimeoutError` (which triggers the retry/fallback chain).

```typescript
.then(slowFn, { timeoutMs: 5_000 })
```

### Circuit breaker

`withCircuitBreaker` tracks failures across all flow runs. After `failureThreshold` consecutive failures the circuit trips to **open** — subsequent runs skip the protected step(s) entirely (triggering the fallback) until `resetTimeoutMs` elapses.
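Conceptually, the breaker is a small closed → open → half-open state machine. Here is a minimal self-contained sketch of that cycle (illustrative only, not Flowneer's implementation; the `CircuitBreaker` class and its injectable clock are hypothetical names):

```typescript
type BreakerState = "closed" | "open" | "half-open";

// Sketch of the closed → open → half-open cycle: consecutive failures trip
// the breaker open; after the reset window, one probe call is allowed.
class CircuitBreaker {
  private state: BreakerState = "closed";
  private failures = 0;
  private openedAt = 0;

  constructor(
    private failureThreshold: number,
    private resetTimeoutMs: number,
    private now: () => number = Date.now, // injectable clock for testing
  ) {}

  async exec<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === "open") {
      if (this.now() - this.openedAt < this.resetTimeoutMs) {
        throw new Error("circuit open"); // skip the call entirely
      }
      this.state = "half-open"; // reset window elapsed: allow one probe
    }
    try {
      const result = await fn();
      this.state = "closed"; // success closes the circuit again
      this.failures = 0;
      return result;
    } catch (err) {
      this.failures++;
      if (this.state === "half-open" || this.failures >= this.failureThreshold) {
        this.state = "open";
        this.openedAt = this.now();
      }
      throw err;
    }
  }
}
```

While the circuit is open the protected function is never invoked, which is what lets `withFallback` take over without hammering a dead endpoint.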
```typescript
.withCircuitBreaker({ failureThreshold: 5, resetTimeoutMs: 60_000 })
```

### Fallback

`.withFallback(fn)` runs `fn` whenever any step in the flow throws an unhandled error — after all retries are exhausted, or when the circuit is open. Use it to write safe recovery logic.

***

## Variation — timeout at the flow level

To abort the entire pipeline after a wall-clock deadline, pass an `AbortSignal`:

```typescript
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // 10 s total budget

await pipeline.run(state, {}, { signal: controller.signal });
```

***

## See also

* [withCircuitBreaker](../plugins/resilience/circuit-breaker.md)
* [withTimeout](../plugins/resilience/timeout.md)
* [withFallback](../plugins/resilience/fallback.md)
* [withTiming](../plugins/observability/timing.md)

--- --- url: /flowneer/core/step-types.md ---

# Step Types

Flowneer provides six built-in step primitives. Every step in a flow is one of these types.

## `fn` — Plain step

Created by `.startWith()` and `.then()`. The workhorse of most flows.

```typescript
flow
  .startWith(async (s) => {
    s.fetchedData = await fetch(s.url).then((r) => r.json());
  })
  .then(async (s) => {
    s.processed = transform(s.fetchedData);
  });
```

**Return value routing:** If a `fn` step returns a string beginning with `#`, it's treated as a goto jump to a named anchor:

```typescript
.then(async (s) => {
  if (s.needsRetry) return "#retry"; // jump to the anchor named "retry"
})
```

**Async generator steps:** A step declared as `async function*` yields token chunks and returns an optional routing string:

```typescript
.then(async function* (s) {
  for await (const token of streamLlm(s.prompt)) {
    s.response += token;
    yield token; // → forwarded as a "chunk" event to .stream()
  }
  // optionally: return "#anchorName"
})
```

***

## `branch` — Conditional routing

Created by `.branch()`. A router function returns a key; the matching branch function executes.

```typescript
flow.branch(
  (s) => s.intent, // "buy" | "refund" | anything else → "default"
  {
    buy: handleBuy,
    refund: handleRefund,
    default: handleGeneral,
  },
);
```

* If the router returns a key not in `branches`, the `"default"` branch (if present) runs.
* The `retries` / `delaySec` options apply to both the router and the selected branch function.
* Branch functions can return `"#anchorName"` for goto.

***

## `loop` — While loop

Created by `.loop()`. Runs the inner flow body repeatedly while the condition holds.
```typescript flow.loop( (s) => !s.isDone, (b) => b.startWith(async (s) => { const result = await pollApi(s.jobId); s.isDone = result.status === "complete"; }), ); ``` The condition is checked **before** each body execution (pre-condition loop). Hook middleware (`wrapStep`, etc.) does **not** wrap the entire loop — it wraps each step within the body individually. *** ## `batch` — Sequential item processing Created by `.batch()`. Runs the inner flow for each item from `items`. ```typescript flow.batch( (s) => s.emails, // extract list from shared state (b) => b.startWith(async (s) => { const email = s.__batchItem as Email; await sendEmail(email); }), ); ``` Key behaviour: * Items are processed **sequentially** (not in parallel — use `.parallel()` for that). * The current item is stored on `shared[key]` (defaults to `"__batchItem"`). * The `key` property is cleaned up from `shared` after the batch completes. * Nested batches require distinct `key` values to avoid collisions. *** ## `parallel` — Concurrent execution Created by `.parallel()`. Runs all provided functions concurrently with `Promise.all`. ```typescript flow.parallel([ async (s) => { s.a = await fetchA(); }, async (s) => { s.b = await fetchB(); }, async (s) => { s.c = await fetchC(); }, ]); ``` **Safe mode (with reducer):** Each function operates on its own shallow draft. The reducer merges drafts back into `shared` after all complete — safe against concurrent write races: ```typescript flow.parallel([workerA, workerB], undefined, (shared, [draftA, draftB]) => { shared.results = [...(draftA.results ?? []), ...(draftB.results ?? [])]; }); ``` The `retries` and `delaySec` options apply per-function. Each branch also checks the flow's `AbortSignal` on entry, so aborting the parent flow takes effect as each running branch completes rather than waiting for all to settle. *** ## `anchor` — Named jump target Created by `.anchor()`. A pure no-op marker — nothing executes at an anchor step. 
Its only purpose is to provide a goto target.

```typescript
flow
  .anchor("start")
  .then(async (s) => {
    s.count++;
    if (s.count < 5) return "#start"; // loop back
  })
  .then(finalize);
```

Anchors are automatically detected before flow execution. Jumping to a non-existent anchor throws `Error: goto target anchor "name" not found`.

***

## NodeOptions

All `fn`, `branch`, `loop`, `batch`, and `parallel` steps accept `NodeOptions`:

```typescript
interface NodeOptions {
  retries?: NumberOrFn; // total attempts, default 1
  delaySec?: NumberOrFn; // seconds between retries, default 0
  timeoutMs?: NumberOrFn; // per-step timeout ms, default 0 (disabled)
  label?: string; // human-readable name for this step
}
```

`NumberOrFn` means you can pass a plain number or a `(shared, params) => number` callback for dynamic resolution:

```typescript
.then(step, {
  retries: (s) => (s.isHighPriority ? 5 : 2),
  timeoutMs: 10_000,
  label: "fetchUser",
})
```

### Step labels

The `label` option is available on every step type and serves two purposes:

* **Observability** — `meta.label` is populated in all hook callbacks (`beforeStep`, `afterStep`, `onError`, `wrapStep`), making it easy to identify which step is running without relying on index numbers.
* **Error messages** — when a labelled step throws, the `FlowError` message includes the label: `"fetchUser" step 2` instead of `step 2`.

```typescript
flow
  .then(fetchUser, { label: "fetchUser" })
  .branch(
    routeByIntent,
    { buy: handleBuy, refund: handleRefund },
    { label: "intentRouter" },
  )
  .loop(
    (s) => !s.done,
    (b) => b.then(poll),
    { label: "pollLoop" },
  )
  .batch(
    (s) => s.items,
    (b) => b.then(process),
    { label: "itemBatch" },
  )
  .parallel([workerA, workerB], { label: "parallelFetch" });
```

--- --- url: /flowneer/core/streaming.md ---

# Streaming

Flowneer supports three complementary streaming APIs:

1. **`.stream()` on `FlowBuilder`** — pull-based async generator yielding structured `StreamEvent` objects.
2.
**Generator step functions** — push token chunks from inside a step. 3. **`.withStream()` plugin** — push-based subscriber registered on `shared.__stream`. *** ## FlowBuilder.stream() `.stream()` is the recommended API when consuming a flow from the outside. It yields events as the flow runs. The same flow instance can be streamed multiple times — hook state is scoped to each individual call and cleaned up automatically when the generator returns or is abandoned: ```typescript for await (const event of flow.stream(shared)) { switch (event.type) { case "step:before": console.log(`→ step ${event.meta.index} starting`); break; case "step:after": console.log(`✓ step ${event.meta.index} done`); break; case "chunk": process.stdout.write(String(event.data)); break; case "error": console.error("Flow error:", event.error); break; case "done": console.log("\nFlow complete"); break; } } ``` ### StreamEvent types | `type` | Payload | When | | ------------- | ---------------- | --------------------------------------- | | `step:before` | `meta: StepMeta` | Before each step body executes | | `step:after` | `meta, shared` | After each step body completes | | `chunk` | `data: unknown` | Yielded by a generator step or `emit()` | | `error` | `error: unknown` | Unhandled error during the flow | | `done` | — | After `afterFlow` hooks complete | *** ## Generator Step Functions Declare a step as `async function*` to yield chunks in real-time. Each `yield` value is forwarded as a `chunk` event. The generator's final `return` value is used for anchor routing (just like plain steps). 
```typescript const flow = new FlowBuilder<{ prompt: string; response: string }>().startWith( async function* (s) { s.response = ""; for await (const token of callLlmStream(s.prompt)) { s.response += token; yield token; // → "chunk" event } // return "#anchorName" // optional routing }, ); for await (const event of flow.stream({ prompt: "Hello", response: "" })) { if (event.type === "chunk") process.stdout.write(String(event.data)); } ``` *** ## withStream Plugin The `withStream` plugin registers a **push-based subscriber** on `shared.__stream`. Call `emit(shared, chunk)` from any step to trigger it: ```typescript import { FlowBuilder } from "flowneer"; import { withStream, emit } from "flowneer/plugins/messaging"; const AppFlow = FlowBuilder.extend([withStream]); const flow = new AppFlow() .withStream((chunk) => { process.stdout.write(String(chunk)); }) .startWith(async (s) => { for await (const token of streamLlm(s.prompt)) { s.response += token; emit(s, token); // triggers the subscriber } }); ``` `emit()` is a safe no-op when no subscriber is registered. The subscriber is stored on `shared.__stream` so it is **inherited by sub-flows** (inside `.loop()`, `.batch()`, etc.) automatically. *** ## HTTP Streaming Server Both APIs compose well with HTTP streaming. 
Here's a minimal Bun server example: ```typescript import { FlowBuilder } from "flowneer"; const flow = new FlowBuilder<{ prompt: string; response: string }>().startWith( async function* (s) { s.response = ""; for await (const token of streamLlm(s.prompt)) { s.response += token; yield token; } }, ); Bun.serve({ port: 3000, async fetch(req) { const { prompt } = await req.json(); const shared = { prompt, response: "" }; const stream = new ReadableStream({ async start(controller) { for await (const event of flow.stream(shared)) { if (event.type === "chunk") { controller.enqueue(new TextEncoder().encode(String(event.data))); } if (event.type === "done") controller.close(); } }, }); return new Response(stream, { headers: { "Content-Type": "text/plain; charset=utf-8" }, }); }, }); ``` --- --- url: /flowneer/recipes/streaming-chat-server.md --- # Streaming Chat Server Build an HTTP server that streams LLM token chunks to the browser using [Server-Sent Events (SSE)](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events). Each request runs a Flowneer flow via `.stream()`, forwarding every `chunk` event to the client in real time. 
**Plugins used:** none — generator steps + `.stream()` (core API)

***

## The code

```typescript
import "dotenv/config";
import { FlowBuilder } from "flowneer";
import { OpenAI } from "openai";

const openai = new OpenAI();

// ─── State ───────────────────────────────────────────────────────────────────

interface ChatState {
  userMessage: string;
  history: Array<{ role: "user" | "assistant"; content: string }>;
  reply: string;
}

// ─── Flow ────────────────────────────────────────────────────────────────────

const chatFlow = new FlowBuilder<ChatState>()
  .startWith(async (s) => {
    s.history.push({ role: "user", content: s.userMessage });
  })
  .then(async function* streamLlm(s) {
    // Use OpenAI streaming — yield each text delta as a chunk
    const stream = await openai.chat.completions.create({
      model: "gpt-4o-mini",
      messages: [
        { role: "system", content: "You are a helpful assistant." },
        ...s.history,
      ],
      stream: true,
    });
    let full = "";
    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta?.content ??
""; if (delta) { full += delta; yield delta; // ← each token becomes a `chunk` event via .stream() } } s.reply = full; }) .then((s) => { s.history.push({ role: "assistant", content: s.reply }); }); // ─── Server ─────────────────────────────────────────────────────────────────── Bun.serve({ port: 3000, async fetch(req) { const url = new URL(req.url); // ── SSE endpoint ──────────────────────────────────────────────────────── if (url.pathname === "/chat" && req.method === "POST") { const { message } = (await req.json()) as { message: string }; const state: ChatState = { userMessage: message, history: [], reply: "", }; const encoder = new TextEncoder(); const body = new ReadableStream({ async start(controller) { for await (const event of chatFlow.stream(state)) { if (event.type === "chunk") { // SSE format: "data: \n\n" const line = `data: ${JSON.stringify(event.data)}\n\n`; controller.enqueue(encoder.encode(line)); } if (event.type === "error") { controller.enqueue( encoder.encode( `data: ${JSON.stringify({ error: String(event.error) })}\n\n`, ), ); } if (event.type === "done") { controller.enqueue(encoder.encode("data: [DONE]\n\n")); controller.close(); } } }, }); return new Response(body, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive", "Access-Control-Allow-Origin": "*", }, }); } // ── Simple HTML client ──────────────────────────────────────────────── if (url.pathname === "/" || url.pathname === "/index.html") { return new Response(htmlClient, { headers: { "Content-Type": "text/html" }, }); } return new Response("Not found", { status: 404 }); }, }); console.log("Streaming chat server running at http://localhost:3000"); // ─── Minimal browser client ─────────────────────────────────────────────────── const htmlClient = ` Flowneer Chat
`;
```

***

## How `.stream()` works with async generators

Flowneer's `.stream()` has two chunk sources (`emit()` calls and generator `yield`s) in addition to the standard lifecycle events:

| What you do in a step | What the consumer sees |
| -------------------------------------------- | -------------------------------------- |
| `emit(s, value)` | `{ type: "chunk", data: value }` |
| `yield value` from an `async function*` step | `{ type: "chunk", data: value }` |
| Step starts | `{ type: "step:before", meta }` |
| Step ends | `{ type: "step:after", meta, shared }` |
| Any thrown error | `{ type: "error", error }` |
| Flow completes | `{ type: "done" }` |

The `streamLlm` step above is an `async function*` — every `yield` is automatically forwarded as a chunk. This avoids manual `emit()` calls inside generator steps.

## Variation — Express / Node.js

Replace `Bun.serve` with Express:

```typescript
import express from "express";

const app = express();
app.use(express.json());

app.post("/chat", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.flushHeaders();

  const state: ChatState = {
    userMessage: req.body.message,
    history: [],
    reply: "",
  };

  for await (const event of chatFlow.stream(state)) {
    if (event.type === "chunk")
      res.write(`data: ${JSON.stringify(event.data)}\n\n`);
    if (event.type === "done") {
      res.write("data: [DONE]\n\n");
      res.end();
    }
  }
});

app.listen(3000);
```

***

## See also

* [withStream & emit()](../plugins/messaging/stream.md)
* [Streaming — core API](../core/streaming.md)

--- --- url: /flowneer/plugins/memory/summary-memory.md ---

# SummaryMemory

A memory implementation that automatically compresses old messages into a running summary when the buffer grows too large. Ideal for long conversations that must stay within an LLM's context window.
## Usage

```typescript
import { SummaryMemory } from "flowneer/plugins/memory";
import { callLlm } from "./utils/callLlm";

const memory = new SummaryMemory({
  maxMessages: 10,
  summarize: async (messages) => {
    const text = messages.map((m) => `${m.role}: ${m.content}`).join("\n");
    return callLlm(`Summarize this conversation concisely:\n${text}`);
  },
});
```

## Constructor Options

| Option | Type | Required | Default | Description |
| ------------- | ------------------------------------- | -------- | ------- | -------------------------------------------- |
| `summarize` | `(msgs) => string \| Promise<string>` | ✅ | — | Summarisation function (usually an LLM call) |
| `maxMessages` | `number` | | `10` | Number of recent messages to keep verbatim |

## Methods

| Method | Signature | Description |
| ----------- | ------------------------------ | --------------------------------------------------- |
| `add` | `async (msg) => Promise<void>` | Append a message; may trigger summarisation |
| `get` | `() => MemoryMessage[]` | Current messages, prepended with summary if present |
| `clear` | `() => void` | Remove all messages and the running summary |
| `toContext` | `() => string` | Formatted context: summary block + recent messages |

> **Note:** `add()` is async because it may call your `summarize` function. Await it in your steps.

## Compression Behaviour

When `messages.length > maxMessages`:

1. The **oldest half** of messages is passed to `summarize`.
2. If a previous summary exists it is prepended as context for the new summary.
3. The compressed messages are replaced by a single `[Summary] ...` system message.

This keeps the conversation context manageable for long sessions.
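The compression pass above can be sketched as a standalone function (a simplified model, not the library's actual implementation; `MemoryMessage` is reduced here to `role`/`content`):

```typescript
interface MemoryMessage {
  role: string;
  content: string;
}

// Sketch of SummaryMemory's compression: fold the oldest half of the buffer
// (plus any previous summary) into a single [Summary] system message.
async function compress(
  messages: MemoryMessage[],
  maxMessages: number,
  summarize: (msgs: MemoryMessage[]) => string | Promise<string>,
  previousSummary?: string,
): Promise<MemoryMessage[]> {
  if (messages.length <= maxMessages) return messages;

  const half = Math.ceil(messages.length / 2);
  const oldest = messages.slice(0, half);
  const recent = messages.slice(half);

  // A previous summary is prepended as context for the new one
  const input = previousSummary
    ? [{ role: "system", content: `[Summary] ${previousSummary}` }, ...oldest]
    : oldest;

  const summary = await summarize(input);
  return [{ role: "system", content: `[Summary] ${summary}` }, ...recent];
}
```

The key property is that the buffer shrinks to roughly half its size plus one summary message, so repeated compression keeps total context bounded.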
## Example with `withMemory` ```typescript import { FlowBuilder } from "flowneer"; import { withMemory, SummaryMemory } from "flowneer/plugins/memory"; const AppFlow = FlowBuilder.extend([withMemory]); const memory = new SummaryMemory({ maxMessages: 8, summarize: (msgs) => callLlm( "Summarize:\n" + msgs.map((m) => `${m.role}: ${m.content}`).join("\n"), ), }); const flow = new AppFlow() .withMemory(memory) .startWith(async (s) => { await s.__memory!.add({ role: "user", content: s.userInput }); const ctx = await s.__memory!.toContext(); s.response = await callLlm(`${ctx}\nassistant:`); await s.__memory!.add({ role: "assistant", content: s.response }); }); ``` --- --- url: /flowneer/presets/agent/swarm.md --- # swarm Decentralized multi-agent preset where any agent can hand off to any other agent at runtime. Unlike `supervisorCrew` or `hierarchicalCrew`, there is no central manager — routing emerges from the agents' own logic. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { swarm, handoffTo, historyText } from "flowneer/presets/agent"; import type { SwarmAgent, SwarmState, SwarmOptions, RouterContext, SwarmRouter, } from "flowneer/presets/agent"; ``` *** ## Quick start ```typescript interface MyState extends SwarmState { messages: { role: "user" | "assistant"; content: string; agent?: string }[]; } const triageAgent: SwarmAgent = { name: "triage", description: "Routes incoming requests to the right specialist", fn: async (shared) => { const topic = classifyMessage(shared.messages); if (topic === "billing") { handoffTo(shared, "billing", "billing question detected"); } else { handoffTo(shared, "support"); } }, }; const billingAgent: SwarmAgent = { name: "billing", description: "Handles billing and payment queries", fn: async (shared) => { const reply = await billingLlm(shared.messages); shared.messages.push({ role: "assistant", content: reply, agent: "billing", }); // No handoffTo → turn ends here }, }; const supportAgent: SwarmAgent = { name: 
"support", description: "General customer support", fn: async (shared) => { const reply = await supportLlm(shared.messages); shared.messages.push({ role: "assistant", content: reply, agent: "support", }); }, }; const flow = swarm([triageAgent, billingAgent, supportAgent], { defaultAgent: "triage", }); await flow.run({ messages: [{ role: "user", content: "I was charged twice last month" }], }); ``` *** ## `handoffTo(shared, agentName, reason?)` Call `handoffTo` inside any agent's `fn` to request a handoff to another agent. ```typescript // yield to another agent handoffTo(shared, "billing"); // with an optional reason (available in onHandoff callback) handoffTo(shared, "billing", "bill-related query"); ``` * If `agentName` is not in the swarm the handoff is silently dropped and the turn ends. * Only one handoff per agent invocation is honoured — set `__swarmHandoff` is only read once per loop iteration. *** ## Shared state `swarm()` reads and writes the following fields on `shared`. Declare them in your state type using `SwarmState`: | Field | Type | Description | | ---------------- | ----------------------------- | ---------------------------------------------------------------------------------- | | `currentAgent` | `string \| undefined` | Name of the agent that will run next. Set to `defaultAgent` on the very first run. | | `messages` | `SwarmMessage[] \| undefined` | Conversation history — managed by your agent fns. | | `turnCount` | `number \| undefined` | Number of handoffs accepted in the current `.run()` call. Reset to `0` each run. | | `__swarmHandoff` | `{ target, reason? }` | **Internal** — set by `handoffTo`, consumed by the loop. Never read this directly. | | `__swarmDone` | `boolean` | **Internal** — loop-exit sentinel. Deleted after each run. | `currentAgent` **persists between `.run()` calls** — the swarm remembers which agent last had control. This supports multi-turn conversations where the same specialist handles follow-up messages. 
***

## `swarm(agents, options)` — signature

```typescript
function swarm<S extends SwarmState>(
  agents: SwarmAgent<S>[],
  options: SwarmOptions<S>,
): FlowBuilder<S>;
```

***

## `SwarmAgent`

```typescript
interface SwarmAgent<S extends SwarmState = SwarmState> {
  name: string; // unique identifier used in handoffTo()
  description: string; // human-readable; can be injected into an LLM prompt
  fn: NodeFn<S>; // standard Flowneer step function
}
```

***

## `SwarmOptions`

| Option | Type | Default | Description |
| --------------- | ----------------------------------------------------- | ------- | -------------------------------------------------------- |
| `defaultAgent` | `string` | — | **Required.** Starting agent on first `.run()`. |
| `maxHandoffs` | `number` | `5` | Max hops per `.run()` call. The first agent run is free. |
| `onHandoff` | `(from, to, reason, shared) => void \| Promise<void>` | — | Called on every accepted handoff. |
| `onMaxHandoffs` | `(shared) => void \| Promise<void>` | — | Called instead of the handoff when the limit is reached. |

`swarm()` throws at **construction time** if `defaultAgent` is not in the `agents` array.

### `onHandoff` example

Use `onHandoff` for observability, audit logging, or side-effects triggered on every accepted handoff:

```typescript
const flow = swarm(agents, {
  defaultAgent: "triage",
  onHandoff: async (from, to, reason, shared) => {
    // Structured audit log
    console.log(
      JSON.stringify({
        event: "swarm_handoff",
        from,
        to,
        reason,
        turnCount: shared.turnCount,
        messageCount: shared.messages?.length ?? 0,
        ts: Date.now(),
      }),
    );

    // Optionally append a system note to the conversation history
    shared.messages?.push({
      role: "assistant",
      content: `[Transferring you to the ${to} team${reason ? ` — ${reason}` : ""}]`,
      agent: "system",
    });
  },
});
```

`onHandoff` fires **before** `turnCount` is incremented and **before** `currentAgent` is updated to the new agent.
This means `shared.currentAgent` still points to the agent that is handing off, and `shared.turnCount` reflects the number of hops accepted so far (not including this one).

***

## LLM router

Pass a `router` object in `SwarmOptions` to let an LLM choose the starting agent on each `.run()` call. The router runs **once per run**, before the handoff loop begins.

| Option | Type | Description |
| -------- | --------------------------------------------------------------- | ----------------------------------------------------- |
| `call` | `(prompt: string) => Promise<string>` | **Required.** Calls the LLM and returns a raw string. |
| `prompt` | `string \| ((ctx: RouterContext) => string \| Promise<string>)` | Custom prompt. Defaults to a built-in routing prompt. |

```typescript
import { swarm } from "flowneer/presets/agent";
import OpenAI from "openai";

const client = new OpenAI();

const flow = swarm(agents, {
  defaultAgent: "triage",
  router: {
    call: async (prompt) => {
      const res = await client.chat.completions.create({
        model: "gpt-4o-mini",
        messages: [{ role: "user", content: prompt }],
      });
      return res.choices[0].message.content ?? "";
    },
  },
});
```

### Custom prompt

Supply `prompt` as an async function to build context-aware routing prompts:

```typescript
const flow = swarm(agents, {
  defaultAgent: "triage",
  router: {
    call: myLlm,
    prompt: async ({ messages, agents }) => {
      const agentList = agents
        .map((a) => `- ${a.name}: ${a.description}`)
        .join("\n");
      const latest = messages.at(-1)?.content ?? "(none)";
      return `Pick the best agent for this message.\n\nAgents:\n${agentList}\n\nLatest message:\n${latest}\n\nRespond with only the agent name.`;
    },
  },
});
```

`RouterContext` fields:

| Field | Type | Description |
| -------------- | ---------------- | ----------------------------------------------------- |
| `messages` | `SwarmMessage[]` | Conversation history at the time of routing. |
| `agents` | `SwarmAgent[]` | All agents registered in the swarm. |
| `currentAgent` | `string` | Fallback agent if the router returns an unknown name. |
| `shared` | `S` | Live shared state — mutations are visible downstream. |

> **Note:** The router response is trimmed and matched **case-insensitively** against agent names. An unrecognised response is silently ignored and `currentAgent` remains unchanged.

***

## `historyText(messages)`

Utility that formats a `SwarmMessage[]` into a plain-text string for use in LLM prompts.

```typescript
historyText(messages: SwarmMessage[]): string
```

Each message is formatted as `[agentName] role: content`; the `[agentName]` prefix is omitted when `message.agent` is undefined.

```typescript
import { historyText } from "flowneer/presets/agent";

const billingAgent: SwarmAgent = {
  name: "billing",
  description: "Handles billing queries",
  fn: async (shared) => {
    const history = historyText(shared.messages ?? []);
    const reply = await myLlm(`${history}\nAnswer the billing question.`);
    shared.messages?.push({
      role: "assistant",
      content: reply,
      agent: "billing",
    });
  },
};
```

***

## Lifecycle per `.run()` call

```
init (set currentAgent if unset, reset turnCount)
 │
 └─ loop while !done
      ├─ dispatch → agent.fn(shared, params)
      └─ handoff check
           ├─ no __swarmHandoff → done = true
           ├─ unknown target → done = true
           ├─ turnCount ≥ max → onMaxHandoffs() → done = true
           └─ accepted → onHandoff() → turnCount++ → update currentAgent
 │
cleanup (delete __swarmDone)
```

***

## Max-handoffs behaviour

```typescript
const flow = swarm(agents, {
  defaultAgent: "triage",
  maxHandoffs: 3,
  onMaxHandoffs: async (shared) => {
    shared.messages!.push({
      role: "assistant",
      content: "Sorry, we couldn't route your request. Please try again.",
    });
  },
});
```

When `turnCount` reaches `maxHandoffs` the requested handoff is rejected, `onMaxHandoffs` is called, and the turn ends. The `currentAgent` is **not** updated — the agent that tried to hand off remains active for the next `.run()` call.
***

## Internal step labels

Every internal step in `swarm()` carries a stable `swarm:*` label that appears in `StepMeta` (error messages, `beforeStep`/`afterStep` hooks) and makes each step individually targetable via `StepFilter`:

| Label | Description | Reachable via outer hooks |
| ---------------- | --------------------------------------------------------------- | ------------------------- |
| `swarm:init` | Per-run init — sets `currentAgent` if unset, resets `turnCount` | ✅ yes |
| `swarm:router` | Optional LLM router step (no-op when `router` is not set) | ✅ yes |
| `swarm:loop` | The handoff loop itself | ✅ yes |
| `swarm:dispatch` | Agent `fn` invocation — fires once per loop iteration | ❌ loop-body only |
| `swarm:handoff` | Handoff check, `onHandoff` / `onMaxHandoffs` callbacks | ❌ loop-body only |
| `swarm:cleanup` | Cleanup — deletes `__swarmDone` after the loop exits | ✅ yes |

```typescript
import { withTiming } from "flowneer/plugins/observability";

const AppFlow = FlowBuilder.extend([withTiming]);
const flow = swarm(agents, { defaultAgent: "triage" }, AppFlow);

(flow as InstanceType<typeof AppFlow>).withTiming(["swarm:dispatch"]); // only time agent fn calls
// .withTiming(["swarm:*"])  // time every internal step
// .withTiming(["!swarm:*"]) // exclude all swarm internals
```

`swarm:dispatch` and `swarm:handoff` run inside the loop body's own `FlowBuilder` instance.
Plugins and `StepFilter` registered on the outer flow (via the `FlowClass` parameter) can target `swarm:init`, `swarm:router`, `swarm:loop`, and `swarm:cleanup`. The inner steps still carry their labels and appear correctly in error messages — they are just not reached by hooks on the outer flow.

***

## Composing with plugins

Pass a `FlowBuilder.extend()` subclass as the optional third argument to apply plugins to the swarm's internal steps:

```typescript
import { withTiming } from "flowneer/plugins/observability";
import { withRateLimit } from "flowneer/plugins/llm";

const AppFlow = FlowBuilder.extend([withTiming, withRateLimit]);
const flow = swarm(agents, { defaultAgent: "triage" }, AppFlow);

(flow as InstanceType<typeof AppFlow>)
  .withTiming()
  .withRateLimit({ intervalMs: 500 });

await flow.run(shared);
```

Plugin hooks (`beforeStep`, `afterStep`, etc.) fire on the swarm's outer steps: init, router, the loop itself, and cleanup. See *Internal step labels* for exactly which steps outer hooks reach.

***

## Providing agent descriptions to LLMs

The `description` field on each `SwarmAgent` is deliberately exposed for use inside your agent fns — for example, to build a routing prompt:

```typescript
const triageAgent: SwarmAgent = {
  name: "triage",
  description: "Routes incoming requests to the right specialist",
  fn: async (shared) => {
    const target = await routerLlm(shared.messages, agentMenu);
    handoffTo(shared, target, "routed by triage");
  },
};

// Build the menu after all agents are declared so none is referenced
// before its definition.
const agents = [triageAgent, billingAgent, supportAgent];
const agentMenu = agents.map((a) => `- ${a.name}: ${a.description}`).join("\n");
```

--- --- url: /flowneer/plugins/telemetry/overview.md --- # TelemetryDaemon

A lightweight background daemon that collects per-step spans and exports them in batches.
Completely external to core — wired via the `FlowHooks` API with zero coupling.

## Import

```typescript
import { TelemetryDaemon, consoleExporter } from "flowneer/plugins/telemetry";
```

## Usage

```typescript
import { TelemetryDaemon, consoleExporter } from "flowneer/plugins/telemetry";
import { FlowBuilder } from "flowneer";

const telemetry = new TelemetryDaemon({
  exporter: consoleExporter,
  flushIntervalMs: 5000,
  maxBuffer: 100,
});

const flow = new FlowBuilder();

// Attach telemetry hooks — each call generates isolated traceId + spans
flow._setHooks(telemetry.hooks());

flow.startWith(stepA).then(stepB).then(stepC);
await flow.run(shared);

// Flush remaining spans on shutdown
process.on("SIGTERM", () => telemetry.stop());
```

## Constructor Options

| Option            | Type                | Default           | Description                                |
| ----------------- | ------------------- | ----------------- | ------------------------------------------ |
| `exporter`        | `TelemetryExporter` | `consoleExporter` | Where to send spans                        |
| `flushIntervalMs` | `number`            | `5000`            | How often to flush the span buffer (ms)    |
| `maxBuffer`       | `number`            | `100`             | Force flush after this many buffered spans |

## `Span` Object

```typescript
interface Span {
  traceId: string;   // per-run unique ID
  spanId: string;    // per-step unique ID
  parentId?: string; // spans inside sub-flows point to their parent
  name: string;      // e.g. "fn[0]", "batch[2]"
  startMs: number;   // Unix ms
  endMs: number;
  durationMs: number;
  status: "ok" | "error";
  attrs: Record<string, unknown>;
}
```

## Custom Exporter

Implement `TelemetryExporter` to send spans to any backend (OpenTelemetry, Jaeger, Datadog, etc.):

```typescript
import type { TelemetryExporter, Span } from "flowneer/plugins/telemetry";

const otlpExporter: TelemetryExporter = {
  export(spans: Span[]) {
    return fetch("http://otel-collector:4318/v1/traces", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ spans }),
    });
  },
};

const telemetry = new TelemetryDaemon({ exporter: otlpExporter });
```

## `consoleExporter`

A built-in exporter that pretty-prints spans to `console.log`:

```
[telemetry] fn[0] ok 142ms { stepType: "fn", stepIndex: 0 }
[telemetry] fn[1] ok 8ms { stepType: "fn", stepIndex: 1 }
```

## `TelemetryDaemon` Methods

| Method         | Description                                                           |
| -------------- | --------------------------------------------------------------------- |
| `hooks()`      | Returns `FlowHooks` to attach to a flow. Call once per flow instance. |
| `flush()`      | Force-flush the current buffer immediately                            |
| `stop()`       | Clear the flush timer and flush remaining spans                       |
| `record(span)` | Manually record a span (advanced)                                     |

## Multiple Flows

`hooks()` creates an isolated traceId and span context each time it's called, so multiple concurrent flows don't bleed into each other:

```typescript
const flow1 = new FlowBuilder();
flow1._setHooks(telemetry.hooks()); // traceId: "abc..."

const flow2 = new FlowBuilder();
flow2._setHooks(telemetry.hooks()); // traceId: "xyz..."
```

--- --- url: /flowneer/recipes/tool-calling-agent.md --- # Tool-calling Agent

Build a reusable tool-calling agent using `createAgent` and `tool()`. The agent automatically loops through think → tool calls → observation until it produces a final answer.
**Plugins used:** `createAgent`, `tool()` (from `flowneer/presets/agent`) *** ## The code ```typescript import "dotenv/config"; import { OpenAI } from "openai"; import { tool, createAgent } from "flowneer/presets/agent"; import type { LlmAdapter, AgentState } from "flowneer/presets/agent"; // ─── Tools ─────────────────────────────────────────────────────────────────── const calculator = tool( ({ expression }: { expression: string }) => { // Safe arithmetic only — replace with a proper parser in production return Function(`"use strict"; return (${expression})`)(); }, { name: "calculator", description: "Evaluate a mathematical expression", params: { expression: { type: "string", description: "A JavaScript arithmetic expression, e.g. '(12 * 4) / 2'", required: true, }, }, }, ); const getTime = tool(() => new Date().toUTCString(), { name: "get_time", description: "Get the current UTC date and time", params: {}, }); const webSearch = tool( async ({ query }: { query: string }) => { // Replace with a real search API call return `[mock search results for: ${query}]`; }, { name: "web_search", description: "Search the web for up-to-date information", params: { query: { type: "string", description: "Search query", required: true }, }, }, ); // ─── LLM adapter (OpenAI) ──────────────────────────────────────────────────── const openai = new OpenAI(); const callLlm: LlmAdapter = async (messages, toolDefs) => { const res = await openai.chat.completions.create({ model: "gpt-4o-mini", messages: messages as any, tools: toolDefs.map((t) => ({ type: "function" as const, function: t })), tool_choice: "auto", }); const msg = res.choices[0]!.message; if (msg.tool_calls?.length) { return { toolCalls: msg.tool_calls.map((tc: any) => ({ id: tc.id, name: tc.function.name, args: JSON.parse(tc.function.arguments), })), }; } return { text: msg.content ?? 
"" }; }; // ─── Agent (create once, reuse forever) ────────────────────────────────────── const agent = createAgent({ tools: [calculator, getTime, webSearch], callLlm, systemPrompt: "You are a helpful assistant. Use the available tools when needed. " + "Always show your reasoning before giving a final answer.", maxIterations: 8, }); // ─── Run ───────────────────────────────────────────────────────────────────── const state: AgentState = { input: "What is 1337 * 42, and what time is it right now?", messages: [], }; await agent.run(state); console.log(state.output); if (state.__reactExhausted) { console.warn("Agent hit the iteration limit without finishing."); } ``` *** ## How it works 1. `tool()` wraps each function with a name, description, and parameter schema. 2. `createAgent()` returns a `FlowBuilder` that wires `.withTools()` + `.withReActLoop()` internally. 3. On each iteration the `callLlm` adapter receives the full conversation history and tool schemas. If the model returns `tool_calls`, Flowneer dispatches them and injects the results into `state.messages` before the next iteration. 4. When the model returns plain text with no tool calls, the loop ends and `state.output` is set. ## Variations **Swap the model** — change `"gpt-4o-mini"` to `"gpt-4o"` or any Claude / Gemini adapter. **Add memory** — call `.withMemory(new BufferWindowMemory(10))` on the returned `FlowBuilder` to persist context across runs. **Cap cost** — add `.withTokenBudget(...)` to stop the loop when the token budget is exceeded. 
**Stream chunks** — replace `agent.run(state)` with: ```typescript for await (const event of agent.stream(state)) { if (event.type === "chunk") process.stdout.write(String(event.data)); } ``` ## See also * [createAgent & tool() reference](../presets/agent/create-agent.md) * [withReActLoop](../presets/agent/react-loop.md) * [withTools & ToolRegistry](../plugins/tools/overview.md) --- --- url: /flowneer/plugins/compliance/audit-flow.md --- # withAuditFlow Statically audits a flow for taint violations — checks that no "sink" step (e.g. an outbound HTTP call) can be reached after a "source" step (e.g. a PII-fetching step), without executing the flow at all. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withAuditFlow } from "flowneer/plugins/compliance"; const AppFlow = FlowBuilder.extend([withAuditFlow]); ``` ## Usage ```typescript import { withAuditFlow } from "flowneer/plugins/compliance"; import type { TaintRule } from "flowneer/plugins/compliance"; const flow = new AppFlow() .then(fetchUser, { label: "pii:fetchUser" }) .then(enrichProfile, { label: "pii:enrich" }) .then(callAnalytics, { label: "external:analytics" }) .then(saveResult); const rules: TaintRule[] = [ { source: ["pii:*"], sink: (meta) => meta.label?.startsWith("external:") ?? false, message: "PII must not reach external endpoints", onViolation: "throw", }, ]; const report = flow.auditFlow(rules); if (!report.passed) { for (const v of report.violations) { console.error( `Rule violation: step "${v.source.label}" (index ${v.source.index}) ` + `can reach sink "${v.sink.label}" (index ${v.sink.index})`, ); } } ``` ## `TaintRule` ```typescript interface TaintRule { /** Steps that produce sensitive data. Matched via StepFilter. */ source: StepFilter; /** Steps that send data outbound. Matched via StepFilter. */ sink: StepFilter; /** Human-readable description, included in violation messages. */ message?: string; /** What to do at runtime via withRuntimeCompliance. 
Defaults to "throw". */ onViolation?: "throw" | "warn" | "record"; } ``` ## `ComplianceReport` ```typescript interface ComplianceReport { passed: boolean; violations: ComplianceViolation[]; } interface ComplianceViolation { rule: TaintRule; source: { index: number; label?: string }; sink: { index: number; label?: string }; } ``` ## `StepFilter` patterns `source` and `sink` accept any [`StepFilter`](/core/plugins#stepfilter): a string array (exact labels or glob patterns) or a predicate function. ```typescript // Exact labels { source: ["fetchUser"], sink: ["sendToAnalytics"] } // Glob — matches "pii:user", "pii:address", etc. { source: ["pii:*"], sink: ["external:*"] } // Predicate { source: (meta) => meta.label?.startsWith("pii:") ?? false, sink: (meta) => meta.label?.startsWith("external:") ?? false, } ``` Steps without a label are never matched by string patterns. ## How It Works `auditFlow()` walks the compiled `steps[]` array recursively (including nested `.loop()` bodies and `.batch()` processors). For each rule it collects all source and sink positions, then flags any sink that appears at a higher index than a source. Nothing is executed — the analysis is purely structural and synchronous. ## Notes * The analysis is **conservative by design**: it reports any structural path from source to sink regardless of runtime conditions. A sink that could only be reached when a branch is false is still reported. * For runtime enforcement (checking actual shared state values), use [`withRuntimeCompliance`](./runtime-compliance.md). * Combine both for defence-in-depth: `auditFlow()` at startup to catch structural issues, `withRuntimeCompliance` during execution to catch data-level issues. --- --- url: /flowneer/plugins/persistence/audit-log.md --- # withAuditLog Writes an immutable audit entry to a store after each step — both successful completions and errors. 
Each entry is a **deep clone** of `shared` at that point in time, making it suitable for compliance, debugging post-mortems, and replay analysis.

## Setup

```typescript
import { FlowBuilder } from "flowneer";
import { withAuditLog } from "flowneer/plugins/persistence";

const AppFlow = FlowBuilder.extend([withAuditLog]);
```

## The `AuditLogStore` Interface

```typescript
interface AuditEntry<S> {
  stepIndex: number;
  type: string;
  timestamp: number; // Unix ms
  shared: S;         // deep clone via JSON.parse/stringify
  error?: string;    // present on failed steps
}

interface AuditLogStore<S> {
  append: (entry: AuditEntry<S>) => void | Promise<void>;
}
```

## Usage

```typescript
const log: AuditEntry<any>[] = [];

const store: AuditLogStore<any> = {
  append: (entry) => log.push(entry),
};

const flow = new AppFlow()
  .withAuditLog(store)
  .startWith(stepA)
  .then(stepB)
  .then(stepC);

await flow.run(initialState);

// Every step, success or failure, is now in `log`
for (const entry of log) {
  console.log(
    `Step ${entry.stepIndex} (${entry.type}) at ${new Date(entry.timestamp).toISOString()}`,
  );
  if (entry.error) console.error("  Error:", entry.error);
}
```

## Differences vs `withCheckpoint`

|                          | `withCheckpoint` | `withAuditLog` |
| ------------------------ | ---------------- | -------------- |
| Captures errors          | ❌               | ✅             |
| Deep clone               | ✅ (your impl)   | ✅ (built-in)  |
| Designed for replay      | ✅               | ❌             |
| Designed for audit trail | ❌               | ✅             |

## Persistent Backend Example

```typescript
import Database from "better-sqlite3";

const db = new Database("audit.db");
db.exec(`CREATE TABLE IF NOT EXISTS audit (
  step_index INTEGER, type TEXT, timestamp INTEGER, shared TEXT, error TEXT
)`);

const sqliteStore: AuditLogStore<any> = {
  append: ({ stepIndex, type, timestamp, shared, error }) => {
    db.prepare(`INSERT INTO audit VALUES (?, ?, ?, ?, ?)`).run(
      stepIndex,
      type,
      timestamp,
      JSON.stringify(shared),
      error ??
null,
    );
  },
};
```

--- --- url: /flowneer/plugins/observability/callbacks.md --- # withCallbacks

Register expanded lifecycle callbacks dispatched based on step label prefixes. Provides LangChain-style `onLLMStart`, `onToolEnd`, etc. without modifying core.

## Setup

```typescript
import { FlowBuilder } from "flowneer";
import { withCallbacks } from "flowneer/plugins/observability";

const AppFlow = FlowBuilder.extend([withCallbacks]);
```

## Usage

```typescript
const flow = new AppFlow()
  .withCallbacks({
    onLLMStart: (meta) => console.log(`LLM step ${meta.index} starting`),
    onLLMEnd: (meta, s) => console.log(`LLM done, tokens: ${s.tokensUsed}`),
    onToolStart: (meta) => console.log(`Tool ${meta.label} starting`),
    onToolEnd: (meta, s) => console.log(`Tool done`),
    onChainStart: (meta) => console.log(`Step ${meta.index} starting`),
    onError: (meta, err) => console.error(`Step ${meta.index} failed`, err),
  })
  .startWith(async (s) => { /* chain step */ })
  .then(myLlmStep); // label this step "llm:generate" to trigger onLLMStart/End
```

## Callback Dispatch by Label Prefix

The callback invoked depends on the `label` field in `StepMeta`:

| Label prefix              | `beforeStep` callback | `afterStep` callback |
| :------------------------ | :-------------------- | :------------------- |
| `"llm:*"`                 | `onLLMStart`          | `onLLMEnd`           |
| `"tool:*"`                | `onToolStart`         | `onToolEnd`          |
| `"agent:*"`               | `onAgentAction`       | `onAgentFinish`      |
| *(anything else or none)* | `onChainStart`        | `onChainEnd`         |

## Setting Step Labels

Labels live in `NodeOptions`, which is not yet exposed as a first-class API in core (a forward-looking feature).

> Currently labels are set internally by plugins like `withTools` and `withReActLoop`. You can set `meta.label` yourself via a custom `wrapStep` or a `beforeStep` hook in your own plugin.
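The prefix dispatch described in the table above boils down to a string match on `meta.label`. A standalone sketch of that mapping (illustrative only, not the plugin's internal code):

```typescript
// Maps a step label to the callback pair that withCallbacks would invoke.
// Standalone illustration of the dispatch table; returns handler names.
function callbackPairFor(label?: string): { start: string; end: string } {
  if (label?.startsWith("llm:")) return { start: "onLLMStart", end: "onLLMEnd" };
  if (label?.startsWith("tool:")) return { start: "onToolStart", end: "onToolEnd" };
  if (label?.startsWith("agent:")) return { start: "onAgentAction", end: "onAgentFinish" };
  return { start: "onChainStart", end: "onChainEnd" }; // anything else, or no label
}
```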
## All Callback Handlers

```typescript
interface CallbackHandlers<S, P> {
  onLLMStart?: (meta: StepMeta, shared: S, params: P) => void | Promise<void>;
  onLLMEnd?: (meta: StepMeta, shared: S, params: P) => void | Promise<void>;
  onToolStart?: (meta: StepMeta, shared: S, params: P) => void | Promise<void>;
  onToolEnd?: (meta: StepMeta, shared: S, params: P) => void | Promise<void>;
  onAgentAction?: (meta: StepMeta, shared: S, params: P) => void | Promise<void>;
  onAgentFinish?: (meta: StepMeta, shared: S, params: P) => void | Promise<void>;
  onChainStart?: (meta: StepMeta, shared: S, params: P) => void | Promise<void>;
  onChainEnd?: (meta: StepMeta, shared: S, params: P) => void | Promise<void>;
  onError?: (meta: StepMeta, error: unknown, shared: S, params: P) => void;
}
```

All callbacks are optional. Only those for which a handler is registered will be called.

--- --- url: /flowneer/plugins/messaging/channels.md --- # withChannels

Provides a named message channel system on `shared.__channels`. Steps communicate asynchronously by sending messages to named channels and draining them in subsequent steps or parallel workers.
## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withChannels } from "flowneer/plugins/messaging"; const AppFlow = FlowBuilder.extend([withChannels]); ``` ## Usage ```typescript import { sendTo, receiveFrom, peekChannel } from "flowneer/plugins/messaging"; const flow = new AppFlow() .withChannels() .startWith(async (s) => { // Send results to a named channel sendTo(s, "processed", { id: 1, value: "result_a" }); sendTo(s, "processed", { id: 2, value: "result_b" }); }) .then(async (s) => { // Drain the channel — returns all pending messages and clears it const items = receiveFrom<{ id: number; value: string }>(s, "processed"); console.log(items); // [{ id: 1, value: "result_a" }, { id: 2, value: "result_b" }] }); ``` ## Helper Functions ```typescript import { sendTo, receiveFrom, peekChannel } from "flowneer/plugins/messaging"; ``` | Function | Signature | Description | | ------------- | ------------------------------------ | -------------------------------------------------------- | | `sendTo` | `(shared, channel, message) => void` | Enqueue a message on the named channel | | `receiveFrom` | `(shared, channel) => T[]` | Drain and return all pending messages (clears the queue) | | `peekChannel` | `(shared, channel) => T[]` | Return messages without clearing the queue | ## Pattern: Fan-out / Fan-in ```typescript const flow = new AppFlow() .withChannels() .startWith(async (s) => { // Fan-out: producer sends items for (const task of s.tasks) { sendTo(s, "tasks", task); } }) .parallel([ async (s) => { const myTasks = receiveFrom(s, "tasks"); for (const t of myTasks) { const result = await processTask(t); sendTo(s, "results", result); } }, // more workers... ]) .then(async (s) => { // Fan-in: collect all results s.finalResults = receiveFrom(s, "results"); }); ``` ## Notes * Channels are backed by `Map` stored on `shared.__channels`. * `withChannels()` initialises `shared.__channels` in `beforeFlow` if it doesn't already exist. 
* Messages are not typed at the channel level — use generics in `receiveFrom` for safety.
* Channel state persists across steps for the duration of a `.run()` call.

--- --- url: /flowneer/plugins/persistence/checkpoint.md --- # withCheckpoint · resumeFrom

Unified checkpoint plugin — saves snapshots on configurable triggers and optionally maintains a versioned history. `resumeFrom` is its companion plugin for resuming flows from a saved version.

## Setup

```typescript
import { FlowBuilder } from "flowneer";
import { withCheckpoint, resumeFrom } from "flowneer/plugins/persistence";

// Both are separate FlowneerPlugin objects — extend with whichever you need.
const AppFlow = FlowBuilder.extend([withCheckpoint, resumeFrom]);
```

## `CheckpointOptions`

```typescript
interface CheckpointOptions<S> {
  /** Called every time a checkpoint fires. */
  save: (snapshot: S, meta: CheckpointMeta) => void | Promise<void>;

  /** Which triggers activate checkpointing. Default: ['step:after', 'error']. */
  on?: Trigger[];

  /** Narrow step-scoped triggers to matching steps only (label glob or predicate). */
  filter?: StepFilter;

  /** Custom deep-clone function. Default: structuredClone. */
  serialize?: (s: S) => S;

  /** Enable versioned history with version ids and parent pointers.
*/ history?: HistoryOptions; } ``` ### `Trigger` | Value | When it fires | | ------------------ | ------------------------------------------------------------ | | `'step:after'` | After every successful step (default; respects `filter`) | | `'error'` | When a step throws (default; respects `filter`) | | `'flow:start'` | Once before the first step runs | | `'flow:end'` | Once after the last step completes (even on error) | | `'loop:iteration'` | After each loop-body iteration completes (respects `filter`) | | `'anchor:hit'` | When a `goto` jump resolves to an anchor | ### `CheckpointMeta` ```typescript interface CheckpointMeta { trigger: Trigger; stepMeta?: StepMeta; // step:after | error | loop:iteration iteration?: number; // loop:iteration only anchorName?: string; // anchor:hit only (label without '#') error?: unknown; // error only version?: string; // set when history is enabled, e.g. "v3" parentVersion?: string | null; // set when history is enabled } ``` ### `HistoryOptions` ```typescript interface HistoryOptions { /** 'full' stores a complete snapshot; 'diff' stores only changed keys. Default: 'full'. */ strategy?: "full" | "diff"; /** Maximum versions to keep; oldest is pruned when exceeded. 
*/ maxVersions?: number; } ``` ## Usage ### Basic — save after every step ```typescript const flow = new AppFlow() .withCheckpoint({ save(snapshot, meta) { db.save(snapshot, meta.stepMeta?.index); }, }) .startWith(stepA) .then(stepB) .then(stepC); await flow.run(initialState); ``` ### Custom triggers ```typescript .withCheckpoint({ save(snapshot, meta) { store.set(meta.trigger, snapshot); }, on: ['flow:start', 'step:after', 'error', 'flow:end'], }) ``` ### Scoped to specific steps via `filter` ```typescript // Only checkpoint steps whose label matches 'llm:*' or 'api:*' .withCheckpoint({ save: (snap, meta) => persist(snap, meta), filter: ['llm:*', 'api:*'], }) ``` ### Versioned history (full strategy) Each call to `save` receives a `version` id and a `parentVersion` pointer, forming a linked chain across the run. ```typescript const history = new Map(); .withCheckpoint({ save(snapshot, meta) { history.set(meta.version!, { snapshot, parent: meta.parentVersion ?? null, }); }, history: { strategy: 'full', maxVersions: 20 }, }) ``` ### Versioned history (diff strategy) Only changed keys are written to `snapshot` — useful for large state objects. ```typescript .withCheckpoint({ save(diff, meta) { // diff only contains the keys that changed since the last checkpoint patchStore.append(meta.version!, diff, meta.parentVersion); }, history: { strategy: 'diff' }, }) ``` ### Error checkpoint ```typescript .withCheckpoint({ save(snapshot, meta) { if (meta.trigger === 'error') { console.error('step failed:', (meta.error as Error).message); } db.upsert(snapshot); }, on: ['step:after', 'error'], }) ``` ### Loop iteration checkpoint ```typescript .withCheckpoint({ save(snapshot, meta) { console.log(`iteration ${meta.iteration} done`); }, on: ['loop:iteration'], }) ``` *** ## `resumeFrom` Resumes execution from a previously saved checkpoint version. 
Steps whose index is ≤ the saved `stepIndex` are skipped; `shared` is pre-populated from the saved snapshot before the first live step runs. ```typescript .resumeFrom(version, store) ``` | Argument | Type | Description | | --------------- | ---------------------------------------------------- | ------------------------------------------------------------- | | `version` | `string` | Version id to resume from (e.g. `"v3"`). | | `store.resolve` | `(v: string) => { stepIndex: number; snapshot?: S }` | Returns the step to resume at and an optional state snapshot. | ### Crash-recovery example ```typescript const checkpointStore = { saved: null as null | { stepIndex: number; snapshot: State }, save(snapshot: State, meta: CheckpointMeta) { this.saved = { stepIndex: meta.stepMeta!.index, snapshot }; }, resolve(_version: string) { if (!this.saved) throw new Error("no checkpoint"); return this.saved; }, }; const flow = new AppFlow() .withCheckpoint({ save: checkpointStore.save.bind(checkpointStore), on: ["step:after"], }) .startWith(stepA) .then(stepB) // crashes here .then(stepC) .then(stepD); try { await flow.run(initialState); } catch { // Re-build the same flow but skip completed steps const recoveryFlow = new AppFlow() .withCheckpoint({ save: checkpointStore.save.bind(checkpointStore), on: ["step:after"], }) .resumeFrom("last", { resolve: checkpointStore.resolve.bind(checkpointStore), }) .startWith(stepA) .then(stepB) .then(stepC) .then(stepD); await recoveryFlow.run({} as State); // snapshot is applied automatically } ``` ## Notes * `save` always receives a **deep-cloned** snapshot (`structuredClone` by default). Override with `serialize` if you need a custom clone. * `filter` only narrows step-scoped triggers (`step:after`, `error`, `loop:iteration`). It has no effect on `flow:start`, `flow:end`, or `anchor:hit`. * When `history.strategy === 'diff'`, the very first checkpoint always contains the full snapshot (nothing to diff against). 
* `resumeFrom` is a separate plugin so it can be omitted from flows that never need resume logic. * Pairs naturally with [`withReplay`](./replay.md) as a lightweight alternative that skips steps by index without needing a store. --- --- url: /flowneer/plugins/resilience/circuit-breaker.md --- # withCircuitBreaker Implements the circuit-breaker pattern at the flow level. After `maxFailures` consecutive step failures, the circuit opens and every subsequent step throws immediately without executing. After `resetMs` milliseconds, the circuit closes and allows one probe attempt. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withCircuitBreaker } from "flowneer/plugins/resilience"; const AppFlow = FlowBuilder.extend([withCircuitBreaker]); ``` ## Usage ```typescript const flow = new AppFlow() .withCircuitBreaker({ maxFailures: 3, resetMs: 30_000 }) .startWith(callExternalApi) .then(processResult); try { await flow.run(shared); } catch (err) { if (err.message.startsWith("circuit open")) { console.error("External API is unavailable, circuit tripped."); } } ``` ## Options | Option | Type | Default | Description | | ------------- | -------- | ------- | --------------------------------------------------------------- | | `maxFailures` | `number` | `3` | Consecutive failures needed to open the circuit | | `resetMs` | `number` | `30000` | Milliseconds after which the circuit resets for a probe attempt | ## Circuit States ``` CLOSED (normal operation) │ ├── step failure (1, 2, 3...) │ ▼ OPEN (after maxFailures reached) │ ├── resetMs elapsed │ ▼ HALF-OPEN (one probe step allowed) │ ├── probe succeeds → CLOSED └── probe fails → OPEN (timer resets) ``` ## Behaviour Details * `consecutiveFailures` resets to 0 on any successful step. * When `openedAt + resetMs <= Date.now()`, the circuit resets and one step is allowed to execute (half-open probe). * If the probe fails, the circuit opens again immediately. 
* Circuit state is **per-builder-instance** — different flows have independent breakers. ## Tips * Combine with `retries` on individual steps for local retry before the global circuit counts a failure. * For per-API rate limiting, consider `withRateLimit` instead. --- --- url: /flowneer/plugins/llm/cost-tracker.md --- # withCostTracker Accumulates per-step cost values stored in `shared.__stepCost` into a running total at `shared.__cost`. After each step the `__stepCost` key is cleared to prevent double-counting. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withCostTracker } from "flowneer/plugins/llm"; const AppFlow = FlowBuilder.extend([withCostTracker]); ``` ## Usage ```typescript interface State { prompt: string; response: string; __stepCost?: number; __cost?: number; } const flow = new AppFlow() .withCostTracker() .startWith(async (s) => { const { text, usage } = await callLlmWithUsage(s.prompt); s.response = text; // Set __stepCost so the plugin picks it up after this step: s.__stepCost = usage.totalTokens * 0.000002; // e.g. $0.000002 per token }) .then((s) => { console.log(`Total cost so far: $${s.__cost?.toFixed(6)}`); }); await flow.run({ prompt: "Hello!", response: "" }); ``` ## Behaviour 1. After each step completes, `withCostTracker` reads `shared.__stepCost` (defaulting to `0` if absent). 2. Adds it to `shared.__cost` (initialising to `0` on first run). 3. Deletes `shared.__stepCost` so it isn't counted again in subsequent steps. ## State Keys | Key | Direction | Description | | ------------ | --------------------- | --------------------------------------- | | `__stepCost` | **Write** (your step) | Cost incurred during this step | | `__cost` | **Read** (your step) | Running total accumulated by the plugin | ## Tips * Your LLM utility function is responsible for setting `__stepCost`. The plugin only aggregates it. 
* Works seamlessly with `.parallel()` — each parallel function can set `__stepCost` on its draft, and the reducer merges costs into the parent. * Combine with `withTokenBudget` to both track and cap spending. --- --- url: /flowneer/plugins/resilience/cycles.md --- # withCycles Guards against infinite anchor-jump loops. Throws when the number of goto jumps exceeds a configured limit. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withCycles } from "flowneer/plugins/resilience"; const AppFlow = FlowBuilder.extend([withCycles]); ``` ## Usage ### Global jump limit ```typescript const flow = new AppFlow() .withCycles(100) // abort after 100 total anchor jumps .anchor("retry") .then(async (s) => { s.attempts++; if (s.attempts < 3) return "#retry"; }); ``` ### Per-anchor limit ```typescript flow .withCycles(5, "retry") // max 5 jumps to the "retry" anchor .anchor("retry") .then(async (s) => { if (!s.done) return "#retry"; }); ``` ### Combined ```typescript flow .withCycles(100) // total jump limit .withCycles(5, "retry") // plus per-anchor limit for "retry" .withCycles(10, "next"); // plus per-anchor limit for "next" ``` ## API ### `.withCycles(maxJumps: number, anchor?: string)` | Parameter | Type | Default | Description | | ---------- | -------- | ----------- | ---------------------------------------------------------------------------- | | `maxJumps` | `number` | — | Maximum number of jumps allowed | | `anchor` | `string` | `undefined` | If provided, limits only jumps to this anchor. If omitted, applies globally. | ## How Jump Detection Works The plugin detects a jump by comparing the current step index with the previous one. A step index that is equal to or less than the previous index indicates a backward jump via goto. 
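A standalone sketch of this detection logic (illustrative, not the plugin's actual source; it assumes the counter sees each executed step's index and mirrors the documented global error message):

```typescript
// Counts backward jumps by watching the step index: an index that does not
// advance past the previous one means a goto jumped backwards.
function makeJumpCounter(maxJumps: number): (stepIndex: number) => void {
  let prevIndex = -1;
  let jumps = 0;
  return (stepIndex: number): void => {
    if (prevIndex >= 0 && stepIndex <= prevIndex) {
      jumps++;
      if (jumps > maxJumps) {
        throw new Error(`cycle limit exceeded: ${jumps} anchor jumps > maxJumps(${maxJumps})`);
      }
    }
    prevIndex = stepIndex;
  };
}
```

Running a flow's step indices through the counter trips the limit on the jump after `maxJumps` backward jumps, matching the reset-per-run semantics (a fresh counter per `.run()`).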
## Error

When the limit is exceeded:

* Global: `"cycle limit exceeded: N anchor jumps > maxJumps(M)"`
* Per-anchor: `"cycle limit exceeded for anchor "name": N visits > limit(M)"`

## Counters Reset per Run

Jump counters reset at `beforeFlow`, so each `.run()` call starts fresh.

--- --- url: /flowneer/plugins/dev/dry-run.md --- # withDryRun

Skips all step bodies while still firing `beforeStep` and `afterStep` hooks. Use in tests and CI to validate hook wiring and observability pipelines without executing real side effects.

## Setup

```typescript
import { FlowBuilder } from "flowneer";
import { withDryRun } from "flowneer/plugins/dev";
import { withCallbacks } from "flowneer/plugins/observability"; // used in the example below

const AppFlow = FlowBuilder.extend([withDryRun, withCallbacks]);
```

## Usage

```typescript
const events: string[] = [];

const flow = new AppFlow()
  .withDryRun()
  .withCallbacks({
    onChainStart: (meta) => events.push(`start:${meta.index}`),
    onChainEnd: (meta) => events.push(`end:${meta.index}`),
  })
  .startWith(async (s) => { await callExternalApi(s); /* never runs */ })
  .then(async (s) => { await saveToDb(s); /* never runs */ });

await flow.run({});
console.log(events); // ["start:0", "end:0", "start:1", "end:1"]
// Step bodies didn't run, but hooks fired correctly
```

## How It Works

Registers a `wrapStep` hook that never calls `next()`, so the step body is skipped. `beforeStep` and `afterStep` hooks run outside the `wrapStep` chain, so they still fire for every step.

## Use Cases

* Validate that your observability and callback wiring is correct without real I/O.
* Benchmark hook overhead in isolation.
* Snapshot-test step sequences in unit tests.

## Notes

* `withDryRun` affects **all** steps, including those inside `.loop()`, `.batch()`, and `.parallel()` bodies.
* State remains unchanged (since step bodies don't run), so hooks that read from `shared` will see the initial values.
* Combine with [`withMocks`](./mocks.md) if you want some steps to execute with fake implementations.
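The mechanism can be sketched independently of Flowneer: the `before`/`after` hooks run around the step, while the `wrapStep` layer simply never invokes `next()`. Illustrative only, with assumed minimal hook shapes:

```typescript
// Standalone sketch of the dry-run mechanism: hooks fire around the step,
// but the wrapStep layer never calls next(), so the body is skipped.
type Step = () => void;
type MiniHooks = { beforeStep?: () => void; afterStep?: () => void };

function runStepDry(step: Step, hooks: MiniHooks): void {
  hooks.beforeStep?.();
  const wrapStep = (_next: Step): void => {
    /* next() intentionally not called */
  };
  wrapStep(step); // the step body never executes
  hooks.afterStep?.();
}
```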
--- --- url: /flowneer/plugins/resilience/fallback.md --- # withFallback Catches any step error and calls a fallback function instead of propagating the error. The flow continues normally after the fallback executes. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withFallback } from "flowneer/plugins/resilience"; const AppFlow = FlowBuilder.extend([withFallback]); ``` ## Usage ```typescript const flow = new AppFlow() .withFallback(async (s) => { const err = s.__fallbackError!; console.error(`Step ${err.stepIndex} failed: ${err.message}`); s.result = "default value"; }) .startWith(async (s) => { s.result = await riskyOperation(s); // might throw }) .then((s) => { console.log(s.result); // either the real result or "default value" }); ``` ## API ### `.withFallback(fn: NodeFn)` The fallback function receives the same `(shared, params)` signature as a regular step. After it completes, execution continues with the **next step** in the flow as if nothing failed. ## `__fallbackError` Context When a step fails, `withFallback` stores error context on `shared.__fallbackError` before calling your fallback function: ```typescript interface FallbackError { stepIndex: number; stepType: string; message: string; stack?: string; } ``` ```typescript .withFallback(async (s) => { if (s.__fallbackError?.stepType === "fn") { s.usedFallback = true; } }) ``` ## Notes * The fallback applies globally — all steps in the flow will use it if they fail. * `InterruptError` is **not** caught by `withFallback` — it propagates normally. * Combine with `withHistory` or `withAuditLog` to track which steps triggered fallbacks. * For per-step error handling without a global fallback, use the `onError` hook in a custom plugin. 
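For the per-step alternative mentioned in the last note, a custom wrapper along these lines could catch errors for one step only. This is a standalone sketch — `StepFn` and `withStepFallback` are illustrative names, not Flowneer's plugin API:

```typescript
// Sketch: per-step error handling without a global fallback.
// Wraps a single step function; other steps are left untouched.
type StepFn<S> = (shared: S) => Promise<void> | void;

function withStepFallback<S>(
  step: StepFn<S>,
  onError: (s: S, err: unknown) => void,
): StepFn<S> {
  return async (shared) => {
    try {
      await step(shared);
    } catch (err) {
      onError(shared, err); // handle locally; the flow continues
    }
  };
}

// Usage: only this step gets a fallback
interface State { result?: string }
const risky: StepFn<State> = () => { throw new Error("boom"); };
const safe = withStepFallback<State>(risky, (s) => { s.result = "default"; });
const state: State = {};
await safe(state);
console.log(state.result); // "default"
```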
---

--- url: /flowneer/plugins/dev/flow-analyzer.md ---

# withFlowAnalyzer

Two complementary tools for understanding what a flow does:

* **`analyzeFlow()`** — synchronous static walk; answers *"what paths are possible?"*
* **`withTrace()`** — installs runtime hooks; answers *"what path was actually taken?"*

## Setup

```typescript
import { FlowBuilder } from "flowneer";
import { withFlowAnalyzer } from "flowneer/plugins/dev";

const AppFlow = FlowBuilder.extend([withFlowAnalyzer]);
```

***

## `analyzeFlow()` — static path map

Walks the compiled `steps[]` array and returns a `PathMap` describing all known nodes, anchors, and structural paths. Nothing is executed.

```typescript
const flow = new AppFlow()
  .anchor("refine", 5)
  .then(generateDraft, { label: "gen:draft" })
  .then(async (s) => (s.score < 0.9 ? "#refine" : undefined), {
    label: "check:score",
  })
  .then(publish, { label: "publish" });

const map = flow.analyzeFlow();

console.log(map.anchors); // ["refine"]
console.log(map.hasDynamicGotos); // true — fn steps may return goto strings
console.log(map.nodes.map((n) => n.label));
// ["refine", "gen:draft", "check:score", "publish"]
```

### `PathMap`

```typescript
interface PathMap {
  nodes: PathNode[];
  /** All anchor names declared in this flow (and nested sub-flows). */
  anchors: string[];
  /**
   * True whenever fn steps are present — they may return "#anchorName" at
   * runtime. Static analysis cannot resolve these edges without execution.
   */
  hasDynamicGotos: boolean;
}
```

### `PathNode`

```typescript
interface PathNode {
  id: string; // "fn_0", "branch_2", "anchor:refine", etc.
  type: "fn" | "branch" | "loop" | "batch" | "parallel" | "anchor";
  label?: string;
  branches?: Record<string, PathNode[]>; // branch arms
  body?: PathNode[]; // loop / batch inner steps
  parallel?: PathNode[][]; // parallel fan-out lanes
}
```

### Branch analysis

```typescript
const flow = new AppFlow().branch(
  async (s) => (s.ok ?
"pass" : "fail"), { pass: async (s) => { s.result = "ok"; }, fail: async (s) => { s.result = "failed"; }, }, ); const map = flow.analyzeFlow(); const branchNode = map.nodes.find((n) => n.type === "branch"); console.log(Object.keys(branchNode.branches)); // ["pass", "fail"] ``` *** ## `withTrace()` — runtime execution trace Installs `beforeStep`/`afterStep` hooks that record every visited step with its type, label, and wall-clock duration. ```typescript const flow = new AppFlow() .then(fetchUser, { label: "fetch:user" }) .then(enrichProfile, { label: "enrich:profile" }) .then(saveResult, { label: "save" }); const trace = flow.withTrace(); await flow.run(shared); const report = trace.getTrace(); console.log(report.pathSummary); // ["fetch:user", "enrich:profile", "save"] console.log(report.totalDurationMs); // 347 trace.dispose(); // remove hooks when done ``` ### `TraceHandle` ```typescript interface TraceHandle { /** Returns a snapshot of the trace collected so far. Safe to call mid-run. */ getTrace(): TraceReport; /** Removes the installed hooks. */ dispose(): void; } ``` ### `TraceReport` ```typescript interface TraceReport { events: TraceEvent[]; totalDurationMs: number; /** Ordered list of visited step labels. Unlabelled steps are omitted. */ pathSummary: string[]; } interface TraceEvent { stepIndex: number; type: string; label?: string; durationMs: number; } ``` ### Compose with `withDryRun` Trace the execution path without running any real logic: ```typescript const flow = new AppFlow() .withDryRun() .then(callExpensiveApi, { label: "api:call" }) .then(processResult, { label: "process" }); const trace = flow.withTrace(); await flow.run(shared); // no real I/O console.log(trace.getTrace().pathSummary); // ["api:call", "process"] trace.dispose(); ``` ### Dispose `withTrace()` returns a `dispose()` function that removes the hooks. Always call `dispose()` when the trace is no longer needed to avoid accumulating hooks across multiple runs. 
```typescript const trace = flow.withTrace(); try { await flow.run(shared); console.log(trace.getTrace()); } finally { trace.dispose(); } ``` *** ## Use Cases * **Pre-deployment checks** — run `analyzeFlow()` at startup to verify anchor names are present and flow structure matches expectations. * **Debugging** — `.withDryRun().withTrace()` shows the execution path without side effects in staging or test environments. * **CI test assertions** — assert that a flow visited exactly the steps it should in a given scenario. * **Performance profiling** — use `TraceEvent.durationMs` to find slow steps without a full telemetry stack. --- --- url: /flowneer/plugins/observability/history.md --- # withHistory Records a shallow snapshot of `shared` after each step into `shared.__history`. Useful for debugging, replay, and auditing flow execution state transitions. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withHistory } from "flowneer/plugins/observability"; const AppFlow = FlowBuilder.extend([withHistory]); ``` ## Usage ```typescript interface State { count: number; __history?: Array<{ index: number; type: string; snapshot: any }>; } const flow = new AppFlow() .withHistory() .startWith((s) => { s.count = 1; }) .then((s) => { s.count = 2; }) .then((s) => { s.count = 3; }); const state: State = { count: 0 }; await flow.run(state); console.log(state.__history); // [ // { index: 0, type: "fn", snapshot: { count: 1 } }, // { index: 1, type: "fn", snapshot: { count: 2 } }, // { index: 2, type: "fn", snapshot: { count: 3 } }, // ] ``` ## Snapshot Format Each entry in `shared.__history`: ```typescript { index: number; // step index (0-based) type: string; // "fn" | "branch" | "loop" | "batch" | "parallel" snapshot: object; // shallow clone of shared, excluding __history itself } ``` The snapshot is a **shallow** copy — nested objects are not deep-cloned. 
For deep snapshots, use [`withCheckpoint`](../persistence/checkpoint.md) or [`withAuditLog`](../persistence/audit-log.md).

## State Keys

| Key         | Direction            | Description             |
| ----------- | -------------------- | ----------------------- |
| `__history` | **Read** (your step) | Array of step snapshots |

## Use Cases

* **Debugging:** Inspect what state looked like at each step after a failure.
* **Testing:** Assert that specific state transitions happened in the right order.
* **UI feedback:** Show a progress timeline by reading `__history` incrementally.

---

--- url: /flowneer/plugins/observability/interrupts.md ---

# withInterrupts / interruptIf

Insert conditional pause points into a flow. When the condition is true, the flow throws an `InterruptError` carrying a deep clone of the current shared state. Catch it in your runner to implement approval gates, human review, or external-resume patterns.

## Setup

```typescript
import { FlowBuilder } from "flowneer";
import { withInterrupts } from "flowneer/plugins/observability";

const AppFlow = FlowBuilder.extend([withInterrupts]);
```

## Usage

```typescript
import { InterruptError } from "flowneer";

const flow = new AppFlow()
  .startWith(generateDraft)
  .interruptIf((s) => s.draft.length > 0) // pause after draft is ready
  .then(publishDraft);

try {
  await flow.run(shared);
} catch (e) {
  if (e instanceof InterruptError) {
    const saved = e.savedShared as State;
    console.log("Draft ready for review:", saved.draft);
    // Human reviews... then resume:
    saved.approved = true;
    await flow.run(saved); // re-runs from scratch
    // Use withReplay to skip completed steps
  }
}
```

## API

### `.interruptIf(condition)`

```typescript
.interruptIf(
  (shared: S, params: P) => boolean | Promise<boolean>
)
```

Inserts a synthetic step that:

1. Evaluates `condition(shared, params)`.
2. If `true`, throws `new InterruptError(JSON.parse(JSON.stringify(shared)))`.
3. If `false`, does nothing — flow continues.
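The synthetic step's behaviour can be modelled standalone. This is a sketch under the assumption stated in step 2 that the deep clone is a `JSON` round-trip; the local `InterruptError` here mirrors the library's class for illustration only:

```typescript
// Sketch of the synthetic interrupt step described above.
class InterruptError extends Error {
  constructor(public savedShared: unknown) { super("interrupted"); }
}

async function interruptStep<S>(
  shared: S,
  condition: (s: S) => boolean | Promise<boolean>,
): Promise<void> {
  if (await condition(shared)) {
    // Deep clone via JSON round-trip, as in step 2
    throw new InterruptError(JSON.parse(JSON.stringify(shared)));
  }
  // condition false → no-op, flow continues
}

const shared = { draft: "hello" };
let saved: unknown = null;
try {
  await interruptStep(shared, (s) => s.draft.length > 0);
} catch (e) {
  if (e instanceof InterruptError) saved = e.savedShared;
}
console.log(saved); // { draft: "hello" } — a clone, not the original object
```

Note that because the clone is JSON-based, non-serialisable values (functions, class instances, `undefined` fields) will not survive into `savedShared`.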
## `InterruptError` ```typescript class InterruptError extends Error { savedShared: unknown; // deep clone of shared at interrupt time } ``` `InterruptError` is **never wrapped** in `FlowError` — it propagates directly to the caller. ## Resume with Replay Combine with [`withReplay`](../persistence/replay.md) to skip already-completed steps when resuming: ```typescript import { withReplay } from "flowneer/plugins/persistence"; const AppFlow = FlowBuilder.extend([withInterrupts, withReplay]); try { await flow.run(shared); } catch (e) { if (e instanceof InterruptError) { const saved = e.savedShared as State; // e.g. interrupt was at step 2, so resume from step 3: flow.withReplay(3); await flow.run({ ...saved, approved: true }); } } ``` ## See Also * [humanNode](../agent/human-node.md) — higher-level human-in-the-loop with prompt storage and a `resumeFlow` helper. * [Errors](../../core/errors.md) — full error hierarchy. --- --- url: /flowneer/plugins/persistence/manual-stepping.md --- # withManualStepping Pause a running flow before each step and resume it on demand. After calling `flow.run()`, execution suspends before each matched step and waits for `flow.stepper.continue()` to be called. The flow runs in-flight on the same async call stack — there is no serialisation or replay involved. Use this for human-in-the-loop approval gates, interactive debugging, test-driven step inspection, or any scenario where you need to observe and optionally modify shared state between steps. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withManualStepping } from "flowneer/plugins/persistence"; const AppFlow = FlowBuilder.extend([withManualStepping]); ``` ## API ### `.withManualStepping(options?)` Registers the pause gate. Must be called before `.run()`. Returns `this` for chaining. ```typescript interface ManualSteppingOptions { /** Called each time the flow pauses, before the gate blocks. 
   */
  onPause?: (meta: StepMeta, shared: S) => void | Promise<void>;

  /**
   * Narrow which steps cause a pause. Steps not matching the filter
   * execute immediately without pausing.
   * Accepts label globs or a predicate — same semantics as StepFilter elsewhere.
   */
  filter?: StepFilter;
}
```

### `flow.stepper`

A `StepperController` is written onto the builder instance when `.withManualStepping()` is called.

```typescript
interface StepperController {
  /** Current lifecycle status. */
  readonly status: "idle" | "paused" | "running" | "done";

  /** Metadata for the currently paused step, if any. */
  readonly pausedAt: StepMeta | undefined;

  /**
   * Release the paused step and run it.
   * Returns a Promise that resolves when the step body finishes.
   * Errors from the step body surface through flow.run(), not here.
   * Throws if status is not "paused".
   */
  continue(): Promise<void>;

  /**
   * Returns a Promise that resolves with StepMeta when the flow next pauses,
   * or null when the flow completes. Resolves immediately if already paused or done.
   */
  waitUntilPaused(): Promise<StepMeta | null>;
}
```

## Usage

### Explicit steps (Style A)

Call `waitUntilPaused()` and `continue()` manually for fine-grained control. Awaiting `continue()` lets you inspect `shared` after the step body finishes.

```typescript
const flow = new AppFlow()
  .withManualStepping()
  .then(fetchData, { label: "fetch" })
  .then(transform, { label: "transform" })
  .then(save, { label: "save" });

const done = flow.run(shared);

await flow.stepper.waitUntilPaused();
console.log("paused at:", flow.stepper.pausedAt?.label); // "fetch"
await flow.stepper.continue(); // runs fetch, resolves when done

await flow.stepper.waitUntilPaused();
await flow.stepper.continue(); // runs transform

await flow.stepper.waitUntilPaused();
await flow.stepper.continue(); // runs save

await done;
```

### Loop pattern (Style B)

Use `waitUntilPaused()` in a `while` loop — the cleanest approach for automated stepping. The loop exits naturally when the flow finishes.
```typescript const done = flow.run(shared); let meta: StepMeta | null; while ((meta = await flow.stepper.waitUntilPaused()) !== null) { console.log("paused at:", meta.label, "| status:", shared); await flow.stepper.continue(); } await done; ``` ### Filter — pause only on specific steps Steps that don't match `filter` run immediately without pausing. Use this to gate only the expensive or sensitive steps (e.g. LLM calls) while letting I/O steps run freely. ```typescript const flow = new AppFlow() .withManualStepping({ filter: ["llm:*"] }) .then(loadContext, { label: "load" }) // runs freely .then(callLlm, { label: "llm:generate" }) // pauses .then(persistResult, { label: "persist" }); // runs freely const done = flow.run(shared); const meta = await flow.stepper.waitUntilPaused(); // loadContext has already run console.log("approve LLM call?", meta?.label); await flow.stepper.continue(); await done; ``` ### `onPause` callback Fires synchronously after `status` is set to `"paused"` but before the gate blocks. Use it for logging, notifications, or side-effecting inspection without modifying the drive loop. 
```typescript const flow = new AppFlow() .withManualStepping({ onPause: (meta, shared) => { console.log( `[pause] step "${meta.label}" | keys: ${Object.keys(shared).join(", ")}`, ); }, }) .then(stepA, { label: "a" }) .then(stepB, { label: "b" }); ``` ### Human-in-the-loop approval gate ```typescript const flow = new AppFlow() .withManualStepping({ filter: ["llm:*"] }) .then(gatherContext, { label: "gather" }) .then(generateDraft, { label: "llm:draft" }) // pauses for review .then(refineDraft, { label: "llm:refine" }) // pauses for review .then(publishDraft, { label: "publish" }); const done = flow.run(shared); while ((await flow.stepper.waitUntilPaused()) !== null) { const { label } = flow.stepper.pausedAt!; const approved = await askUser(`Approve "${label}"?`); if (!approved) { done.catch(() => {}); break; } await flow.stepper.continue(); } await done; ``` ## Error Handling Step errors propagate through `flow.run()`, not through `continue()`. `continue()` always resolves once the step body has finished — whether it succeeded or threw. ```typescript const done = flow.run(shared); await flow.stepper.waitUntilPaused(); await flow.stepper.continue(); // resolves even if the step throws // Error surfaces here await done.catch((err) => console.error("step failed:", err)); ``` ## Graph Plugin Compatibility `withManualStepping` composes with `withGraph`. The DAG handler fires `wrapStep` per node internally, so the pause gate triggers once per graph node in topological order. 
```typescript
const GraphManualFlow = FlowBuilder.extend([withGraph, withManualStepping]);

const flow = new GraphManualFlow()
  .withManualStepping()
  .addNode("fetch", fetchFn)
  .addNode("process", processFn)
  .addNode("save", saveFn)
  .addEdge("fetch", "process")
  .addEdge("process", "save")
  .compile();

const done = flow.run(shared);

let meta: StepMeta | null;
while ((meta = await flow.stepper.waitUntilPaused()) !== null) {
  console.log("node:", meta.label); // "fetch", "process", "save"
  await flow.stepper.continue();
}
await done;
```

## JsonFlowBuilder Compatibility

Pass your extended `FlowClass` as the third argument to `JsonFlowBuilder.build()`, then call `.withManualStepping()` on the returned instance.

```typescript
import { JsonFlowBuilder } from "flowneer/presets/config";

const ManualJsonFlow = FlowBuilder.extend([withManualStepping]);

const flow = JsonFlowBuilder.build(
  config,
  registry,
  ManualJsonFlow as any,
) as InstanceType<typeof ManualJsonFlow>;

flow.withManualStepping({ filter: ["llm:*"] });
const done = flow.run(shared);
// ... drive with waitUntilPaused / continue
```

## Composing with `withCheckpoint`

Combine with `withCheckpoint` to save state at each pause point — useful for long-running human-in-the-loop flows where you want fault tolerance between approvals.

```typescript
const AppFlow = FlowBuilder.extend([withCheckpoint, withManualStepping]);

const flow = new AppFlow()
  .withCheckpoint({ save: (snap, meta) => db.save(snap, meta) })
  .withManualStepping({ filter: ["llm:*"] })
  .then(callLlm, { label: "llm:generate" });
```

## Notes

* `status` transitions: `"idle"` → `"paused"` → `"running"` → `"idle"` (per step), then `"done"` after `afterFlow`.
* Calling `continue()` when `status !== "paused"` throws immediately with a descriptive message.
* `waitUntilPaused()` uses a one-shot listener — no polling, no race conditions.
* The flow is genuinely suspended in-flight; the async call stack is preserved. There is no serialisation or replay.
* `filter` follows the same `StepFilter` semantics as all other Flowneer plugins: string arrays support `*` wildcards, predicates receive `StepMeta`. --- --- url: /flowneer/plugins/memory/with-memory.md --- # withMemory Attaches a `Memory` instance to `shared.__memory` before the flow starts, making it available in all steps without manual wiring. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withMemory } from "flowneer/plugins/memory"; const AppFlow = FlowBuilder.extend([withMemory]); ``` ## Usage ```typescript import { BufferWindowMemory, withMemory } from "flowneer/plugins/memory"; const memory = new BufferWindowMemory({ maxMessages: 20 }); const flow = new AppFlow() .withMemory(memory) .startWith(async (s) => { await s.__memory!.add({ role: "user", content: s.input }); const ctx = await s.__memory!.toContext(); s.response = await callLlm(ctx); await s.__memory!.add({ role: "assistant", content: s.response }); }); ``` ## What It Does Registers a `beforeFlow` hook that sets `shared.__memory = memory` before any step runs. Memory instances are **stateful** and live outside the flow. The same `memory` object is shared across all flow runs, naturally accumulating conversation history across multiple `.run()` calls. ## Accepted Memory Types Any object implementing the `Memory` interface works: * [`BufferWindowMemory`](./buffer-window.md) — sliding window of recent messages * [`KVMemory`](./kv-memory.md) — key-value entity store * [`SummaryMemory`](./summary-memory.md) — compressing long-form memory * Custom implementations ## State Keys | Key | Direction | Description | | ---------- | -------------------- | ---------------------------- | | `__memory` | **Read** (your step) | The attached Memory instance | --- --- url: /flowneer/plugins/dev/mocks.md --- # withMocks Replaces step bodies at specific indices with mock functions, while letting all other steps run normally. Ideal for unit testing individual steps in isolation. 
## Setup

```typescript
import { FlowBuilder } from "flowneer";
import { withMocks } from "flowneer/plugins/dev";

const AppFlow = FlowBuilder.extend([withMocks]);
```

## Usage

```typescript
const flow = new AppFlow()
  .startWith(fetchData) // step 0
  .then(processData) // step 1
  .then(saveToDb) // step 2 — will be mocked
  .then(sendNotification); // step 3

// In your test:
const savedItems: any[] = [];
flow.withMocks({
  2: async (s) => {
    // Mock step 2: capture what would have been saved
    savedItems.push(structuredClone(s.processed));
  },
});

await flow.run(testState);
expect(savedItems).toHaveLength(1);
expect(savedItems[0]).toMatchObject({ id: "test-123" });
```

## API

### `.withMocks(map: Record<number, NodeFn<S, P>>)`

| Parameter | Type                           | Description                       |
| --------- | ------------------------------ | --------------------------------- |
| `map`     | `Record<number, NodeFn<S, P>>` | Map of step index → mock function |

Steps not in `map` execute their real bodies normally.

## How It Works

Registers a `wrapStep` hook. When a step's index is in `map`, the mock function is called instead of `next()`. When not in `map`, `next()` is called normally.

## Test Pattern

```typescript
import { describe, it, expect, vi } from "vitest";

describe("myFlow", () => {
  it("calls the notification service with correct data", async () => {
    const notifySpy = vi.fn();

    // buildMyFlow must construct the flow from a class extended with withMocks
    const flow = buildMyFlow();
    flow.withMocks({
      3: notifySpy, // step 3 = sendNotification
    });

    await flow.run({ ...testInitialState });

    expect(notifySpy).toHaveBeenCalledOnce();
    expect(notifySpy.mock.calls[0][0].result).toBe("expected value");
  });
});
```

## Notes

* Multiple calls to `withMocks` stack — later registrations override earlier ones for the same index.
* Mock functions receive the same `(shared, params)` arguments as real steps.
* Combine with `withDryRun` on all other steps for fully isolated unit tests.
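The last note's combination can be pictured as a single dispatch inside the `wrapStep` layer: mocked indices run their mock, and with dry-run active everything else is skipped. A standalone sketch of that dispatch, with illustrative names:

```typescript
// Sketch: dispatch inside a wrapStep layer combining mocks with dry-run.
type Step = () => void;

function dispatch(
  index: number,
  real: Step,
  mocks: Record<number, Step>,
  dryRun: boolean,
): void {
  const mock = mocks[index];
  if (mock) return mock(); // mocked index → run the mock
  if (dryRun) return;      // dry-run active → skip the real body
  real();                  // otherwise → run the real body
}

const log: string[] = [];
const mocks = { 1: () => log.push("mock:1") } as Record<number, Step>;
[0, 1, 2].forEach((i) => dispatch(i, () => log.push(`real:${i}`), mocks, true));
console.log(log); // ["mock:1"] — only the mocked step ran
```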
--- --- url: /flowneer/plugins/dev/perf-analyzer.md --- # withPerfAnalyzer Per-step heap, CPU, and GC profiling using only Node.js built-in performance APIs — zero external dependencies. Records wall-clock duration, CPU time, heap delta, RSS delta, and GC pause stats for every step, then writes a flow-level summary to `shared.__perfReport`. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withPerfAnalyzer } from "flowneer/plugins/dev"; const AppFlow = FlowBuilder.extend([withPerfAnalyzer]); ``` *** ## Basic usage ```typescript const flow = new AppFlow() .withPerfAnalyzer({ onReport: (r) => console.log(JSON.stringify(r, null, 2)), }) .then(fetchData, { label: "fetch" }) .then(callLlm, { label: "llm:generate" }) .then(saveResult, { label: "save" }); await flow.run(shared); // Per-step stats in execution order console.log(shared.__perfStats); // [{ label: "fetch", durationMs: 42, heapDeltaBytes: 131072, ... }, ...] // Flow summary console.log(shared.__perfReport.slowest?.label); // "llm:generate" console.log(shared.__perfReport.peakHeapUsedBytes); // 18874368 ``` *** ## `withPerfAnalyzer(options?, filter?)` | Parameter | Type | Description | | --------- | --------------------- | ----------------------------------------------------------------------------------------------------- | | `options` | `PerfAnalyzerOptions` | Profiling options (see below). | | `filter` | `StepFilter` | Only profile matching steps; others run without instrumentation. Supports label globs and predicates. | ### `PerfAnalyzerOptions` | Option | Type | Default | Description | | ---------- | ------------------------------ | ------- | ----------------------------------------------------------------------------------------------- | | `trackGc` | `boolean` | `true` | Accumulate GC pause events via `PerformanceObserver`. Disabled gracefully on non-Node runtimes. | | `onReport` | `(report: PerfReport) => void` | — | Called with the final `PerfReport` in `afterFlow`. 
Use to log or ship metrics. | *** ## `StepPerfStats` Each entry in `shared.__perfStats[]`: | Field | Type | Description | | -------------------- | --------- | -------------------------------------------------------------------------- | | `index` | `number` | Step index (0-based). | | `type` | `string` | Step type: `"fn"`, `"branch"`, `"loop"`, `"batch"`, `"parallel"`, `"dag"`. | | `label` | `string?` | Step label if set via `NodeOptions.label`. | | `durationMs` | `number` | Wall-clock duration (high-res via `performance.now()`). | | `cpuUserMs` | `number` | User-space CPU time consumed during this step (ms). | | `cpuSystemMs` | `number` | Kernel CPU time consumed during this step (ms). | | `heapUsedBefore` | `number` | V8 heap used at step start (bytes). | | `heapUsedAfter` | `number` | V8 heap used at step end (bytes). | | `heapDeltaBytes` | `number` | Net change in V8 heap (positive = allocated, negative = freed by GC). | | `rssDeltaBytes` | `number` | Net change in Resident Set Size (bytes). | | `externalDeltaBytes` | `number` | Net change in external (C++ / Buffer) memory (bytes). | | `gcCount` | `number` | GC events attributed to this step. Best-effort (see GC note below). | | `gcDurationMs` | `number` | Total GC pause duration attributed to this step (ms). Best-effort. | | `threw` | `boolean` | `true` if the step threw (stats still recorded via `finally`). | *** ## `PerfReport` Written to `shared.__perfReport` in `afterFlow`: | Field | Type | Description | | ------------------- | ----------------------- | ------------------------------------------------------------------------- | | `totalDurationMs` | `number` | Sum of all step `durationMs` (parallel steps overlap in wall-clock time). | | `totalCpuUserMs` | `number` | Sum of all step `cpuUserMs`. | | `totalCpuSystemMs` | `number` | Sum of all step `cpuSystemMs`. | | `totalGcDurationMs` | `number` | Authoritative total GC pause time for the whole flow (ms). 
| | `totalGcCount` | `number` | Total GC event count during the flow. | | `peakHeapUsedBytes` | `number` | Highest `heapUsedAfter` seen across all steps (bytes). | | `steps` | `StepPerfStats[]` | All per-step stats in execution order. | | `slowest` | `StepPerfStats \| null` | Step with the longest wall-clock duration, or `null` if no steps ran. | | `heaviest` | `StepPerfStats \| null` | Step with the largest heap delta, or `null` if no steps ran. | *** ## Filter — profile only specific steps ```typescript // Only profile LLM steps; all others run without instrumentation const flow = new AppFlow() .withPerfAnalyzer({}, ["llm:*"]) .then(loadContext, { label: "load" }) // not profiled .then(callLlm, { label: "llm:generate" }) // profiled .then(callEmbedding, { label: "llm:embed" }) // profiled .then(saveResult, { label: "save" }); // not profiled ``` *** ## State keys `withPerfAnalyzer` writes to two keys on `shared`. Extend `AugmentedState` to get them typed automatically: ```typescript import type { AugmentedState } from "flowneer"; interface MyState extends AugmentedState { topic: string; results: string[]; } // shared.__perfStats → StepPerfStats[] // shared.__perfReport → PerfReport ``` *** ## GC note `PerformanceObserver` fires asynchronously, so GC events received between two step boundaries are attributed to the step that just ended. This makes per-step `gcCount` / `gcDurationMs` best-effort. Use `__perfReport.totalGcDurationMs` for an authoritative flow-level GC measurement. GC tracking is silently disabled on runtimes that do not support `PerformanceObserver` (e.g. non-Node environments). Set `trackGc: false` to opt out explicitly. *** ## Graph plugin compatibility `wrapStep` fires once per graph node in topological order, so every DAG node gets its own `StepPerfStats` entry. 
```typescript const GraphPerfFlow = FlowBuilder.extend([withGraph, withPerfAnalyzer]); const flow = new GraphPerfFlow() .withPerfAnalyzer({ onReport: (r) => console.log(r.slowest) }) .addNode("fetch", fetchFn) .addNode("process", processFn) .addNode("save", saveFn) .addEdge("fetch", "process") .addEdge("process", "save") .compile(); await flow.run(shared); ``` *** ## Composing with other dev plugins ```typescript const AppFlow = FlowBuilder.extend([withPerfAnalyzer, withDryRun]); const flow = new AppFlow() .withPerfAnalyzer() .withDryRun() // step bodies skipped, but hook overhead is still measured .then(fetchData, { label: "fetch" }) .then(callLlm, { label: "llm:generate" }); await flow.run(shared); // shared.__perfReport.totalDurationMs → hook overhead only ``` --- --- url: /flowneer/plugins/llm/rate-limit.md --- # withRateLimit Enforces a minimum interval between consecutive step executions. Useful when calling rate-limited APIs (LLM providers, external services, etc.) to avoid `429 Too Many Requests` errors. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withRateLimit } from "flowneer/plugins/llm"; const AppFlow = FlowBuilder.extend([withRateLimit]); ``` ## Usage ```typescript const flow = new AppFlow() .withRateLimit({ intervalMs: 1000 }) // at least 1 s between steps .startWith(callLlmStep) .then(callLlmStep) .then(callLlmStep); ``` ## Options | Option | Type | Required | Description | | ------------ | -------- | -------- | ------------------------------------------------------------------ | | `intervalMs` | `number` | ✅ | Minimum milliseconds between end of one step and start of the next | ## Behaviour * Measures elapsed time from when the **previous step ended**. * If the elapsed time is less than `intervalMs`, waits for the remainder. * The first step is never delayed. * Multiple `withRateLimit` registrations are stacked — both limits are applied. 
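The waiting behaviour listed above can be sketched as a small gate. This is an illustration of the described semantics (measure from the previous step's end; never delay the first step), not the plugin's source:

```typescript
// Sketch: minimum-interval gate, measured from when the previous step ended.
function makeRateGate(intervalMs: number) {
  let lastEnd = 0; // 0 → first step is never delayed
  return {
    async beforeStep(): Promise<void> {
      const elapsed = Date.now() - lastEnd;
      if (lastEnd !== 0 && elapsed < intervalMs) {
        // Wait only for the remainder of the interval
        await new Promise((r) => setTimeout(r, intervalMs - elapsed));
      }
    },
    afterStep(): void {
      lastEnd = Date.now();
    },
  };
}

// Two back-to-back steps end up at least ~50 ms apart:
const gate = makeRateGate(50);
const start = Date.now();
await gate.beforeStep(); gate.afterStep(); // step 1 — no delay
await gate.beforeStep(); gate.afterStep(); // step 2 — waits the remainder
console.log(Date.now() - start); // ≈ intervalMs or slightly more
```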
## Example: Batching with Rate Limit ```typescript const flow = new AppFlow() .withRateLimit({ intervalMs: 500 }) // 2 requests/s max .batch( (s) => s.prompts, (b) => b.startWith(async (s) => { s.results.push(await callLlm(s.__batchItem as string)); }), ); ``` > **Note:** The rate limit applies between each step execution globally, not just between LLM calls. For per-call throttling inside a step, implement that in your LLM utility function. --- --- url: /flowneer/presets/agent/react-loop.md --- # withReActLoop Inserts a built-in [ReAct](https://arxiv.org/abs/2210.03629) (Reason + Act) agent loop into the flow. Automatically handles the `think → tool calls → observation → repeat` cycle until the agent signals it's done or the iteration limit is reached. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withReActLoop } from "flowneer/presets/agent"; import { withTools } from "flowneer/plugins/tools"; const AppFlow = FlowBuilder.extend([withTools, withReActLoop]); ``` ## Usage ```typescript import { z } from "zod"; interface AgentState { question: string; history: string[]; __toolResults?: any[]; __reactOutput?: string; __reactExhausted?: boolean; } const calculatorTool = { name: "calculator", description: "Evaluate a math expression", params: { expression: { type: "string" as const, description: "The expression to evaluate", }, }, execute: ({ expression }: { expression: string }) => eval(expression), }; const flow = new AppFlow() .withTools([calculatorTool]) .withReActLoop({ think: async (s) => { const response = await callLlm(buildReActPrompt(s)); // Parse tool calls from LLM response const toolCalls = parseToolCalls(response); if (toolCalls.length > 0) { return { action: "tool", calls: toolCalls }; } return { action: "finish", output: response }; }, maxIterations: 10, onObservation: async (results, s) => { s.history.push(`Tool results: ${JSON.stringify(results)}`); }, }) .then((s) => { if (s.__reactExhausted) { console.log("Agent hit 
iteration limit"); } else { console.log("Answer:", s.__reactOutput); } }); ``` ## Options | Option | Type | Default | Description | | --------------- | --------------------------------------------------------- | ------- | -------------------------------------- | | `think` | `(shared, params) => ThinkResult \| Promise` | — | The reasoning step | | `maxIterations` | `number` | `10` | Maximum think→act cycles | | `onObservation` | `(results, shared, params) => void \| Promise` | — | Called after each tool execution round | ## `ThinkResult` Type ```typescript type ThinkResult = | { action: "finish"; output?: unknown } // done — stop the loop | { action: "tool"; calls: ToolCall[] }; // call these tools and loop ``` ## State Keys | Key | Direction | Description | | ------------------- | ------------------- | -------------------------------------------------- | | `__tools` | Set by `withTools` | The `ToolRegistry` — required | | `__toolResults` | **Read** in `think` | Results from the last tool round | | `__reactIterations` | Internal | Running iteration count — reset each `.run()` call | | `__reactOutput` | **Read** after loop | The `output` from the final `{ action: "finish" }` | | `__reactExhausted` | **Read** after loop | `true` if `maxIterations` was reached | ## How It Works Behind the scenes, `withReActLoop` compiles down to: ``` .loop( (s) => !s.__reactFinished && (s.__reactIterations ?? 0) < maxIterations, (b) => b .startWith(increment __reactIterations; think → set __pendingToolCalls or __reactFinished) .then(execute tools from __pendingToolCalls → set __toolResults) ) .then(mark __reactExhausted if needed; delete __reactIterations) ``` The iteration counter lives on `shared.__reactIterations` (not in a closure), so the loop resets correctly when `.run()` is called multiple times on the same flow instance. ## Requires `withTools` `withReActLoop` expects `shared.__tools` to be a `ToolRegistry`. Always call `.withTools([...])` before `.withReActLoop()`. 
## See Also * [`withTools`](../../plugins/tools/overview.md) — tool registry and execution * [Agent Patterns](./patterns.md) — multi-agent orchestration --- --- url: /flowneer/plugins/persistence/replay.md --- # withReplay Skips step execution for all steps before a given index. Use together with `withCheckpoint` to resume a flow from the last saved state after a crash or interruption. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withCheckpoint, withReplay } from "flowneer/plugins/persistence"; const AppFlow = FlowBuilder.extend([withCheckpoint, withReplay]); ``` ## Usage ```typescript // Normal run — save checkpoints const checkpoints = new Map(); const flow = new AppFlow() .withCheckpoint({ save: (i, s) => checkpoints.set(i, structuredClone(s)) }) .startWith(stepA) .then(stepB) .then(stepC) // crashes here .then(stepD); try { await flow.run(initialState); } catch { // Resume from last checkpoint const lastSaved = Math.max(...checkpoints.keys()); const savedState = checkpoints.get(lastSaved)!; // Skip steps 0..lastSaved, resume from lastSaved + 1 flow.withReplay(lastSaved + 1); await flow.run(savedState); } ``` ## How It Works Registers a `wrapStep` hook that skips (does not call `next()`) for any step whose index is less than `fromStep`. Steps at `fromStep` and above execute normally. `beforeStep` and `afterStep` hooks still fire for skipped steps — only the **body** is skipped. ## API ### `.withReplay(fromStep: number)` | Parameter | Type | Description | | ---------- | -------- | ------------------------------------------------- | | `fromStep` | `number` | The first step index that should actually execute | ## Combining with `humanNode` / `interruptIf` ```typescript import { resumeFlow } from "flowneer/plugins/agent"; // resumeFlow automatically applies withReplay internally: await resumeFlow(flow, savedState, { approved: true }, interruptedAtStep); ``` See [`resumeFlow`](../agent/human-node.md#resumeflow) for a complete human-in-the-loop example.
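The skip mechanism can be illustrated independently of Flowneer's internals. The following toy executor (not the plugin's source, just a sketch of the documented `wrapStep` contract) shows how refusing to call `next()` for early indices produces replay behaviour:

```typescript
type Step<S> = (shared: S) => void | Promise<void>;

// wrapStep-style middleware: skip the body of any step before `fromStep`.
function makeReplayWrapper(fromStep: number) {
  return async (stepIndex: number, next: () => Promise<void>): Promise<void> => {
    if (stepIndex < fromStep) return; // body skipped; surrounding hooks still ran
    await next();
  };
}

// Minimal executor driving the wrapper, for illustration only.
async function runSteps<S>(steps: Step<S>[], shared: S, fromStep = 0): Promise<void> {
  const wrap = makeReplayWrapper(fromStep);
  for (let i = 0; i < steps.length; i++) {
    const step = steps[i]!;
    await wrap(i, async () => {
      await step(shared);
    });
  }
}
```

With `fromStep = 2`, steps 0 and 1 are walked over without executing, exactly like resuming from a checkpoint saved after step 1.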
--- --- url: /flowneer/plugins/compliance/runtime-compliance.md --- # withRuntimeCompliance Installs hook-based runtime compliance inspectors on a flow. Each inspector examines shared state immediately before a step executes and can throw, warn, or silently record violations. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withRuntimeCompliance } from "flowneer/plugins/compliance"; const AppFlow = FlowBuilder.extend([withRuntimeCompliance]); ``` ## Usage ```typescript import { withRuntimeCompliance, scanShared } from "flowneer/plugins/compliance"; const flow = new AppFlow() .then(fetchUser, { label: "pii:fetchUser" }) .then(callExternalApi, { label: "external:send" }); flow.withRuntimeCompliance([ { // Only fires before steps matching this filter filter: (meta) => meta.label?.startsWith("external:") ?? false, check: (shared) => { const hits = scanShared(shared, ["user.email", "user.phone"]); return hits.length > 0 ? `PII found before external call: ${hits.map((h) => h.path).join(", ")}` : null; }, onViolation: "throw", // default }, ]); await flow.run(shared); ``` ## `RuntimeInspector` ```typescript interface RuntimeInspector<S> { /** If provided, only fires for steps matching this filter. Omit to fire for all steps. */ filter?: StepFilter; /** * Called before the step body executes. * Return a non-null string to signal a violation. * Return null to pass. */ check: (shared: S, meta: StepMeta) => string | null | Promise<string | null>; /** Defaults to "throw". 
*/ onViolation?: "throw" | "warn" | "record"; } ``` ## Violation actions | Action | Behaviour | | ------------------- | ------------------------------------------------------------------------------ | | `"throw"` (default) | Throws `ComplianceError` immediately, aborting the flow | | `"warn"` | Logs to `console.warn`, flow continues | | `"record"` | Appends `{ message, meta }` to `shared.__complianceViolations`, flow continues | ### `"record"` mode — collecting violations ```typescript interface State { __complianceViolations?: Array<{ message: string; meta: StepMeta }>; } const shared: State = {}; await flow.run(shared); if (shared.__complianceViolations?.length) { console.error("Compliance issues found:", shared.__complianceViolations); } ``` ## `ComplianceError` Thrown when `onViolation: "throw"` and the inspector returns a violation string. ```typescript import { ComplianceError } from "flowneer/plugins/compliance"; try { await flow.run(shared); } catch (err) { if (err instanceof ComplianceError) { console.error("Step:", err.meta.label, "—", err.message); } } ``` ## Default action Set a flow-level default so individual inspectors can omit `onViolation`: ```typescript flow.withRuntimeCompliance(inspectors, { defaultAction: "record" }); ``` ## PII helpers — `scanShared` `scanShared` is a detection-agnostic helper that walks a shared object and returns fields that match built-in PII patterns. ```typescript import { scanShared } from "flowneer/plugins/compliance"; const hits = scanShared(shared); // [{ path: "user.email", pattern: "email", value: "alice@example.com" }] // Scope the scan to specific key paths const scoped = scanShared(shared, ["user.email", "billing.phone"]); ``` Built-in patterns: `email`, `phone` (E.164 & NANP), `ssn`, `ipv4`, `creditCard`. 
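To make the behaviour of such a walker concrete, here is a simplified standalone sketch limited to the `email` pattern (illustrative only, not the library's implementation; the real `scanShared` covers more patterns and supports path scoping):

```typescript
interface PiiHit {
  path: string;
  pattern: string;
  value: string;
}

// Rough email shape check, deliberately simple for the sketch.
const EMAIL = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

// Recursively visit string fields and report dot-separated paths
// whose values look like email addresses.
function scanForEmails(obj: unknown, prefix = ""): PiiHit[] {
  if (typeof obj === "string") {
    return EMAIL.test(obj) ? [{ path: prefix, pattern: "email", value: obj }] : [];
  }
  if (obj === null || typeof obj !== "object") return [];
  return Object.entries(obj as Record<string, unknown>).flatMap(([key, value]) =>
    scanForEmails(value, prefix ? `${prefix}.${key}` : key),
  );
}
```

The returned `path` values are what an inspector would interpolate into its violation message.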
## State Keys | Key | Direction | Description | | ------------------------ | -------------------- | ------------------------------------------------------ | | `__complianceViolations` | **Written** (plugin) | Populated only when any inspector uses `"record"` mode | ## Notes * Inspectors are called in array order. All inspectors are checked even if an earlier one records a violation (for `"warn"/"record"` modes). For `"throw"`, the first violation aborts immediately. * `filter` accepts any [`StepFilter`](/core/plugins#stepfilter): string array or predicate. * Combine with [`withAuditFlow`](./audit-flow.md) for structural pre-flight checks before the flow runs. --- --- url: /flowneer/plugins/dev/step-limit.md --- # withStepLimit Throws if the total number of step executions in a single `.run()` call exceeds a configured maximum. A safety net for flows that use anchors and loops. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withStepLimit } from "flowneer/plugins/dev"; const AppFlow = FlowBuilder.extend([withStepLimit]); ``` ## Usage ```typescript const flow = new AppFlow() .withStepLimit(500) // abort if more than 500 steps execute .anchor("loop") .then(async (s) => { await processItem(s); if (!s.done) return "#loop"; }); ``` ## API ### `.withStepLimit(max?: number)` | Parameter | Type | Default | Description | | --------- | -------- | ------- | ------------------------------------- | | `max` | `number` | `1000` | Maximum total step executions per run | ## Error When exceeded: `"step limit exceeded: N > max"`. ## Counter Resets The counter resets at `beforeFlow`, so each `.run()` call starts from 0. ## Relationship with `withCycles` | Plugin | What it counts | | --------------- | ----------------------------------------- | | `withStepLimit` | Total step executions (including repeats) | | `withCycles` | Anchor jump events only | Use `withCycles` for anchor-specific safeguards and `withStepLimit` as a global execution ceiling. 
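The mechanism is small enough to sketch in isolation (assumed behaviour based on the docs above, not the plugin's source): a counter that resets in `beforeFlow` and is checked in `beforeStep`:

```typescript
// Toy step limiter mirroring the documented hooks.
function makeStepLimiter(max = 1000) {
  let count = 0;
  return {
    beforeFlow: (): void => {
      count = 0; // fresh count for each .run() call
    },
    beforeStep: (): void => {
      count++;
      if (count > max) {
        throw new Error(`step limit exceeded: ${count} > ${max}`);
      }
    },
  };
}
```

Because the counter lives in the closure and resets in `beforeFlow`, repeated `.run()` calls on the same flow instance each get the full budget.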
--- --- url: /flowneer/plugins/messaging/stream.md --- # withStream & emit() Push-based streaming for token-by-token or progress output. Register a subscriber with `.withStream()`, then call `emit(shared, chunk)` from any step to trigger it in real-time. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withStream, emit } from "flowneer/plugins/messaging"; const AppFlow = FlowBuilder.extend([withStream]); ``` ## Usage ```typescript import { emit } from "flowneer/plugins/messaging"; const flow = new AppFlow() .withStream((chunk) => { process.stdout.write(String(chunk)); }) .startWith(async (s) => { for await (const token of streamFromLlm(s.prompt)) { s.response += token; emit(s, token); // triggers the subscriber immediately } }); await flow.run({ prompt: "Tell me a story", response: "" }); ``` ## `.withStream(subscriber)` Plugin ```typescript type StreamSubscriber<T> = (chunk: T) => void; .withStream<T>(subscriber: StreamSubscriber<T>): this ``` Stores `subscriber` on `shared.__stream` before the first step. Because it lives on `shared`, it is automatically inherited by all sub-flows (loop bodies, batch processors, etc.). ## `emit(shared, chunk)` Helper ```typescript function emit<T>(shared: { __stream?: StreamSubscriber<T> }, chunk: T): void; ``` A safe no-op when no subscriber is registered. Call it freely — it only executes if `.withStream()` was called. ## Multiple Subscribers Multiple calls to `.withStream()` replace the subscriber (only the last one is active at runtime).
For multiple consumers, compose them in your subscriber: ```typescript .withStream((chunk) => { wsClient.send(chunk); logStream.write(chunk); }) ``` ## Comparison with `.stream()` | | `withStream` + `emit` | `FlowBuilder.stream()` | | ----------------- | -------------------------- | ----------------------------------------------------- | | API style | Push (subscriber callback) | Pull (async generator) | | Event granularity | Chunk only | `step:before`, `step:after`, `chunk`, `error`, `done` | | Use case | Simple token streaming | Full observability pipeline | Both work with generator step functions — `yield` in a generator step sends to whichever mechanism is active. ## Example: HTTP Server ```typescript Bun.serve({ async fetch(req) { const { prompt } = await req.json(); const shared = { prompt, response: "" }; const body = new ReadableStream({ async start(controller) { const enc = new TextEncoder(); flow.withStream((chunk) => controller.enqueue(enc.encode(String(chunk))), ); await flow.run(shared); controller.close(); }, }); return new Response(body, { headers: { "Content-Type": "text/plain; charset=utf-8" }, }); }, }); ``` --- --- url: /flowneer/plugins/llm/structured-output.md --- # withStructuredOutput Validates LLM output against a schema after each step. Retries on validation failure by storing the error on `shared.__validationError` so your LLM step can adapt its prompt. Works with **Zod**, **ArkType**, **Valibot**, or any object with a `.parse(input): T` method. 
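The validator contract is nothing more than that `.parse(input): T` method, so a dependency-free validator works too. A hand-rolled sketch (illustrative, not from the library):

```typescript
interface Sentiment {
  label: "positive" | "negative" | "neutral";
  score: number;
}

// Any object exposing parse(input): T satisfies the contract;
// throw on invalid input, return the typed value on success.
const SentimentValidator = {
  parse(input: unknown): Sentiment {
    const o = (input ?? {}) as Partial<Sentiment>;
    if (o.label !== "positive" && o.label !== "negative" && o.label !== "neutral") {
      throw new Error(`invalid label: ${String(o.label)}`);
    }
    if (typeof o.score !== "number" || o.score < 0 || o.score > 1) {
      throw new Error(`invalid score: ${String(o.score)}`);
    }
    return { label: o.label, score: o.score };
  },
};
```

In practice a schema library is less error-prone, but this shows why Zod, ArkType, and Valibot are all interchangeable here.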
## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withStructuredOutput } from "flowneer/plugins/llm"; const AppFlow = FlowBuilder.extend([withStructuredOutput]); ``` ## Usage ```typescript import { z } from "zod"; const AnalysisSchema = z.object({ sentiment: z.enum(["positive", "negative", "neutral"]), confidence: z.number().min(0).max(1), summary: z.string(), }); type Analysis = z.infer<typeof AnalysisSchema>; interface State { text: string; __llmOutput?: string; __structuredOutput?: Analysis; __validationError?: { message: string; raw: string; attempts: number }; } const flow = new AppFlow() .withStructuredOutput(AnalysisSchema, { retries: 2 }) .startWith(async (s) => { const errorHint = s.__validationError ? `\nPrevious attempt failed: ${s.__validationError.message}. Fix it.` : ""; s.__llmOutput = await callLlm( `Analyse the sentiment of: "${s.text}". Return JSON.${errorHint}`, ); }) .then((s) => { const result = s.__structuredOutput!; console.log(result.sentiment, result.confidence); }); ``` ## Options | Option | Type | Default | Description | | ----------- | -------------------------- | ---------------------- | ------------------------------------------------------------ | | `retries` | `number` | `1` | Total validation attempts (1 = no retry) | | `outputKey` | `string` | `"__llmOutput"` | Key on `shared` where the raw LLM output string is read from | | `resultKey` | `string` | `"__structuredOutput"` | Key on `shared` where the validated result is written | | `parse` | `(raw: string) => unknown` | `JSON.parse` | Pre-validator parse function | ## Behaviour After each step: 1. Reads `shared[outputKey]`. If absent, skips (step didn't produce output). 2. Runs `parse(raw)` to turn the raw string into a value. 3. Calls `validator.parse(value)`. 4. On success: stores the result on `shared[resultKey]` and clears `__validationError`. 5. On failure: if `retries > 1`, stores the error on `shared.__validationError` for the next step's prompt to consume.
Exhausting all attempts leaves the error on shared but does **not** throw. ## State Keys | Key | Direction | Description | | -------------------- | --------------------- | ------------------------------------ | | `__llmOutput` | **Write** (your step) | Raw LLM response string | | `__structuredOutput` | **Read** (your step) | Validated typed result | | `__validationError` | **Read** (your step) | Error context from failed validation | ## Custom Parse Function Strip markdown fences before JSON-parsing: ````typescript .withStructuredOutput(MySchema, { parse: (raw) => { const match = raw.match(/```json\s*([\s\S]*?)```/); return JSON.parse(match ? match[1]! : raw); }, }) ```` Or use the built-in `parseJsonOutput` helper: ```typescript import { parseJsonOutput } from "flowneer/plugins/output"; .withStructuredOutput(MySchema, { parse: parseJsonOutput }) ``` --- --- url: /flowneer/plugins/resilience/timeout.md --- # withTimeout Applies a per-step wall-clock timeout to **every** step in the flow. If any step exceeds the limit it throws `"step N timed out after Xms"`. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withTimeout } from "flowneer/plugins/resilience"; const AppFlow = FlowBuilder.extend([withTimeout]); ``` ## Usage ```typescript const flow = new AppFlow() .withTimeout(5000) // 5 s per step max .startWith(callLlm) .then(processResult); ``` ## API ### `.withTimeout(ms: number)` | Parameter | Type | Description | | --------- | -------- | ----------------------------- | | `ms` | `number` | Maximum milliseconds per step | ## Per-Step Timeout For individual step timeouts without a global limit, use `NodeOptions.timeoutMs`: ```typescript .then(slowStep, { timeoutMs: 10_000 }) ``` Both mechanisms can coexist — the more restrictive limit wins. ## How It Works Uses `Promise.race` between the step execution and a `setTimeout` rejection. Registered as a `wrapStep` hook so it composes naturally with other plugins. 
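The race can be sketched standalone (illustrative; the plugin's actual error wrapping may differ slightly):

```typescript
// Race a step's promise against a setTimeout rejection, and always
// clear the timer so the process can exit cleanly.
function withDeadline<T>(work: Promise<T>, ms: number, stepIndex: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`step ${stepIndex} timed out after ${ms}ms`)),
      ms,
    );
  });
  return Promise.race([work, deadline]).finally(() => clearTimeout(timer));
}
```

Note that `Promise.race` does not cancel the losing promise: a timed-out step keeps running in the background, so its side effects may still land on `shared`.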
## Notes * The timeout applies to the **step body execution time** — hook overhead is not included. * Timed-out steps throw a plain `Error`, which is wrapped in a `FlowError` by the executor. * Combine with `retries` to retry timed-out steps: `.then(step, { retries: 2, timeoutMs: 3000 })`. --- --- url: /flowneer/plugins/observability/timing.md --- # withTiming Records the wall-clock duration of each step in `shared.__timings`, keyed by step index. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withTiming } from "flowneer/plugins/observability"; const AppFlow = FlowBuilder.extend([withTiming]); ``` ## Usage ```typescript interface State { data: any; __timings?: Record<number, number>; } const flow = new AppFlow() .withTiming() .startWith(async (s) => { s.data = await fetchData(); }) .then(async (s) => { s.data = transform(s.data); }) .then(async (s) => { await save(s.data); }); const state: State = { data: null }; await flow.run(state); console.log(state.__timings); // { 0: 142, 1: 8, 2: 31 } ← milliseconds per step ``` ## State Keys | Key | Direction | Description | | ----------- | -------------------- | ------------------------------- | | `__timings` | **Read** (your step) | Map of `stepIndex → durationMs` | ## Accessing Timings ```typescript .then((s) => { const timings = s.__timings ?? {}; const total = Object.values(timings).reduce((a, b) => a + b, 0); console.log(`Total: ${total}ms`); for (const [step, ms] of Object.entries(timings)) { console.log(` Step ${step}: ${ms}ms`); } }) ``` ## Tips * Timings are in milliseconds (integer) via `Date.now()`. * Sub-flow steps (inside `.loop()`, `.batch()`) are tracked at their own step indices within the sub-flow — they do not appear in the outer flow's `__timings`. * Combine with `withHistory` to correlate timing data with state snapshots.
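Since `__timings` is a plain object, post-run analysis is ordinary data wrangling. A small hypothetical helper (not part of the plugin) to locate the slowest step:

```typescript
// Find the step index with the largest recorded duration,
// or null when no timings were collected.
function slowestStep(
  timings: Record<number, number>,
): { step: number; ms: number } | null {
  let best: { step: number; ms: number } | null = null;
  for (const [step, ms] of Object.entries(timings)) {
    if (!best || ms > best.ms) best = { step: Number(step), ms };
  }
  return best;
}
```

Run it in a final `.then()` step, or after `flow.run()` returns, to flag hotspots worth a `timeoutMs` or a cache.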
--- --- url: /flowneer/plugins/llm/token-budget.md --- # withTokenBudget Aborts the flow before any step runs if the cumulative token usage has reached or exceeded a configured limit. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withTokenBudget } from "flowneer/plugins/llm"; const AppFlow = FlowBuilder.extend([withTokenBudget]); ``` ## Usage ```typescript interface State { prompt: string; response: string; tokensUsed: number; } const flow = new AppFlow() .withTokenBudget(100_000) // abort if tokensUsed >= 100 000 .startWith(async (s) => { const { text, usage } = await callLlmWithUsage(s.prompt); s.response = text; s.tokensUsed = (s.tokensUsed ?? 0) + usage.totalTokens; }) .then(async (s) => { // If this step runs, we know tokensUsed < 100 000 await summarize(s); }); await flow.run({ prompt: "...", response: "", tokensUsed: 0 }); ``` ## Behaviour * Before each step, reads `shared.tokensUsed ?? 0`. * If `tokensUsed >= limit`, throws `Error("token budget exceeded: {used} >= {limit}")`. * Wrapped in a `FlowError` like any other step error. ## State Keys | Key | Direction | Description | | ------------ | --------------------- | ------------------------------------------------ | | `tokensUsed` | **Write** (your step) | Running token count; your steps must update this | ## Combining with withCostTracker ```typescript const flow = new AppFlow() .withTokenBudget(50_000) // hard cap .withCostTracker() // accumulate dollar cost in __cost .startWith(async (s) => { const { text, usage } = await callLlm(s.prompt); s.response = text; s.tokensUsed = (s.tokensUsed ?? 0) + usage.totalTokens; s.__stepCost = usage.totalTokens * 0.000002; }); ``` --- --- url: /flowneer/plugins/tools/overview.md --- # withTools & ToolRegistry Register and execute tools (function-calling) in your flows. `withTools` attaches a `ToolRegistry` instance to `shared.__tools` before the flow starts. Steps call tools directly or via `withReActLoop`. 
## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withTools } from "flowneer/plugins/tools"; const AppFlow = FlowBuilder.extend([withTools]); ``` ## Defining Tools ```typescript import type { Tool } from "flowneer/plugins/tools"; const calculatorTool: Tool = { name: "calculator", description: "Evaluate a mathematical expression and return the result", params: { expression: { type: "string", description: "A valid JavaScript math expression, e.g. '2 + 2 * 3'", required: true, }, }, execute: ({ expression }: { expression: string }) => { return Function(`"use strict"; return (${expression})`)(); }, }; const searchTool: Tool = { name: "web_search", description: "Search the web for up-to-date information", params: { query: { type: "string", description: "The search query" }, }, execute: async ({ query }: { query: string }) => { return fetchSearchResults(query); }, }; ``` ## Registering Tools ```typescript const flow = new AppFlow() .withTools([calculatorTool, searchTool]) .startWith(async (s) => { const registry = s.__tools!; const result = await registry.execute({ name: "calculator", args: { expression: "42 * 7" }, }); s.answer = result.result as number; }); ``` ## `ToolRegistry` API The registry is attached to `shared.__tools`: | Method | Signature | Description | | ------------- | ------------------------------- | ---------------------------------------- | | `get` | `(name) => Tool \| undefined` | Look up a tool by name | | `has` | `(name) => boolean` | Check if a tool is registered | | `names` | `() => string[]` | List all tool names | | `definitions` | `() => ToolDefinition[]` | OpenAI-compatible tool schema objects | | `execute` | `async (call) => ToolResult` | Execute a single tool call | | `executeAll` | `async (calls) => ToolResult[]` | Execute multiple tool calls concurrently | ## Helper Functions Import for use inside steps: ```typescript import { getTools, executeTool, executeTools } from "flowneer/plugins/tools"; // Inside a step: async 
(s) => { const result = await executeTool(s, { name: "calculator", args: { expression: "2+2" }, }); // result: { name: "calculator", result: 4 } const results = await executeTools(s, toolCalls); }; ``` ## `ToolResult` Type ```typescript interface ToolResult { callId?: string; // matches the call's id if provided name: string; result?: unknown; // present on success error?: string; // present on failure (tool errors don't throw) } ``` Tool errors are returned as `{ error }` rather than thrown — your step decides how to handle them. ## LLM Tool Schemas Use `registry.definitions()` to get OpenAI-compatible tool schemas for your LLM API call: ```typescript async (s) => { const tools = s.__tools!.definitions(); const response = await openai.chat.completions.create({ model: "gpt-4o", messages: buildMessages(s), tools: tools.map((t) => ({ type: "function", function: t })), }); // parse tool_calls from response... }; ``` ## See Also * [`withReActLoop`](../../presets/agent/react-loop.md) — automated ReAct agent loop with tools --- --- url: /flowneer/plugins/resilience/try-catch.md --- # withTryCatch Structured try / catch / finally blocks for flow steps. Wraps one or more steps in an exception-safe block without reaching for top-level error handlers. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withTryCatch } from "flowneer/plugins/resilience"; const AppFlow = FlowBuilder.extend([withTryCatch]); ``` ## Usage ```typescript import { FlowBuilder, fragment } from "flowneer"; import { withTryCatch } from "flowneer/plugins/resilience"; const AppFlow = FlowBuilder.extend([withTryCatch]); const flow = new AppFlow() .try(fragment().then(fetchData).then(processData)) .catch( fragment().then((s) => { console.error("Pipeline failed:", s.__tryError); s.result = "fallback"; }), ) .finally(fragment().then(cleanup)) .then(sendResult); ``` ## API ### `.try(fragment)` Executes all steps in `fragment`. 
If any step throws, control passes to the `.catch()` fragment (if registered), or the error propagates. ### `.catch(fragment)` Handles an error thrown inside the preceding `.try()`. The error is available on `shared.__tryError` before the fragment runs and is removed once the fragment completes. If the catch fragment also throws, the error propagates (and the `.finally()` fragment still runs). ### `.finally(fragment)` Always runs after the `.try()` (and optional `.catch()`), regardless of success or failure. Calling `.finally()` closes the try/catch block. > **Note:** `.catch()` and `.finally()` must be called **immediately** after `.try()` — no other `.then()` or builder calls can appear between them. > **InterruptError:** `InterruptError` (thrown by `withHumanNode` and flow abort signals) is never caught by `.try()/.catch()` — it propagates immediately to the caller just like it would outside a try block. ## `__tryError` context The caught error is stored on `shared.__tryError` inside the catch fragment: ```typescript .catch( fragment().then((s) => { const err = s.__tryError; // original Error or value that was thrown if (err instanceof Error) { s.errorMessage = err.message; } s.usedFallback = true; }), ) ``` `__tryError` is always the **original** thrown value. If Flowneer wrapped it in a `FlowError`, the unwrapped cause is exposed here. 
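These semantics mirror plain `try/catch/finally`. The following standalone sketch (not Flowneer's implementation) captures the documented `__tryError` lifecycle: set before the catch logic runs, removed once it completes, with the finally logic always executing:

```typescript
// Plain-TypeScript mirror of the block semantics described above.
async function tryCatchFinally<S extends { __tryError?: unknown }>(
  shared: S,
  tryFn: (s: S) => void | Promise<void>,
  catchFn?: (s: S) => void | Promise<void>,
  finallyFn?: (s: S) => void | Promise<void>,
): Promise<void> {
  try {
    await tryFn(shared);
  } catch (err) {
    if (!catchFn) throw err; // no catch registered: propagate
    shared.__tryError = err; // original thrown value, as documented
    try {
      await catchFn(shared);
    } finally {
      delete shared.__tryError; // removed once the catch fragment completes
    }
  } finally {
    await finallyFn?.(shared); // always runs, even when catch rethrows
  }
}
```

The real plugin additionally lets `InterruptError` bypass the catch branch entirely, which this sketch omits.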
## Nested blocks Try/catch blocks can be nested: ```typescript const AppFlow = FlowBuilder.extend([withTryCatch]); const flow = new AppFlow() .try( fragment() .try(fragment().then(riskyInner)) .catch( fragment().then((s) => { s.innerFailed = true; }), ) .then(continueFrag), ) .catch( fragment().then((s) => { s.outerFailed = true; }), ); ``` ## Example — fetch with recovery ```typescript import { FlowBuilder, fragment } from "flowneer"; import { withTryCatch } from "flowneer/plugins/resilience"; const AppFlow = FlowBuilder.extend([withTryCatch]); interface State { userId: string; profile: Record<string, unknown> | null; fromCache: boolean; } const flow = new AppFlow() .try( fragment().then(async (s) => { const res = await fetch(`/api/users/${s.userId}`); if (!res.ok) throw new Error(`HTTP ${res.status}`); s.profile = await res.json(); }), ) .catch( fragment().then(async (s) => { console.warn("Live fetch failed, loading from cache:", s.__tryError); s.profile = await loadFromCache(s.userId); s.fromCache = true; }), ) .then((s) => { console.log("Profile ready, fromCache:", s.fromCache); }); ``` --- --- url: /flowneer/plugins/observability/verbose.md --- # withVerbose Prints the full `shared` state to `stdout` after every step. The simplest way to debug a flow during development. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withVerbose } from "flowneer/plugins/observability"; const AppFlow = FlowBuilder.extend([withVerbose]); ``` ## Usage ```typescript const flow = new AppFlow() .withVerbose() .startWith(async (s) => { s.step1 = "done"; }) .then(async (s) => { s.step2 = "done"; }); await flow.run({}); // [flowneer] step 0 (fn): { // "step1": "done" // } // [flowneer] step 1 (fn): { // "step1": "done", // "step2": "done" // } ``` ## Output Format ``` [flowneer] step {index} ({type}): {JSON.stringify(shared, null, 2)} ``` ## Tips * Only enable during development — log output can be large with complex state.
* For production tracing use [`withTiming`](./timing.md), [`withHistory`](./history.md), or [`TelemetryDaemon`](../telemetry/overview.md). * Combine with `withDryRun` to see hook execution without running step bodies. --- --- url: /flowneer/plugins/persistence/versioned-checkpoint.md --- # withVersionedCheckpoint Saves diff-based versioned checkpoints after each step. Each checkpoint records only the **keys that changed** from the previous step along with a parent pointer, making it efficient for long flows with large state objects. ## Setup ```typescript import { FlowBuilder } from "flowneer"; import { withVersionedCheckpoint } from "flowneer/plugins/persistence"; const AppFlow = FlowBuilder.extend([withVersionedCheckpoint]); ``` ## Store Interface ```typescript interface VersionedCheckpointEntry<S> { version: string; // auto-assigned e.g. "v1", "v2" stepIndex: number; diff: Partial<S>; // only changed keys parentVersion: string | null; // null for the first checkpoint timestamp: number; // Unix ms } interface VersionedCheckpointStore<S> { save(entry: VersionedCheckpointEntry<S>): void | Promise<void>; resolve( version: string, ): | { stepIndex: number; snapshot: S } | Promise<{ stepIndex: number; snapshot: S }>; } ``` ## Usage ```typescript // In-memory versioned store const versions = new Map(); const snapshots = new Map(); const store: VersionedCheckpointStore<any> = { save(entry) { versions.set(entry.version, entry); // Rebuild full snapshot from parent chain for resolve() let snap = {}; if (entry.parentVersion) { snap = { ...snapshots.get(entry.parentVersion)?.snapshot }; } Object.assign(snap, entry.diff); snapshots.set(entry.version, { stepIndex: entry.stepIndex, snapshot: snap, }); }, resolve(version) { return snapshots.get(version)!; }, }; const flow = new AppFlow() .withVersionedCheckpoint(store) .startWith(stepA) .then(stepB) .then(stepC); await flow.run(initialState); ``` ## Resuming from a Version ```typescript // resumeFrom skips all steps up to and including the saved 
stepIndex flow.resumeFrom("v2", store); await flow.run({ ...restoredSnapshot }); ``` `.resumeFrom(version, store)` resolves the saved step index on the first step execution and skips all steps up to and including that index. ## Notes * Checkpoints are only saved when **something actually changed** in shared state. * Diffs compare JSON serializations — non-JSON-serializable values are not supported. * Version IDs (`"v1"`, `"v2"`, ...) are assigned by a global counter inside the plugin. In production, override with a UUID-based `save` implementation. --- --- url: /flowneer/core/plugins.md --- # Writing Plugins Plugins extend `FlowBuilder` with new methods by registering **lifecycle hooks**. They are the primary extension mechanism in Flowneer. ## Plugin Shape A plugin is an object whose keys become methods on `FlowBuilder.prototype`: ```typescript import type { FlowneerPlugin, PluginContext } from "flowneer"; export const myPlugin: FlowneerPlugin = { myMethod(this: PluginContext, arg: string) { this._setHooks({ beforeStep: (meta, shared) => { console.log(`[${arg}] step ${meta.index} starting`); }, }); return this; // always return `this` for chaining }, }; ``` ## Registering a Plugin ```typescript import { FlowBuilder } from "flowneer"; import { myPlugin } from "./myPlugin"; const AppFlow = FlowBuilder.extend([myPlugin]); // Now available on all AppFlow instances: new AppFlow().myMethod("prefix").startWith(step); ``` ## TypeScript Declaration Merging Add the method to the `FlowBuilder` interface so TypeScript knows about it: ```typescript declare module "flowneer" { interface FlowBuilder { myMethod(arg: string): this; } } ``` Place this in the same file as your plugin, or in a `*.d.ts` file.
## Available Lifecycle Hooks Registered via `this._setHooks(hooks)`: | Hook | Signature | Called | | ---------------- | -------------------------------------------------------- | ---------------------------------------------- | | `beforeFlow` | `(shared, params) => void` | Once before the first step | | `afterFlow` | `(shared, params) => void` | Once after the last step | | `beforeStep` | `(meta, shared, params) => void` | Before each step body | | `afterStep` | `(meta, shared, params) => void` | After each step body | | `wrapStep` | `(meta, next, shared, params) => Promise<void>` | Wraps step execution — call `next()` to run it | | `wrapParallelFn` | `(meta, fnIndex, next, shared, params) => Promise<void>` | Wraps each function in a `.parallel()` step | | `onError` | `(meta, error, shared, params) => void` | Called when a step throws | ### `wrapStep` — Middleware `wrapStep` is the most powerful hook. It wraps step execution so you can run code **before and after**, handle errors, or skip steps entirely (dry-run, mock, etc.). ```typescript wrapStep: async (meta, next, shared, params) => { console.log("before", meta.index); try { await next(); // ← executes the step body } catch (err) { console.error("step failed", err); throw err; // re-throw to propagate } console.log("after", meta.index); }; ``` Omit `next()` to skip the step: ```typescript wrapStep: async (_meta, _next) => { // dry-run: step body never runs }; ``` Multiple `wrapStep` registrations are **composed innermost-first** — the last one registered becomes the outermost wrapper. ## Multiple Hook Registrations Calling `_setHooks` multiple times stacks — each call adds a new entry. The `beforeFlow`/`afterStep`/etc. handlers all run in registration order. ## StepFilter — scoping hooks to specific steps Step-scoped hooks (`beforeStep`, `afterStep`, `onError`, `wrapStep`, `wrapParallelFn`) accept an optional `StepFilter` as the **second argument** to `_setHooks()`. Hooks without a filter run on every step.
`beforeFlow` and `afterFlow` are unaffected by filters. ```typescript type StepFilter = string[] | ((meta: StepMeta) => boolean); ``` ### String array — label matching with glob wildcards Pass an array of step labels. The `*` character is a glob wildcard: `"llm:*"` matches `"llm:summarise"`, `"llm:embed"`, etc. Steps that have no label are **never matched** by a positive string filter. ```typescript (this as any)._setHooks( { beforeStep: (meta, shared) => { console.log("LLM step starting:", meta.label); }, }, ["llm:*", "embed:*"], // only fires for steps whose label matches ); ``` ### Negation — exclude steps with `!` Prefix any pattern with `!` to exclude matching steps. **Negation veto always wins** over a positive match in the same array. ```typescript // Negation-only — apply everywhere except human-in-loop steps this._setHooks({ wrapStep: rateLimiter }, ["!human:*"]); // Mixed — apply to llm steps but never human steps this._setHooks({ wrapStep: rateLimiter }, ["!human:*", "llm:*"]); // Negation veto beats a matching wildcard in the same array this._setHooks( { beforeStep: log }, ["!llm:generate", "llm:*"], // fires on all llm:* except llm:generate ); ``` **Unlabelled steps and negation:** negation patterns require a label to match against. An unlabelled step cannot be vetoed, so it is included by a negation-only filter (but still excluded by a positive-pattern filter). ### Predicate — runtime condition or multi-criteria logic Pass a function that receives `StepMeta` and returns `true` to match: ```typescript this._setHooks( { wrapStep: rateLimitedWrap }, (meta) => meta.label?.startsWith("llm:") ?? false, ); ``` ### Auto-`next()` for unmatched `wrapStep` / `wrapParallelFn` When a `wrapStep` or `wrapParallelFn` hook is filtered out for a particular step, Flowneer automatically calls `next()` on its behalf. **The middleware chain is never broken** by a filter. 
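The label-matching rules above (glob `*`, `!` negation, veto precedence, unlabelled-step handling) can be condensed into a small standalone matcher. This is a sketch of the documented semantics, not the library source:

```typescript
// Decide whether a step label passes a string-array StepFilter.
function matchesFilter(label: string | undefined, patterns: string[]): boolean {
  // "llm:*" -> /^llm:.*$/ ; escape regex metacharacters in literal segments
  const toRegex = (p: string) =>
    new RegExp(
      `^${p.split("*").map((seg) => seg.replace(/[.+?^${}()|[\]\\]/g, "\\$&")).join(".*")}$`,
    );
  const negs = patterns.filter((p) => p.startsWith("!")).map((p) => toRegex(p.slice(1)));
  const pos = patterns.filter((p) => !p.startsWith("!")).map(toRegex);

  // Negation veto always wins; an unlabelled step cannot be vetoed.
  if (label !== undefined && negs.some((r) => r.test(label))) return false;
  // Negation-only filter includes everything that survived the veto.
  if (pos.length === 0) return true;
  // Positive patterns never match an unlabelled step.
  return label !== undefined && pos.some((r) => r.test(label));
}
```

Walking the examples from this section through the matcher reproduces the documented outcomes: `"llm:generate"` is vetoed by `["!llm:generate", "llm:*"]`, and an unlabelled step passes `["!human:*"]` but fails `["llm:*"]`.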
### `addHooks(hooks, filter?)` — dynamic hook registration End users (outside of a plugin) can register hooks at runtime via `addHooks`: ```typescript const dispose = flow.addHooks( { beforeStep: (meta) => console.log("->", meta.label) }, ["llm:*"], ); await flow.run(shared); dispose(); // removes the hooks ``` `addHooks` accepts the same `StepFilter` second argument as `_setHooks` and returns a `dispose()` function to remove the registered hooks. ## Complete Plugin Example ```typescript import { FlowBuilder } from "flowneer"; import type { FlowneerPlugin, PluginContext, StepMeta } from "flowneer"; declare module "flowneer" { interface FlowBuilder { withRetryLog(prefix?: string): this; } } export const withRetryLog: FlowneerPlugin = { withRetryLog(this: PluginContext, prefix = "[retry]") { this._setHooks({ onError: (meta: StepMeta, err: unknown) => { console.warn( `${prefix} step ${meta.index} (${meta.type}) failed:`, err instanceof Error ? err.message : err, ); }, }); return this; }, }; // Usage: const AppFlow = FlowBuilder.extend([withRetryLog]); const flow = new AppFlow(); flow.withRetryLog("MyApp").startWith(riskyStep, { retries: 3 }); ```