The Stream Builder
Define typed stream payloads, chunk/final schemas, guards, and SSE endpoints.
The stream builder collects every aspect of a stream (schemas, transforms, guards, and the stream function) into a single typed definition. That definition is attached to a service builder and registered at the event bridge when the service starts.
Schemas
Streams have four schemas: payload, parameter, chunk, and final.
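import { z } from 'zod'

// Validated before the stream function starts
const payloadSchema = z.object({
  metricType: z.enum(['cpu', 'memory', 'disk']),
})

// Validated before the stream function starts
const parameterSchema = z.object({
  intervalMs: z.number().default(1000),
})

// Validated on every writer.write(chunk)
const chunkSchema = z.object({
  timestamp: z.number(),
  value: z.number(),
})

// Validated on writer.close(final)
const finalSchema = z.object({
  dataPoints: z.number(),
  average: z.number(),
})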
| Method | Validated when |
|---|---|
| `addPayloadSchema(schema)` | Before the stream function starts |
| `addParameterSchema(schema)` | Before the stream function starts |
| `addChunkSchema(schema, validateChunks?)` | On every `writer.write(chunk)` |
| `addFinalSchema(schema, validateFinal?)` | On `writer.close(final)` |
The validateChunks and validateFinal parameters default to true. You can disable per-chunk validation for hot paths, but keep the schema defined for type inference and documentation.
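To make the effect of the `validateChunks` flag concrete, here is a minimal sketch of a writer that validates each chunk only when the flag is set. `createSketchWriter`, `Validator`, and `parseChunk` are hypothetical names used for illustration; they are not part of the framework API, and a real stream would rely on the Zod schemas shown above.

```typescript
// Hypothetical validator: returns the value if valid, throws otherwise.
type Validator<T> = (input: unknown) => T

interface Chunk {
  timestamp: number
  value: number
}

// Hand-written stand-in for chunkSchema.parse (assumption: no Zod in this sketch).
const parseChunk: Validator<Chunk> = (input) => {
  const { timestamp, value } = (input ?? {}) as Partial<Chunk>
  if (typeof timestamp !== 'number' || typeof value !== 'number') {
    throw new TypeError('Invalid chunk')
  }
  return { timestamp, value }
}

// Hypothetical writer: validates on every write only when validateChunks is true.
function createSketchWriter<T>(validate: Validator<T>, validateChunks = true) {
  const written: T[] = []
  return {
    written,
    write(chunk: T): void {
      written.push(validateChunks ? validate(chunk) : chunk)
    },
  }
}

const strictWriter = createSketchWriter(parseChunk) // default: validate every chunk
const fastWriter = createSketchWriter(parseChunk, false) // hot path: skip validation
```

In this sketch the type parameter still constrains what `write` accepts at compile time, which mirrors the advice to keep the schema defined even when runtime validation is switched off.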
The stream function
export const liveMetricsStreamBuilder = pingV1ServiceBuilder
  .getStreamBuilder('liveMetrics', 'Stream live system metrics')
  .addPayloadSchema(payloadSchema)
  .addParameterSchema(parameterSchema)
  .addChunkSchema(chunkSchema, true)
  .addFinalSchema(finalSchema, true)
  .setStreamFunction(async function (context, payload, parameter, writer) {
    const collector = context.resources.metricsCollector
    const readings: number[] = []

    // Log the reason when the consumer cancels the stream
    writer.onCancel((reason) => {
      context.logger.info({ reason }, 'Stream cancelled')
    })

    // Emit up to 60 readings, one per interval, stopping early on cancellation
    for (let i = 0; i < 60; i++) {
      if (writer.cancelled) break
      const reading = await collector.read(payload.metricType)
      readings.push(reading)
      await writer.write({ timestamp: Date.now(), value: reading })
      await new Promise((resolve) => setTimeout(resolve, parameter.intervalMs))
    }

    // Avoid dividing by zero when the stream is cancelled before the first reading
    const average = readings.length > 0
      ? readings.reduce((sum, value) => sum + value, 0) / readings.length
      : 0
    await writer.close({ dataPoints: readings.length, average })
  })
The function signature is `(context, payload, parameter, writer) => Promise<void>`. Unlike commands, streams do not return a value; they communicate through the writer.
Just like commands, stream functions must be declared with `async function` rather than as arrow functions, so that `this` stays bound to the service instance.
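The `this` requirement follows from plain JavaScript semantics, which the sketch below illustrates. `SketchService`, `Handler`, and `outerScope` are hypothetical names for this example only; the sketch assumes the framework invokes the handler with the service instance as `this`, as the text above describes.

```typescript
// Hypothetical handler type: expects `this` to be the service instance.
type Handler = (this: SketchService, input: string) => Promise<string>

// Hypothetical stand-in for a service instance; not the framework's class.
class SketchService {
  constructor(readonly name: string) {}

  // Invokes the handler with `this` set to the service.
  async run(handler: Handler, input: string): Promise<string> {
    return handler.call(this, input)
  }
}

// `async function` receives the `this` supplied by .call().
const functionHandler: Handler = async function (input) {
  return `${this.name}:${input}`
}

// An arrow function keeps the `this` of the scope where it was created,
// so .call() cannot rebind it to the service.
const outerScope = {
  name: 'outer',
  makeArrowHandler(): Handler {
    return async (input) => `${this.name}:${input}`
  },
}
const arrowHandler = outerScope.makeArrowHandler()

const service = new SketchService('ping')
const results = Promise.all([
  service.run(functionHandler, 'x'), // resolves to "ping:x"
  service.run(arrowHandler, 'x'), // resolves to "outer:x"
])
```

The arrow handler silently reads from the wrong object instead of failing, which is why the declaration style matters.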
Guards and transforms
Streams support the same guard and transform hooks as commands:
.setBeforeGuardHooks({
  requireAuth: async function (context, payload, parameter) {
    if (!context.message.principalId) {
      throw new HandledError(StatusCode.Unauthorized, 'Authentication required')
    }
  },
})
.setAfterGuardHooks({
  audit: async function (context, result, originalPayload, originalParameter) {
    context.logger.info({ metricType: originalPayload.metricType }, 'Metrics stream completed')
  },
})
Before guards run in parallel. After guards run after the stream function completes (either through writer.close() or writer.fail()).
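As a rough illustration of what running before guards in parallel implies, the sketch below starts every guard at once and rejects as soon as one of them throws. It assumes the behaviour resembles `Promise.all`; the framework's actual scheduling and error handling may differ. `runBeforeGuards`, `Guard`, and `SketchContext` are hypothetical names.

```typescript
type Guard<C> = (context: C) => Promise<void>

// Hypothetical runner: starts all guards together, rejects on the first failure.
async function runBeforeGuards<C>(guards: Record<string, Guard<C>>, context: C): Promise<void> {
  await Promise.all(Object.values(guards).map((guard) => guard(context)))
}

interface SketchContext {
  principalId?: string
  log: string[]
}

const guards: Record<string, Guard<SketchContext>> = {
  slowCheck: async function (context) {
    context.log.push('slowCheck:start')
    await new Promise<void>((resolve) => setTimeout(resolve, 20))
    context.log.push('slowCheck:end')
  },
  requireAuth: async function (context) {
    context.log.push('requireAuth:start')
    if (!context.principalId) throw new Error('Authentication required')
  },
}

async function demo() {
  // Both guards start before the slow one finishes.
  const allowed: SketchContext = { principalId: 'user-1', log: [] }
  await runBeforeGuards(guards, allowed)

  // A failing guard rejects the whole run, so the stream function would not start.
  const denied: SketchContext = { log: [] }
  const outcome = await runBeforeGuards(guards, denied).then(
    () => 'passed',
    (error: Error) => error.message,
  )
  return { allowedLog: allowed.log, outcome }
}
```

Because the guards do not run in a fixed sequence, one guard should not depend on side effects produced by another.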
HTTP/SSE exposure
Use .exposeAsHttpStreamEndpoint() to expose the stream via HTTP with Server-Sent Events:
.exposeAsHttpStreamEndpoint('GET', '/api/v1/metrics/live')
The HTTP adapter:
- Sets the response `content-type` to `text/event-stream`
- Sends frames as SSE events (`start`, `chunk`, `complete`, `error`, `cancel`)
- Includes the stream frame schema and endpoint metadata in OpenAPI
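For readers unfamiliar with the `text/event-stream` wire format, the following sketch serializes and parses events using the event names listed above. The JSON payload inside each `data:` line is an assumption made for illustration; the adapter's real frame payloads may have a different shape. `toSseFrame` and `parseSse` are hypothetical helpers.

```typescript
// Event names taken from the list above.
type SseEventName = 'start' | 'chunk' | 'complete' | 'error' | 'cancel'

// Serializes one event: an "event:" line, a "data:" line,
// and a blank line that terminates the event.
function toSseFrame(event: SseEventName, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data ?? null)}\n\n`
}

// Parses a complete SSE body back into events (sketch: single-line data only).
function parseSse(body: string): Array<{ event: string; data: unknown }> {
  return body
    .split('\n\n')
    .filter((block) => block.trim() !== '')
    .map((block) => {
      const lines = block.split('\n')
      const event = lines.find((line) => line.startsWith('event: '))?.slice(7) ?? 'message'
      const data = lines.find((line) => line.startsWith('data: '))?.slice(6) ?? 'null'
      return { event, data: JSON.parse(data) }
    })
}

const body =
  toSseFrame('chunk', { timestamp: 1, value: 0.5 }) +
  toSseFrame('complete', { dataPoints: 1, average: 0.5 })
const events = parseSse(body)
```

In a browser, `EventSource` performs this parsing itself and dispatches each named event to the listener registered for that name.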
You can also set the streaming mode:
.setHttpStreamingMode('stream') // incremental chunks (default)
// or
.setHttpStreamingMode('aggregate') // buffer and return final only
Consuming streams from commands
Commands can declare stream consumption capabilities:
.canConsumeStream(
  'AnalyticsService',
  '1',
  'liveMetrics',
  z.object({ timestamp: z.number(), value: z.number() }), // chunk schema
  z.object({ metricType: z.string() }), // payload schema
  z.object({}), // parameter schema
  z.object({ summary: z.string() }), // final schema
)
.setCommandFunction(async function (context, payload, parameter) {
  const handle = await context.stream.AnalyticsService['1'].liveMetrics(
    { metricType: 'cpu' },
    {},
  )

  for await (const frame of handle) {
    if (frame.payload.frameType === 'chunk' && frame.payload.chunk) {
      // process chunk
    }
    if (frame.payload.frameType === 'complete') {
      // process final
      break
    }
  }
})
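When a command only needs the collected result, the frame loop can be wrapped in a small helper. The sketch below assumes a frame shape inferred from the fields used in the example above (`frameType`, `chunk`, `final`, `error`); `SketchFrame`, `collectStream`, and `fakeHandle` are hypothetical names, and the in-memory generator merely stands in for a real stream handle.

```typescript
// Assumed frame shape, inferred from the fields used in the example above.
interface SketchFrame<Chunk, Final> {
  payload: {
    frameType: 'start' | 'chunk' | 'complete' | 'error' | 'cancel'
    chunk?: Chunk
    final?: Final
    error?: { message?: string }
  }
}

// Drains a handle into its chunks and final value; throws on an error frame.
async function collectStream<Chunk, Final>(
  handle: AsyncIterable<SketchFrame<Chunk, Final>>,
): Promise<{ chunks: Chunk[]; final: Final | undefined }> {
  const chunks: Chunk[] = []
  for await (const frame of handle) {
    const { frameType, chunk, final, error } = frame.payload
    if (frameType === 'chunk' && chunk !== undefined) chunks.push(chunk)
    if (frameType === 'error') throw new Error(error?.message ?? 'Upstream error')
    if (frameType === 'complete') return { chunks, final }
  }
  // The stream ended without a complete frame.
  return { chunks, final: undefined }
}

// Hypothetical in-memory handle standing in for a real stream call.
async function* fakeHandle(): AsyncGenerator<SketchFrame<{ value: number }, { summary: string }>> {
  yield { payload: { frameType: 'start' } }
  yield { payload: { frameType: 'chunk', chunk: { value: 1 } } }
  yield { payload: { frameType: 'chunk', chunk: { value: 2 } } }
  yield { payload: { frameType: 'complete', final: { summary: 'done' } } }
}

const collected = collectStream(fakeHandle())
```

Buffering every chunk gives up the memory benefit of streaming, so a helper like this suits short streams better than long-running ones.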
Chunk aggregation
Enable automatic chunk aggregation to build a { chunkCount, chunks[] } final payload when no explicit final is provided:
.enableChunkAggregation(true)
With this enabled, calling writer.close() without arguments produces { chunkCount: number, chunks: Chunk[] }.
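The aggregation rule can be pictured with a small stand-in writer. This is a sketch of the described behaviour only, not the framework's implementation; `AggregatingWriter` and `Aggregate` are hypothetical names.

```typescript
interface Aggregate<Chunk> {
  chunkCount: number
  chunks: Chunk[]
}

// Hypothetical writer illustrating the described behaviour.
class AggregatingWriter<Chunk, Final> {
  private readonly chunks: Chunk[] = []
  result: Final | Aggregate<Chunk> | undefined

  write(chunk: Chunk): void {
    this.chunks.push(chunk)
  }

  // An explicit final is used as-is; without one, the recorded chunks are aggregated.
  close(final?: Final): void {
    this.result = final ?? { chunkCount: this.chunks.length, chunks: [...this.chunks] }
  }
}

const aggregatingWriter = new AggregatingWriter<{ token: string }, never>()
aggregatingWriter.write({ token: 'Hello' })
aggregatingWriter.write({ token: 'world' })
aggregatingWriter.close()
// aggregatingWriter.result is { chunkCount: 2, chunks: [{ token: 'Hello' }, { token: 'world' }] }
```

Since every chunk is retained until the stream closes, aggregation is best suited to streams with a bounded number of chunks.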
Full example
import { z } from 'zod'
import { StatusCode, HandledError } from '@purista/core'
import { chatServiceV1ServiceBuilder } from '../chatService.js'

const tokenChunkSchema = z.object({ token: z.string() })
const finalSchema = z.object({ totalTokens: z.number() })

export const aiResponseStreamBuilder = chatServiceV1ServiceBuilder
  .getStreamBuilder('generateResponse', 'Stream AI-generated text tokens')
  .addPayloadSchema(z.object({ prompt: z.string().min(1) }))
  .addParameterSchema(z.object({ model: z.string().default('gpt-4') }))
  .addChunkSchema(tokenChunkSchema, true)
  .addFinalSchema(finalSchema, true)
  // Declare the upstream stream this stream is allowed to consume
  .canConsumeStream(
    'AiService', '1', 'generateTokens',
    tokenChunkSchema,
    z.object({ prompt: z.string() }),
    z.object({ model: z.string() }),
    finalSchema,
  )
  .setBeforeGuardHooks({
    checkQuota: async function (context, payload, parameter) {
      const { quota } = await context.states.getState(`quota:${context.message.principalId}`)
      if (quota !== undefined && quota <= 0) {
        throw new HandledError(StatusCode.TooManyRequests, 'Token quota exceeded')
      }
    },
  })
  .setStreamFunction(async function (context, payload, parameter, writer) {
    const upstream = await context.stream.AiService['1'].generateTokens(payload, parameter)

    writer.onCancel((reason) => {
      context.logger.info({ reason, prompt: payload.prompt.slice(0, 50) }, 'AI stream cancelled')
    })

    // Relay upstream frames to this stream's writer
    for await (const frame of upstream) {
      if (writer.cancelled) break
      if (frame.payload.frameType === 'chunk' && frame.payload.chunk) {
        await writer.write(frame.payload.chunk)
      }
      if (frame.payload.frameType === 'complete') {
        await writer.close(frame.payload.final)
        return
      }
      if (frame.payload.frameType === 'error') {
        await writer.fail(new Error(frame.payload.error?.message || 'Upstream error'))
        return
      }
    }
  })