Resilient API Pipeline
Fetch data from a flaky external API with automatic retries, a per-step timeout, a circuit breaker to stop hammering a dead endpoint, and a fallback to cached data when everything fails.
Plugins used: withCircuitBreaker, withTimeout, withFallback (resilience), step-level retries + delaySec
The code
import "dotenv/config";
import { FlowBuilder } from "flowneer";
import {
withCircuitBreaker,
withTimeout,
withFallback,
} from "flowneer/plugins/resilience";
import { withTiming } from "flowneer/plugins/observability";
const AppFlow = FlowBuilder.extend([
withCircuitBreaker,
withFallback,
withTiming,
]);
// ─── State ───────────────────────────────────────────────────────────────────
interface PipelineState {
productId: string;
productData: Record<string, unknown> | null;
enrichedData: Record<string, unknown> | null;
finalRecord: Record<string, unknown> | null;
fromCache: boolean;
__timings?: Record<string, number>;
}
// ─── Helpers ─────────────────────────────────────────────────────────────────
async function fetchProduct(id: string) {
const res = await fetch(`https://api.example.com/products/${id}`, {
signal: AbortSignal.timeout(3000),
});
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
}
async function fetchCachedProduct(id: string) {
// Fall back to a local cache / DB
return { id, name: "Cached Product", price: 0, fromCache: true };
}
async function enrichProduct(product: Record<string, unknown>) {
const res = await fetch(
`https://api.enrich.example.com/enrich?id=${product.id}`,
);
if (!res.ok) throw new Error(`Enrich HTTP ${res.status}`);
return { ...product, ...(await res.json()) };
}
// ─── Flow ────────────────────────────────────────────────────────────────────
const pipeline = new AppFlow<PipelineState>()
.withTiming()
// Step 1 — Fetch product, retry up to 3×, 1 s delay, 5 s timeout
// Circuit breaker trips after 5 consecutive failures across all runs
.withCircuitBreaker({ failureThreshold: 5, resetTimeoutMs: 60_000 })
.withFallback(async (s) => {
// Circuit open or all retries exhausted — use cache
s.productData = (await fetchCachedProduct(s.productId)) as any;
s.fromCache = true;
})
.startWith(
async (s) => {
s.productData = await fetchProduct(s.productId);
s.fromCache = false;
},
{ retries: 3, delaySec: 1, timeoutMs: 5_000 },
)
// Step 2 — Enrich (skip if we're working from cache)
.then(
async (s) => {
if (s.fromCache) return; // no point enriching stale data
s.enrichedData = await enrichProduct(s.productData!);
},
{ retries: 2, delaySec: 0.5, timeoutMs: 4_000 },
)
// Step 3 — Normalise into a final record
.then(async (s) => {
const base = s.enrichedData ?? s.productData ?? {};
s.finalRecord = {
...base,
processedAt: new Date().toISOString(),
source: s.fromCache ? "cache" : "live",
};
});
// ─── Run ─────────────────────────────────────────────────────────────────────
const state: PipelineState = {
productId: "prod_123",
productData: null,
enrichedData: null,
finalRecord: null,
fromCache: false,
};
await pipeline.run(state);
console.log("Final record:", state.finalRecord);
console.log("Step timings (ms):", state.__timings);
if (state.fromCache) console.warn("Warning: data served from cache.");Resilience layers explained
Step-level retries + delay
Pass { retries: 3, delaySec: 1 } as the second argument to .startWith() or .then(). Flowneer retries the step up to 3 times total (not 3 additional — 1 attempt + 2 retries) with a 1 s gap.
.startWith(fetchFn, { retries: 3, delaySec: 1 })Per-step timeout
timeoutMs wraps the step in a Promise.race against a rejection timer. If the step exceeds the limit, Flowneer throws StepTimeoutError (which triggers the retry/fallback chain).
.then(slowFn, { timeoutMs: 5_000 })Circuit breaker
withCircuitBreaker tracks failures across all flow runs. After failureThreshold consecutive failures the circuit trips to open — subsequent runs skip the protected step(s) entirely (triggering the fallback) until resetTimeoutMs elapses.
.withCircuitBreaker({ failureThreshold: 5, resetTimeoutMs: 60_000 })Fallback
.withFallback(fn) runs fn whenever any step in the flow throws an unhandled error — after all retries are exhausted and the circuit is open. Use it to write safe recovery logic.
Variation — timeout at the flow level
To abort the entire pipeline after a wall-clock deadline, pass an AbortSignal:
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // 10 s total budget
await pipeline.run(state, {}, { signal: controller.signal });