# The Stream Builder

Define typed stream payloads, chunk/final schemas, guards, and SSE endpoints.

---
Canonical: /handbook/blocks/stream-pattern/stream-builder/
Source: web/src/content/handbook-cards/blocks/stream-pattern/stream-builder.mdx
Format: Markdown for agents
---

The **stream builder** collects every aspect of a stream (schemas, transforms, guards, and the stream function) into a single typed definition. That definition is attached to a [service builder](/handbook/service/service-builder/) and registered with the event bridge when the service starts.

## Schemas

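Streams have four schemas: payload and parameter describe the input, chunk describes each value passed to `writer.write()`, and final describes the value passed to `writer.close()`.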

```typescript [liveMetricsStream.ts]
import { z } from 'zod'

const payloadSchema = z.object({
  metricType: z.enum(['cpu', 'memory', 'disk']),
})

const parameterSchema = z.object({
  intervalMs: z.number().default(1000),
})

const chunkSchema = z.object({
  timestamp: z.number(),
  value: z.number(),
})

const finalSchema = z.object({
  dataPoints: z.number(),
  average: z.number(),
})
```

| Method | Validated when |
|---|---|
| `addPayloadSchema(schema)` | Before stream function starts |
| `addParameterSchema(schema)` | Before stream function starts |
| `addChunkSchema(schema, validateChunks?)` | On every `writer.write(chunk)` |
| `addFinalSchema(schema, validateFinal?)` | On `writer.close(final)` |

The `validateChunks` and `validateFinal` parameters default to `true`. You can disable per-chunk validation for hot paths, but keep the schema defined for type inference and documentation.
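To make the effect of the `validateChunks` flag concrete, here is a minimal, dependency-free sketch. `createValidatingWriter` and `isChunk` are hypothetical names used only for illustration, and a hand-written type guard stands in for the Zod schema. It models the behavior described above and is not the library's implementation.

```typescript
// Hypothetical model of per-chunk validation; not the library's implementation.
type Chunk = { timestamp: number; value: number }

// Stand-in for chunkSchema: a hand-written runtime check.
function isChunk(input: unknown): input is Chunk {
  if (typeof input !== 'object' || input === null) return false
  const candidate = input as Record<string, unknown>
  return typeof candidate.timestamp === 'number' && typeof candidate.value === 'number'
}

// Returns a writer that checks each chunk only when validateChunks is true.
function createValidatingWriter(validateChunks: boolean) {
  const written: unknown[] = []
  return {
    written,
    write(chunk: unknown): void {
      if (validateChunks && !isChunk(chunk)) {
        throw new Error('Chunk does not match the chunk schema')
      }
      written.push(chunk)
    },
  }
}

const strictWriter = createValidatingWriter(true)
strictWriter.write({ timestamp: 1, value: 0.5 }) // accepted: matches the schema

const fastWriter = createValidatingWriter(false)
fastWriter.write({ timestamp: 'oops' }) // accepted: validation is skipped
```

Skipping validation saves one schema check per chunk, at the cost of letting malformed chunks reach consumers, so it is best reserved for producers whose output is already trusted.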

## The stream function

```typescript [liveMetricsStream.ts]
export const liveMetricsStreamBuilder = pingV1ServiceBuilder
  .getStreamBuilder('liveMetrics', 'Stream live system metrics')
  .addPayloadSchema(payloadSchema)
  .addParameterSchema(parameterSchema)
  .addChunkSchema(chunkSchema, true)
  .addFinalSchema(finalSchema, true)
  .setStreamFunction(async function (context, payload, parameter, writer) {
    const collector = context.resources.metricsCollector
    const readings: number[] = []

    writer.onCancel((reason) => {
      context.logger.info({ reason }, 'Stream cancelled')
    })

    for (let i = 0; i < 60; i++) {
      if (writer.cancelled) break
      const reading = await collector.read(payload.metricType)
      readings.push(reading)
      await writer.write({ timestamp: Date.now(), value: reading })
      await new Promise(r => setTimeout(r, parameter.intervalMs))
    }

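    // Guard against 0 / 0 = NaN (rejected by z.number()) when the stream is cancelled before the first reading
    const average = readings.length > 0 ? readings.reduce((a, b) => a + b, 0) / readings.length : 0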
    await writer.close({ dataPoints: readings.length, average })
  })
```

The function signature is `(context, payload, parameter, writer) => Promise<void>`. Unlike commands, streams do not return a value — they communicate through the writer.
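Because a stream function only talks to its writer, its logic can be exercised in isolation with a test double. The sketch below assumes a writer shape inferred from the calls used on this page (`write`, `close`, `onCancel`, `cancelled`). `StreamWriterLike`, `createRecordingWriter`, and `countTo` are hypothetical names, and the real writer type may differ.

```typescript
// Hypothetical writer shape, inferred from the calls used on this page.
interface StreamWriterLike<C, F> {
  cancelled: boolean
  write(chunk: C): Promise<void>
  close(finalPayload?: F): Promise<void>
  onCancel(handler: (reason?: string) => void): void
}

// Test double that records everything a stream function emits.
function createRecordingWriter<C, F>() {
  const chunks: C[] = []
  const handlers: Array<(reason?: string) => void> = []
  const state: { finalPayload?: F; closed: boolean } = { closed: false }
  const writer: StreamWriterLike<C, F> = {
    cancelled: false,
    async write(chunk) {
      chunks.push(chunk)
    },
    async close(finalPayload) {
      state.finalPayload = finalPayload
      state.closed = true
    },
    onCancel(handler) {
      handlers.push(handler)
    },
  }
  // Simulates a consumer cancelling the stream.
  const cancel = (reason: string): void => {
    writer.cancelled = true
    handlers.forEach((handler) => handler(reason))
  }
  return { writer, chunks, state, cancel }
}

// A tiny stream body in the style of liveMetrics, without the service context.
async function countTo(limit: number, writer: StreamWriterLike<number, { total: number }>): Promise<void> {
  let total = 0
  for (let i = 1; i <= limit; i++) {
    if (writer.cancelled) break
    await writer.write(i)
    total += i
  }
  await writer.close({ total })
}

// Usage: run the stream body against the recorder and inspect what it wrote.
async function demoRecordingWriter() {
  const recorder = createRecordingWriter<number, { total: number }>()
  await countTo(3, recorder.writer)
  return recorder
}
```

Calling `cancel()` on the recorder before or during a run is a convenient way to check that the stream function honors `writer.cancelled`.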

<div class="callout callout--warning">
  <div class="callout__title">Use <code>async function</code>, not arrow functions</div>
  <p>As with commands, declare stream functions with <code>async function</code>. An arrow function would not get <code>this</code> bound to the service instance.</p>
</div>
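The difference comes down to ordinary JavaScript `this` semantics, which the following standalone sketch demonstrates. It assumes the framework invokes the handler with the service instance as `this`, modelled here with `Function.prototype.call`. `FakeService`, `invoke`, and `Elsewhere` are hypothetical names, and the handlers are synchronous for brevity (the same rule applies to `async` functions).

```typescript
// Hypothetical stand-in for a service instance.
class FakeService {
  serviceName = 'ping'
}

type Handler = (this: FakeService) => string

// Models a framework that binds the service instance as `this` when calling a handler.
function invoke(service: FakeService, handler: Handler): string {
  return handler.call(service)
}

// A regular function receives the bound service instance.
const viaFunction = invoke(new FakeService(), function () {
  return this.serviceName
})

// An arrow function keeps the `this` of the scope where it was created
// and ignores the instance passed to call().
class Elsewhere {
  serviceName = 'elsewhere'
  makeArrow(): Handler {
    return () => this.serviceName
  }
}

const viaArrow = invoke(new FakeService(), new Elsewhere().makeArrow())

console.log(viaFunction) // 'ping'
console.log(viaArrow) // 'elsewhere'
```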

## Guards and transforms

Streams support the same guard and transform hooks as commands. The example below registers one before guard and one after guard:

```typescript [liveMetricsStream.ts]
.setBeforeGuardHooks({
  requireAuth: async function (context, payload, parameter) {
    if (!context.message.principalId) {
      throw new HandledError(StatusCode.Unauthorized, 'Authentication required')
    }
  },
})
.setAfterGuardHooks({
  audit: async function (context, result, originalPayload, originalParameter) {
    context.logger.info({ metricType: originalPayload.metricType }, 'Metrics stream completed')
  },
})
```

Before guards run in parallel. After guards run once the stream function has finished, whether it ended with `writer.close()` or `writer.fail()`.
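A rough model of the before-guard behavior, assuming all guards are started together and the stream function runs only if every one of them resolves. `runBeforeGuards` and `handleRequest` are hypothetical helpers built on `Promise.all`, not the library's implementation.

```typescript
type Guard = () => Promise<void>

// Starts every guard at once and rejects as soon as any guard throws.
async function runBeforeGuards(guards: Record<string, Guard>): Promise<void> {
  await Promise.all(Object.values(guards).map((guard) => guard()))
}

const startedGuards: string[] = []

const sampleGuards: Record<string, Guard> = {
  requireAuth: async function () {
    startedGuards.push('requireAuth')
    throw new Error('Authentication required')
  },
  checkQuota: async function () {
    startedGuards.push('checkQuota')
  },
}

// Runs the (simulated) stream function only when every guard passes.
async function handleRequest(): Promise<string> {
  try {
    await runBeforeGuards(sampleGuards)
    return 'stream function started'
  } catch (error) {
    return `rejected: ${(error as Error).message}`
  }
}
```

In this model both guards are invoked even though `requireAuth` fails, so a guard should not rely on another guard having passed first.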

## HTTP/SSE exposure

Use `.exposeAsHttpStreamEndpoint()` to expose the stream via HTTP with Server-Sent Events:

```typescript [liveMetricsStream.ts]
.exposeAsHttpStreamEndpoint('GET', '/api/v1/metrics/live')
```

The HTTP adapter:
- Sets response `content-type` to `text/event-stream`
- Sends frames as SSE events (`start`, `chunk`, `complete`, `error`, `cancel`)
- Includes the stream frame schema and endpoint metadata in OpenAPI
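On the wire, each frame is a standard SSE event: an `event:` line, one or more `data:` lines, and a terminating blank line. The sketch below parses such a response body into named events. The event names come from the list above. The JSON layout of each `data` field is an assumption made for illustration, and `parseSseEvents` is a hypothetical helper that handles only the `event` and `data` fields.

```typescript
type SseEvent = { event: string; data: string }

// Splits a raw SSE body into events. Only `event:` and `data:` fields are handled.
function parseSseEvents(body: string): SseEvent[] {
  const events: SseEvent[] = []
  for (const block of body.split(/\r?\n\r?\n/)) {
    let eventName = 'message' // SSE default when no event field is present
    const dataLines: string[] = []
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith('event:')) {
        eventName = line.slice(6).trim()
      } else if (line.startsWith('data:')) {
        // The format allows one optional space after the colon.
        dataLines.push(line.slice(5).replace(/^ /, ''))
      }
    }
    // Blocks without data are not dispatched as events.
    if (dataLines.length > 0) events.push({ event: eventName, data: dataLines.join('\n') })
  }
  return events
}

// Assumed sample body; the real payload layout may differ.
const sampleBody =
  'event: chunk\ndata: {"timestamp":1,"value":0.42}\n\n' +
  'event: complete\ndata: {"dataPoints":1,"average":0.42}\n\n'

const parsedEvents = parseSseEvents(sampleBody)
const chunkValues = parsedEvents
  .filter((item) => item.event === 'chunk')
  .map((item) => (JSON.parse(item.data) as { value: number }).value)
```

In a browser, `EventSource` does this parsing itself and dispatches each named event to listeners registered with `addEventListener`.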

The streaming mode controls whether the endpoint emits chunks incrementally or buffers them and returns only the final payload:

```typescript [liveMetricsStream.ts]
.setHttpStreamingMode('stream') // incremental chunks (default)
// or
.setHttpStreamingMode('aggregate') // buffer and return final only
```

## Consuming streams from commands

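A command can consume a stream from another service. Declare the stream with `.canConsumeStream()`, passing the service name, version, stream name, and the chunk, payload, parameter, and final schemas. The stream is then callable through `context.stream`: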

```typescript [reportCommand.ts]
.canConsumeStream(
  'AnalyticsService',
  '1',
  'liveMetrics',
  z.object({ timestamp: z.number(), value: z.number() }), // chunk schema
  z.object({ metricType: z.string() }), // payload schema
  z.object({}), // parameter schema
  z.object({ summary: z.string() }), // final schema
)
.setCommandFunction(async function (context, payload, parameter) {
  const handle = await context.stream.AnalyticsService['1'].liveMetrics(
    { metricType: 'cpu' },
    {},
  )

  for await (const frame of handle) {
    if (frame.payload.frameType === 'chunk' && frame.payload.chunk) {
      // process chunk
    }
    if (frame.payload.frameType === 'complete') {
      // process final
      break
    }
  }
})
```
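The consumption loop can be factored into a small reusable helper. The sketch below assumes the frame shape suggested by the fields accessed in the example above (`frame.payload.frameType`, `chunk`, `final`, `error`). `StreamFrame`, `collectStream`, and `fakeHandle` are hypothetical names, and an async generator stands in for the real stream handle.

```typescript
// Assumed frame shape, inferred from the fields accessed in the handbook example.
type StreamFrame<C, F> = {
  payload: {
    frameType: 'start' | 'chunk' | 'complete' | 'error' | 'cancel'
    chunk?: C
    final?: F
    error?: { message: string }
  }
}

// Drains a stream handle and returns all chunks plus the final payload.
async function collectStream<C, F>(
  handle: AsyncIterable<StreamFrame<C, F>>,
): Promise<{ chunks: C[]; finalPayload: F | undefined }> {
  const chunks: C[] = []
  for await (const frame of handle) {
    const { frameType, chunk, final: finalPayload, error } = frame.payload
    if (frameType === 'chunk' && chunk !== undefined) chunks.push(chunk)
    if (frameType === 'complete') return { chunks, finalPayload }
    if (frameType === 'error') throw new Error(error?.message ?? 'Upstream error')
  }
  // The iterator ended without a complete frame, for example after a cancel.
  return { chunks, finalPayload: undefined }
}

// Stand-in for a real handle, useful for local experiments and unit tests.
async function* fakeHandle(): AsyncGenerator<StreamFrame<number, { summary: string }>> {
  yield { payload: { frameType: 'start' } }
  yield { payload: { frameType: 'chunk', chunk: 1 } }
  yield { payload: { frameType: 'chunk', chunk: 2 } }
  yield { payload: { frameType: 'complete', final: { summary: 'done' } } }
}
```

Note that this helper buffers every chunk in memory, which suits short streams. For long-running streams, processing each chunk inside the loop keeps memory use flat.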

## Chunk aggregation

Enable automatic chunk aggregation to build a `{ chunkCount, chunks[] }` final payload when no explicit final is provided:

```typescript [liveMetricsStream.ts]
.enableChunkAggregation(true)
```

With this enabled, calling `writer.close()` without arguments produces `{ chunkCount: number, chunks: Chunk[] }`.
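Conceptually, aggregation keeps every written chunk and falls back to a summary object when `close()` receives no final payload. A minimal synchronous model follows, using the hypothetical names `AggregatedFinal` and `createAggregatingWriter`. It illustrates the result shape described above, not the library's internals.

```typescript
type AggregatedFinal<C> = { chunkCount: number; chunks: C[] }

// Hypothetical writer that remembers chunks so it can build a fallback final payload.
function createAggregatingWriter<C, F>() {
  const chunks: C[] = []
  return {
    write(chunk: C): void {
      chunks.push(chunk)
    },
    // With an explicit final payload, return it; otherwise build the aggregate.
    close(finalPayload?: F): F | AggregatedFinal<C> {
      if (finalPayload !== undefined) return finalPayload
      return { chunkCount: chunks.length, chunks: [...chunks] }
    },
  }
}

const tokenWriter = createAggregatingWriter<{ token: string }, { totalTokens: number }>()
tokenWriter.write({ token: 'Hello' })
tokenWriter.write({ token: ' world' })

const aggregated = tokenWriter.close()
// aggregated: { chunkCount: 2, chunks: [{ token: 'Hello' }, { token: ' world' }] }
```

Since every chunk is retained until the stream closes, aggregation is best suited to streams with a bounded number of chunks.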

## Full example

```typescript [aiResponseStream.ts]
import { z } from 'zod'
import { StatusCode, HandledError } from '@purista/core'
import { chatServiceV1ServiceBuilder } from '../chatService.js'

const tokenChunkSchema = z.object({ token: z.string() })
const finalSchema = z.object({ totalTokens: z.number() })

export const aiResponseStreamBuilder = chatServiceV1ServiceBuilder
  .getStreamBuilder('generateResponse', 'Stream AI-generated text tokens')
  .addPayloadSchema(z.object({ prompt: z.string().min(1) }))
  .addParameterSchema(z.object({ model: z.string().default('gpt-4') }))
  .addChunkSchema(tokenChunkSchema, true)
  .addFinalSchema(finalSchema, true)
  .canConsumeStream(
    'AiService', '1', 'generateTokens',
    tokenChunkSchema,
    z.object({ prompt: z.string() }),
    z.object({ model: z.string() }),
    finalSchema,
  )
  .setBeforeGuardHooks({
    checkQuota: async function (context, payload, parameter) {
      const { quota } = await context.states.getState(`quota:${context.message.principalId}`)
      if (quota !== undefined && quota <= 0) {
        throw new HandledError(StatusCode.TooManyRequests, 'Token quota exceeded')
      }
    },
  })
  .setStreamFunction(async function (context, payload, parameter, writer) {
    const upstream = await context.stream.AiService['1'].generateTokens(payload, parameter)

    writer.onCancel((reason) => {
      context.logger.info({ reason, prompt: payload.prompt.slice(0, 50) }, 'AI stream cancelled')
    })

    for await (const frame of upstream) {
      if (writer.cancelled) break

      if (frame.payload.frameType === 'chunk' && frame.payload.chunk) {
        await writer.write(frame.payload.chunk)
      }

      if (frame.payload.frameType === 'complete') {
        await writer.close(frame.payload.final)
        return
      }

      if (frame.payload.frameType === 'error') {
        await writer.fail(new Error(frame.payload.error?.message || 'Upstream error'))
        return
      }
    }
  })
```
