# Testing

Test stream handlers with context mocks or validate runtime frame delivery with harnesses.

---
Canonical: /handbook/blocks/stream-pattern/stream-testing/
Source: web/src/content/handbook-cards/blocks/stream-pattern/stream-testing.mdx
Format: Markdown for agents
---

Streams have two testing levels:

| Level | API | Validates |
|---|---|---|
| **Handler test** | `createStreamContextMock(...)` | Chunk generation, final payload, branching, cancellation |
| **Runtime test** | `createStreamTestHarness(...)` | Before/after guards, frame emission, SSE behavior |

## Handler test

Use the context mock to test your stream function in isolation:

```typescript [liveMetricsStream.test.ts]
import { describe, test, expect, vi } from 'vitest'
import { createStreamContextMock, safeBind } from '@purista/core'
import { liveMetricsStreamBuilder } from './liveMetricsStream.js'
import { pingV1Service } from '../pingV1Service.js'

describe('liveMetrics stream', () => {
  test('emits chunks and a final frame', async () => {
    const { context, writer } = createStreamContextMock(liveMetricsStreamBuilder, {
      payload: { metricType: 'cpu' },
      parameter: { intervalMs: 100 },
    })

    const streamFn = safeBind(liveMetricsStreamBuilder.getStreamFunction(), pingV1Service)
    await streamFn(context, { metricType: 'cpu' }, { intervalMs: 100 }, writer)

    // The mock writer records all calls
    expect(writer.write.mock.calls.length).toBeGreaterThan(0)
    expect(writer.close.mock.calls).toHaveLength(1)

    const finalCall = writer.close.mock.calls[0][0]
    expect(finalCall.dataPoints).toBeGreaterThan(0)
    expect(finalCall.average).toBeDefined()
  })

  test('handles cancellation gracefully', async () => {
    const { context, writer } = createStreamContextMock(liveMetricsStreamBuilder, {
      payload: { metricType: 'cpu' },
      parameter: { intervalMs: 100 },
    })

    // Simulate cancellation after first chunk
    let callCount = 0
    const originalWrite = writer.write
    writer.write = vi.fn(async (chunk) => {
      callCount++
      if (callCount >= 1) {
        writer.cancelled = true
      }
      return originalWrite(chunk)
    })

    const streamFn = safeBind(liveMetricsStreamBuilder.getStreamFunction(), pingV1Service)
    await streamFn(context, { metricType: 'cpu' }, { intervalMs: 100 }, writer)

    // Should have stopped early
    expect(writer.write.mock.calls.length).toBe(1)
  })
})
```

The mock writer provides:
- `writer.write` — mock function recording all chunk writes
- `writer.close` — mock function recording the final frame
- `writer.fail` — mock function recording errors
- `writer.onCancel` — registers cancellation callbacks
- `writer.cancelled` — boolean you can set to simulate cancellation

## Testing upstream stream consumption

When your stream consumes another stream, test the frame handling logic:

```typescript [aiResponseStream.test.ts]
test('forwards upstream chunks and final', async () => {
  const { context, writer, stubs } = createStreamContextMock(aiResponseStreamBuilder, {
    payload: { prompt: 'Hello' },
    parameter: { model: 'gpt-4' },
  })

  // Stub the upstream stream
  stubs.stream.AiService['1'].generateTokens = vi.fn().mockReturnValue({
    async *[Symbol.asyncIterator]() {
      yield { payload: { frameType: 'chunk', chunk: { token: 'Hello' } } }
      yield { payload: { frameType: 'chunk', chunk: { token: ' world' } } }
      yield { payload: { frameType: 'complete', final: { totalTokens: 2 } } }
    },
  })

  const streamFn = safeBind(aiResponseStreamBuilder.getStreamFunction(), chatV1Service)
  await streamFn(context, { prompt: 'Hello' }, { model: 'gpt-4' }, writer)

  expect(writer.write).toHaveBeenCalledTimes(2)
  expect(writer.close).toHaveBeenCalledWith({ totalTokens: 2 })
})
```

## Runtime test

Use the harness when you need to verify the full stream runtime:

```typescript [liveMetricsStream.test.ts]
import { createStreamTestHarness } from '@purista/core'

test('full runtime: emits frames through event bridge', async () => {
  const harness = await createStreamTestHarness(pingV1Service, liveMetricsStreamBuilder)

  try {
    const result = await harness.run({
      payload: { metricType: 'cpu' },
      parameter: { intervalMs: 100 },
    })

    expect(result.frames.length).toBeGreaterThan(0)
    expect(result.chunks.length).toBeGreaterThan(0)
    expect(result.final).toBeDefined()
  } finally {
    await harness.destroy()
  }
})
```

The runtime harness:
- Starts a real service with the stream registered
- Runs the full pipeline including validation and guards
- Collects all frames, chunks, and the final payload
- Requires `await harness.destroy()` to clean up

## Testing guards in isolation

```typescript [liveMetricsStream.test.ts]
test('before guard rejects unauthorized requests', async () => {
  const guard = liveMetricsStreamBuilder.getBeforeGuardHook('requireAuth')

  const { context } = createStreamContextMock(liveMetricsStreamBuilder, {
    payload: { metricType: 'cpu' },
  })

  // Simulate missing principalId
  context.message.principalId = undefined

  await expect(
    guard!(context, { metricType: 'cpu' }, {})
  ).rejects.toThrow('Authentication required')
})
```

## Which level should you use?

| Scenario | Recommended level |
|---|---|
| Chunk generation logic | Handler test |
| Final payload calculation | Handler test |
| Cancellation handling | Handler test |
| Upstream stream consumption | Handler test |
| Validation behavior | Runtime test |
| Guard integration | Runtime test |
| SSE frame format | Runtime test |
