Batch Document Processing
Process a large collection of documents: extract structured data from each one in parallel batches, collect results with a reducer, then aggregate into a final report. Demonstrates .batch(), nested .parallel(), withStructuredOutput, and safe concurrent writes.
Plugins used: withStructuredOutput, withRateLimit (LLM); .batch() + .parallel() (core)
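withStructuredOutput is handed a `parse` callback that turns the raw LLM reply into data, and the recipe below passes a bare `JSON.parse`. A stricter parser could reject malformed replies early. The following is a minimal sketch, not part of Flowneer: `parseExtraction` and `ExtractionFields` are hypothetical names, and it assumes the plugin treats a thrown error as a failed parse that counts against `retries`.

```typescript
type Sentiment = "positive" | "neutral" | "negative";

// Only the fields the LLM is asked to produce (hypothetical type name).
interface ExtractionFields {
  summary: string;
  sentiment: Sentiment;
  keyTopics: string[];
}

const SENTIMENTS: readonly Sentiment[] = ["positive", "neutral", "negative"];

// Hypothetical helper: parse the raw reply and throw on any shape mismatch.
function parseExtraction(raw: string): ExtractionFields {
  const data: unknown = JSON.parse(raw); // throws SyntaxError on invalid JSON
  if (typeof data !== "object" || data === null) {
    throw new Error("Expected a JSON object");
  }
  const { summary, sentiment, keyTopics } = data as Record<string, unknown>;
  if (typeof summary !== "string") {
    throw new Error("summary must be a string");
  }
  if (!SENTIMENTS.includes(sentiment as Sentiment)) {
    throw new Error("sentiment must be positive, neutral, or negative");
  }
  if (!Array.isArray(keyTopics) || !keyTopics.every((t) => typeof t === "string")) {
    throw new Error("keyTopics must be an array of strings");
  }
  // Return only the known fields and cap topics at five, as the prompt requests.
  return {
    summary,
    sentiment: sentiment as Sentiment,
    keyTopics: keyTopics.slice(0, 5),
  };
}
```

Passing `parse: parseExtraction` in place of the inline `JSON.parse` would also keep any unexpected keys in the reply from overwriting `documentId` or `wordCount` when the parsed object is spread into the result.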
The code
```typescript
import "dotenv/config";
import { FlowBuilder } from "flowneer";
import { withStructuredOutput, withRateLimit } from "flowneer/plugins/llm";
import { callLlm } from "./utils/callLlm";

const AppFlow = FlowBuilder.extend([withStructuredOutput, withRateLimit]);

// ─── Types ───────────────────────────────────────────────────────────────────

interface Document {
  id: string;
  text: string;
}

interface ExtractedData {
  documentId: string;
  summary: string;
  sentiment: "positive" | "neutral" | "negative";
  keyTopics: string[];
  wordCount: number;
}

interface ProcessingState {
  documents: Document[];
  results: ExtractedData[];
  report: string;
  __batchItem?: Document; // injected by .batch()
  __llmOutput?: string; // used by withStructuredOutput
}

// ─── Extraction prompt ───────────────────────────────────────────────────────

function extractionPrompt(doc: Document) {
  return (
    `Analyse the following document and return a JSON object with these fields:\n` +
    `- summary: string (1-2 sentences)\n` +
    `- sentiment: "positive" | "neutral" | "negative"\n` +
    `- keyTopics: string[] (up to 5 topics)\n\n` +
    `Document:\n${doc.text}\n\n` +
    `Return only valid JSON.`
  );
}

// ─── Inner flow — processes one document ─────────────────────────────────────

const extractFlow = new AppFlow<ProcessingState>()
  .withRateLimit({ requestsPerMinute: 30 })
  .startWith(async (s) => {
    const doc = s.__batchItem!;
    s.__llmOutput = await callLlm(extractionPrompt(doc));
  })
  .withStructuredOutput({
    parse: (raw) => JSON.parse(raw),
    retries: 2,
    onSuccess: (parsed, s) => {
      const doc = s.__batchItem!;
      s.results.push({
        documentId: doc.id,
        wordCount: doc.text.split(/\s+/).length,
        ...parsed,
      });
    },
  });

// ─── Outer flow — batches all documents, then aggregates ─────────────────────

const pipeline = new AppFlow<ProcessingState>()
  .startWith((s) => {
    s.results = [];
  })
  // Process all documents via the inner flow, one per batch item
  .batch(
    (s) => s.documents,
    (b) => b.add(extractFlow),
  )
  // Aggregate results into a report
  .then(async (s) => {
    const pos = s.results.filter((r) => r.sentiment === "positive").length;
    const neg = s.results.filter((r) => r.sentiment === "negative").length;
    const neu = s.results.filter((r) => r.sentiment === "neutral").length;

    const allTopics = s.results.flatMap((r) => r.keyTopics);
    const topicFreq = allTopics.reduce<Record<string, number>>((acc, t) => {
      acc[t] = (acc[t] ?? 0) + 1;
      return acc;
    }, {});
    const topTopics = Object.entries(topicFreq)
      .sort((a, b) => b[1] - a[1])
      .slice(0, 10)
      .map(([t]) => t);

    s.report = [
      `# Document Processing Report`,
      ``,
      `**Total documents:** ${s.results.length}`,
      `**Sentiment:** ${pos} positive / ${neu} neutral / ${neg} negative`,
      `**Top topics:** ${topTopics.join(", ")}`,
      ``,
      `## Summaries`,
      ...s.results.map((r) => `- **${r.documentId}** — ${r.summary}`),
    ].join("\n");
  });

// ─── Run ─────────────────────────────────────────────────────────────────────

const documents: Document[] = [
  {
    id: "doc-1",
    text: "TypeScript 5.0 was released with exciting new features...",
  },
  {
    id: "doc-2",
    text: "The new machine learning framework struggled with performance...",
  },
  {
    id: "doc-3",
    text: "Open source contributors celebrated a major milestone today...",
  },
  // Add as many as you need — .batch() processes them sequentially,
  // or switch to .parallel() for concurrent processing (see variation below)
];

const state: ProcessingState = {
  documents,
  results: [],
  report: "",
};

await pipeline.run(state);
console.log(state.report);
```

Variation — parallel processing
Replace .batch() with .parallel() to process all documents concurrently. Use a reducer to safely merge results back:
```typescript
.parallel(
  documents.map((doc) => async (s: ProcessingState) => {
    s.__batchItem = doc;
    await extractFlow.run(s);
  }),
  undefined,
  // Reducer: merge results arrays from each isolated draft
  (shared, drafts) => {
    shared.results = drafts.flatMap((d) => d.results);
  },
)
```

Rate limits
Running many documents in parallel increases your LLM request rate significantly. Make sure withRateLimit is configured before enabling this.
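To make the concern concrete, the sketch below shows one generic way a requests-per-minute cap can be enforced by spacing out call starts. It is not how withRateLimit is implemented (its internals are not shown here); `createRateLimiter` and `schedule` are hypothetical names used only for illustration.

```typescript
// Hypothetical helper, not part of Flowneer: starts queued tasks roughly
// 60_000 / requestsPerMinute milliseconds apart.
function createRateLimiter(requestsPerMinute: number) {
  const intervalMs = 60_000 / requestsPerMinute;
  let nextSlot = 0; // earliest timestamp (ms) at which the next task may start

  return async function schedule<T>(task: () => Promise<T>): Promise<T> {
    const now = Date.now();
    const startAt = Math.max(now, nextSlot);
    // Reserve the slot synchronously so concurrent callers queue up in order.
    nextSlot = startAt + intervalMs;
    const waitMs = startAt - now;
    if (waitMs > 0) {
      await new Promise<void>((resolve) => setTimeout(resolve, waitMs));
    }
    return task();
  };
}
```

With a limiter like this, even a fully parallel fan-out only starts one request per interval; for example, `createRateLimiter(30)` would space calls about two seconds apart, matching the `requestsPerMinute: 30` setting used on `extractFlow`.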
Variation — chunked parallel batches
Process in chunks of N concurrently using nested .batch() + .parallel():
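```typescript
// Chunk the documents array first
const chunkSize = 5;
const chunks: Document[][] = [];
for (let i = 0; i < documents.length; i += chunkSize) {
  chunks.push(documents.slice(i, i + chunkSize));
}

// Outer batch over chunks, inner parallel over each chunk
const pipeline = new AppFlow<ProcessingState>()
  .startWith((s) => {
    s.results = [];
  })
  .batch(
    () => chunks,
    (b) =>
      b.startWith(async (s) => {
        // The chunk is only known at run time, once .batch() has injected it,
        // so the parallel step is built here rather than on the builder itself.
        // __batchItem is typed as a single Document, hence the double cast.
        const chunk = s.__batchItem as unknown as Document[];
        await new AppFlow<ProcessingState>()
          .startWith(() => {
            // no-op entry step
          })
          // Parallel within the chunk
          .parallel(
            chunk.map((doc) => async (draft: ProcessingState) => {
              draft.__batchItem = doc;
              await extractFlow.run(draft);
            }),
            undefined,
            (shared, drafts) => {
              shared.results.push(...drafts.flatMap((d) => d.results));
            },
          )
          .run(s);
      }),
  );
```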
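For intuition, the same chunking strategy can be written in plain TypeScript without Flowneer. This is only a sketch of the idea (chunks run one after another, items inside a chunk run concurrently); `chunk` and `processInChunks` are hypothetical helpers, not library functions.

```typescript
// Hypothetical helper: split an array into consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Hypothetical helper: chunks are processed sequentially, while the items
// inside each chunk are processed concurrently via Promise.all.
async function processInChunks<T, R>(
  items: readonly T[],
  size: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = [];
  for (const group of chunk(items, size)) {
    // Promise.all preserves the order of the inputs within the chunk.
    results.push(...(await Promise.all(group.map(worker))));
  }
  return results;
}
```

At most `size` workers are in flight at any moment, which is the property the nested .batch() + .parallel() variation is after. Note that `Promise.all` rejects as soon as one worker fails, so a production version would likely need per-item error handling.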