Core Building Blocks / Stream
What is a Stream?
Long-running request/response handlers that emit multiple frames for incremental delivery and SSE.
A stream is a long-running request/response handler that emits multiple frames (start, chunk, complete, error, cancel) instead of a single payload. Use streams for incremental delivery, token-by-token AI responses, progress feeds, or any flow that maps well to Server-Sent Events (SSE).
Think of a stream as a command that returns a sequence of partial results rather than one final result. The client connects, receives chunks as they become available, and the connection closes when the stream emits a terminal frame.
Unlike a command which returns a single response, a stream emits a sequence of frames. If you need fire-and-forget behavior, use a subscription instead.
Anatomy of a stream
A stream has four contracts:
| Contract | Method | Purpose |
|---|---|---|
| Payload | addPayloadSchema() | The main input body |
| Parameter | addParameterSchema() | Query/path params and metadata |
| Chunk | addChunkSchema() | The shape of each intermediate frame |
| Final | addFinalSchema() | The shape of the terminal frame |
import { z } from 'zod'
import { pingV1ServiceBuilder } from './pingService.js'
const liveMetricsStreamBuilder = pingV1ServiceBuilder
.getStreamBuilder('liveMetrics', 'Stream live system metrics')
.addPayloadSchema(z.object({ metricType: z.enum(['cpu', 'memory', 'disk']) }))
.addParameterSchema(z.object({ intervalMs: z.number().default(1000) }))
.addChunkSchema(z.object({ timestamp: z.number(), value: z.number() }))
.addFinalSchema(z.object({ dataPoints: z.number(), average: z.number() }))
.setStreamFunction(async function (context, payload, parameter, writer) {
const collector = context.resources.metricsCollector
const readings = []
writer.onCancel((reason) => {
context.logger.info({ reason }, 'Stream cancelled by client')
})
for (let i = 0; i < 60; i++) {
if (writer.cancelled) break
const reading = await collector.read(payload.metricType)
readings.push(reading)
await writer.write({ timestamp: Date.now(), value: reading })
await new Promise(r => setTimeout(r, parameter.intervalMs))
}
const average = readings.reduce((a, b) => a + b, 0) / readings.length
await writer.close({ dataPoints: readings.length, average })
})
Every stream must end with writer.close(final), writer.fail(error), or be cancelled. Clients wait for the terminal frame to know the stream is complete. Forgetting to close a stream leaves clients hanging.
Stream lifecycle
flowchart LR
A[Client Opens Stream] --> B[Input Validation]
B --> C[Before Guards]
C --> D[Stream Function Starts]
D --> E[writer.write chunk]
E --> F{More data?}
F -->|Yes| E
F -->|No| G[writer.close final]
D -.->|Cancel| H[writer.cancelled = true]
D -.->|Error| I[writer.fail error]
B -.->|Invalid| J[Bad Request]
C -.->|Denied| K[Unauthorized]
- Client opens stream — via HTTP SSE, cross-service call, or direct invocation.
- Input validation — payload and parameter schemas are validated.
- Before guards — auth and preconditions run in parallel.
- Stream function executes — your handler receives
context,payload,parameter, andwriter. - Chunks are emitted — call
writer.write(chunk)for each partial result. - Terminal frame — call
writer.close(final)when done,writer.fail(error)on error, or handle cancellation.
The stream writer
The writer is your interface to the stream consumer:
| Method | Purpose |
|---|---|
writer.write(chunk) | Emit a chunk frame |
writer.close(final?) | Emit a complete frame and close the stream |
writer.fail(error) | Emit an error frame and close the stream |
writer.onCancel(cb) | Register a callback for when the client cancels |
writer.cancelled | Boolean — true if the client has cancelled |
.setStreamFunction(async function (context, payload, parameter, writer) {
const upstream = await context.stream.AiService['1'].generateTokens(payload, parameter)
writer.onCancel((reason) => {
context.logger.info({ reason }, 'Client cancelled the stream')
})
for await (const frame of upstream) {
if (writer.cancelled) {
context.logger.info('Stopping early due to cancellation')
break
}
if (frame.payload.frameType === 'chunk' && frame.payload.chunk) {
await writer.write(frame.payload.chunk)
}
if (frame.payload.frameType === 'complete') {
await writer.close(frame.payload.final)
return
}
if (frame.payload.frameType === 'error') {
await writer.fail(new Error(frame.payload.error?.message || 'Upstream error'))
return
}
}
})
Frame types
PURISTA streams use these frame types:
| Frame | Meaning |
|---|---|
open | Internal control frame for stream initialization |
start | Stream has started, initial metadata may be included |
chunk | Partial data payload |
complete | Stream finished successfully, may include final payload |
error | An error occurred |
cancel | Client or server cancelled the stream |
heartbeat | Keep-alive frame to prevent timeouts |
The stream function context
The stream context is identical to the command context — it includes message, logger, resources, secrets, configs, states, emit, service, stream, and queue:
.setStreamFunction(async function (context, payload, parameter, writer) {
// context.message — the StreamOpenRequest with trace info
context.logger.info({ metricType: payload.metricType }, 'Starting metrics stream')
// context.resources — service resources
const collector = context.resources.metricsCollector
// context.service — invoke commands in other services
const config = await context.service.ConfigService['1'].getMetricConfig({ type: payload.metricType }, {})
// context.stream — consume other streams
// context.emit — emit events
// context.queue — enqueue jobs
})
Scaffold a new stream
Use the CLI to generate boilerplate:
purista add stream
This creates:
schema.ts— payload, parameter, chunk, and final schemasindex.ts— the stream builder with function implementationindex.test.ts— Vitest specs
When to use streams
- You need incremental progress/results instead of one response body.
- Clients expect long-lived connections (SSE) with keep-alive frames.
- You are streaming AI tokens, telemetry updates, or chunked file conversions.
When NOT to use streams
- The operation is short and returns a single result. Use a command instead.
- The work should continue even if the caller disconnects. Use a queue instead.
- You need to react to events without returning a result. Use a subscription.
Common pitfalls
- Forgetting to emit a terminal frame. Handlers must call
writer.close(),writer.fail(), or handle cancellation. - Mixing queue-style async work into streams. Use queues when consumers pull jobs.
- Exposing streams via REST without
text/event-stream. Use.exposeAsHttpStreamEndpoint()which sets the correct content type. - Not checking
writer.cancelledin loops. Clients disconnect; your handler should stop producing data.
Checklist
- Input, chunk, and final schemas are defined.
- Stream function handles
writer.cancelledandwriter.onCancel. - Every code path calls
writer.close(),writer.fail(), or breaks on cancellation. - SSE exposure uses
.exposeAsHttpStreamEndpoint(). - Unit tests verify chunk generation and terminal frame delivery.
- Integration tests simulate client disconnects.