Core Building Blocks / Stream

The Stream Builder

Define typed stream payloads, chunk/final schemas, guards, and SSE endpoints.

The stream builder collects every aspect of a stream — schemas, transforms, guards, and the stream function — into a single typed definition. That definition is attached to a service builder and registered at the event bridge when the service starts.

Schemas

Streams have four schemas: payload, parameter, chunk, and final.

import { z } from 'zod'

const payloadSchema = z.object({
  metricType: z.enum(['cpu', 'memory', 'disk']),
})

const parameterSchema = z.object({
  intervalMs: z.number().default(1000),
})

const chunkSchema = z.object({
  timestamp: z.number(),
  value: z.number(),
})

const finalSchema = z.object({
  dataPoints: z.number(),
  average: z.number(),
})
MethodValidated when
addPayloadSchema(schema)Before stream function starts
addParameterSchema(schema)Before stream function starts
addChunkSchema(schema, validateChunks?)On every writer.write(chunk)
addFinalSchema(schema, validateFinal?)On writer.close(final)

The validateChunks and validateFinal parameters default to true. You can disable per-chunk validation for hot paths, but keep the schema defined for type inference and documentation.

The stream function

export const liveMetricsStreamBuilder = pingV1ServiceBuilder
  .getStreamBuilder('liveMetrics', 'Stream live system metrics')
  .addPayloadSchema(payloadSchema)
  .addParameterSchema(parameterSchema)
  .addChunkSchema(chunkSchema, true)
  .addFinalSchema(finalSchema, true)
  .setStreamFunction(async function (context, payload, parameter, writer) {
    const collector = context.resources.metricsCollector
    const readings = []

    writer.onCancel((reason) => {
      context.logger.info({ reason }, 'Stream cancelled')
    })

    for (let i = 0; i < 60; i++) {
      if (writer.cancelled) break
      const reading = await collector.read(payload.metricType)
      readings.push(reading)
      await writer.write({ timestamp: Date.now(), value: reading })
      await new Promise(r => setTimeout(r, parameter.intervalMs))
    }

    const average = readings.reduce((a, b) => a + b, 0) / readings.length
    await writer.close({ dataPoints: readings.length, average })
  })

The function signature is (context, payload, parameter, writer) => Promise<void>. Unlike commands, streams do not return a value — they communicate through the writer.

Use async function, not arrow functions

Just like commands, stream functions must use async function to preserve this binding to the service instance.

Guards and transforms

Streams support the same guard and transform hooks as commands:

.setBeforeGuardHooks({
  requireAuth: async function (context, payload, parameter) {
    if (!context.message.principalId) {
      throw new HandledError(StatusCode.Unauthorized, 'Authentication required')
    }
  },
})
.setAfterGuardHooks({
  audit: async function (context, result, originalPayload, originalParameter) {
    context.logger.info({ metricType: originalPayload.metricType }, 'Metrics stream completed')
  },
})

Before guards run in parallel. After guards run after the stream function completes (either through writer.close() or writer.fail()).

HTTP/SSE exposure

Use .exposeAsHttpStreamEndpoint() to expose the stream via HTTP with Server-Sent Events:

.exposeAsHttpStreamEndpoint('GET', '/api/v1/metrics/live')

The HTTP adapter:

  • Sets response content-type to text/event-stream
  • Sends frames as SSE events (start, chunk, complete, error, cancel)
  • Includes the stream frame schema and endpoint metadata in OpenAPI

You can also set the streaming mode:

.setHttpStreamingMode('stream') // incremental chunks (default)
// or
.setHttpStreamingMode('aggregate') // buffer and return final only

Consuming streams from commands

Commands can declare stream consumption capabilities:

.canConsumeStream(
  'AnalyticsService',
  '1',
  'liveMetrics',
  z.object({ timestamp: z.number(), value: z.number() }), // chunk schema
  z.object({ metricType: z.string() }), // payload schema
  z.object({}), // parameter schema
  z.object({ summary: z.string() }), // final schema
)
.setCommandFunction(async function (context, payload, parameter) {
  const handle = await context.stream.AnalyticsService['1'].liveMetrics(
    { metricType: 'cpu' },
    {},
  )

  for await (const frame of handle) {
    if (frame.payload.frameType === 'chunk' && frame.payload.chunk) {
      // process chunk
    }
    if (frame.payload.frameType === 'complete') {
      // process final
      break
    }
  }
})

Chunk aggregation

Enable automatic chunk aggregation to build a { chunkCount, chunks[] } final payload when no explicit final is provided:

.enableChunkAggregation(true)

With this enabled, calling writer.close() without arguments produces { chunkCount: number, chunks: Chunk[] }.

Full example

import { z } from 'zod'
import { StatusCode, HandledError } from '@purista/core'
import { chatServiceV1ServiceBuilder } from '../chatService.js'

const tokenChunkSchema = z.object({ token: z.string() })
const finalSchema = z.object({ totalTokens: z.number() })

export const aiResponseStreamBuilder = chatServiceV1ServiceBuilder
  .getStreamBuilder('generateResponse', 'Stream AI-generated text tokens')
  .addPayloadSchema(z.object({ prompt: z.string().min(1) }))
  .addParameterSchema(z.object({ model: z.string().default('gpt-4') }))
  .addChunkSchema(tokenChunkSchema, true)
  .addFinalSchema(finalSchema, true)
  .canConsumeStream(
    'AiService', '1', 'generateTokens',
    tokenChunkSchema,
    z.object({ prompt: z.string() }),
    z.object({ model: z.string() }),
    finalSchema,
  )
  .setBeforeGuardHooks({
    checkQuota: async function (context, payload, parameter) {
      const { quota } = await context.states.getState(`quota:${context.message.principalId}`)
      if (quota !== undefined && quota <= 0) {
        throw new HandledError(StatusCode.TooManyRequests, 'Token quota exceeded')
      }
    },
  })
  .setStreamFunction(async function (context, payload, parameter, writer) {
    const upstream = await context.stream.AiService['1'].generateTokens(payload, parameter)

    writer.onCancel((reason) => {
      context.logger.info({ reason, prompt: payload.prompt.slice(0, 50) }, 'AI stream cancelled')
    })

    for await (const frame of upstream) {
      if (writer.cancelled) break

      if (frame.payload.frameType === 'chunk' && frame.payload.chunk) {
        await writer.write(frame.payload.chunk)
      }

      if (frame.payload.frameType === 'complete') {
        await writer.close(frame.payload.final)
        return
      }

      if (frame.payload.frameType === 'error') {
        await writer.fail(new Error(frame.payload.error?.message || 'Upstream error'))
        return
      }
    }
  })