
Testing

Test stream handlers with context mocks or validate runtime frame delivery with harnesses.

Streams have two testing levels:

| Level | API | Validates |
|---|---|---|
| Handler test | createStreamContextMock(...) | Chunk generation, final payload, branching, cancellation |
| Runtime test | createStreamTestHarness(...) | Before/after guards, frame emission, SSE behavior |

Handler test

Use the context mock to test your stream function in isolation:

import { describe, test, expect, vi } from 'vitest'
import { createStreamContextMock, safeBind } from '@purista/core'
import { liveMetricsStreamBuilder } from './liveMetricsStream.js'
import { pingV1Service } from '../pingV1Service.js'

describe('liveMetrics stream', () => {
  test('emits chunks and a final frame', async () => {
    const { context, writer } = createStreamContextMock(liveMetricsStreamBuilder, {
      payload: { metricType: 'cpu' },
      parameter: { intervalMs: 100 },
    })

    const streamFn = safeBind(liveMetricsStreamBuilder.getStreamFunction(), pingV1Service)
    await streamFn(context, { metricType: 'cpu' }, { intervalMs: 100 }, writer)

    // The mock writer records all calls
    expect(writer.write.mock.calls.length).toBeGreaterThan(0)
    expect(writer.close.mock.calls).toHaveLength(1)

    const finalCall = writer.close.mock.calls[0][0]
    expect(finalCall.dataPoints).toBeGreaterThan(0)
    expect(finalCall.average).toBeDefined()
  })

  test('handles cancellation gracefully', async () => {
    const { context, writer } = createStreamContextMock(liveMetricsStreamBuilder, {
      payload: { metricType: 'cpu' },
      parameter: { intervalMs: 100 },
    })

    // Simulate a client disconnect: mark the writer as cancelled
    // as soon as the first chunk is written
    const originalWrite = writer.write
    writer.write = vi.fn(async (chunk) => {
      writer.cancelled = true
      return originalWrite(chunk)
    })

    const streamFn = safeBind(liveMetricsStreamBuilder.getStreamFunction(), pingV1Service)
    await streamFn(context, { metricType: 'cpu' }, { intervalMs: 100 }, writer)

    // Should have stopped early
    expect(writer.write.mock.calls.length).toBe(1)
  })
})

The mock writer provides:

  • writer.write — mock function recording all chunk writes
  • writer.close — mock function recording the final frame
  • writer.fail — mock function recording errors
  • writer.onCancel — registers cancellation callbacks
  • writer.cancelled — boolean you can set to simulate cancellation
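To make the contract of these members concrete, here is a minimal hand-rolled sketch of a writer with the same surface, plus a toy handler that honors cancelled. RecordingWriter, emitSamples and the test-only cancel() helper are hypothetical names, not part of @purista/core, and the choice to return without a final frame on cancellation is an assumption rather than documented PURISTA behavior.

```typescript
// Hypothetical stand-in for the mock writer described above. It is NOT the
// @purista/core implementation; it only mirrors the listed members.
type Sample = { value: number }
type Summary = { dataPoints: number; average: number }

class RecordingWriter<Chunk, Final> {
  cancelled = false
  readonly writes: Chunk[] = []
  readonly closes: Final[] = []
  readonly failures: unknown[] = []
  private readonly cancelHandlers: Array<() => void> = []

  async write(chunk: Chunk): Promise<void> {
    this.writes.push(chunk)
  }

  async close(final: Final): Promise<void> {
    this.closes.push(final)
  }

  async fail(error: unknown): Promise<void> {
    this.failures.push(error)
  }

  onCancel(handler: () => void): void {
    this.cancelHandlers.push(handler)
  }

  // Test-only helper: set the flag and run the registered callbacks,
  // as a client disconnect would.
  cancel(): void {
    this.cancelled = true
    for (const handler of this.cancelHandlers) handler()
  }
}

// Hypothetical handler under test: writes one chunk per sample, checks the
// cancellation flag before every write, and closes with a summary.
// Assumption: a cancelled stream returns without sending a final frame.
async function emitSamples(samples: number[], writer: RecordingWriter<Sample, Summary>): Promise<void> {
  let sum = 0
  let count = 0
  for (const value of samples) {
    if (writer.cancelled) return
    await writer.write({ value })
    sum += value
    count++
  }
  await writer.close({ dataPoints: count, average: count > 0 ? sum / count : 0 })
}
```

Both handler tests map onto this sketch: a writer that is never cancelled records every chunk and exactly one close call, while a write override that cancels on the first chunk, like the vi.fn wrapper in the cancellation test, leaves exactly one recorded write and no final frame.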

Testing upstream stream consumption

When your stream consumes another stream, test the frame handling logic:

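import { test, expect, vi } from 'vitest'
import { createStreamContextMock, safeBind } from '@purista/core'
// aiResponseStreamBuilder and chatV1Service are imported from your own service code

test('forwards upstream chunks and final', async () => {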
  const { context, writer, stubs } = createStreamContextMock(aiResponseStreamBuilder, {
    payload: { prompt: 'Hello' },
    parameter: { model: 'gpt-4' },
  })

  // Stub the upstream stream
  stubs.stream.AiService['1'].generateTokens = vi.fn().mockReturnValue({
    async *[Symbol.asyncIterator]() {
      yield { payload: { frameType: 'chunk', chunk: { token: 'Hello' } } }
      yield { payload: { frameType: 'chunk', chunk: { token: ' world' } } }
      yield { payload: { frameType: 'complete', final: { totalTokens: 2 } } }
    },
  })

  const streamFn = safeBind(aiResponseStreamBuilder.getStreamFunction(), chatV1Service)
  await streamFn(context, { prompt: 'Hello' }, { model: 'gpt-4' }, writer)

  expect(writer.write).toHaveBeenCalledTimes(2)
  expect(writer.close).toHaveBeenCalledWith({ totalTokens: 2 })
})
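The logic this test exercises is essentially a loop over upstream frames. The sketch below assumes the frame shape used in the stub (payload.frameType of 'chunk' or 'complete', carrying chunk or final); forwardUpstream and ForwardingWriter are hypothetical names, and real PURISTA frames may carry additional fields.

```typescript
// Frame shape assumed from the stub in the test above.
type UpstreamFrame<Chunk, Final> =
  | { payload: { frameType: 'chunk'; chunk: Chunk } }
  | { payload: { frameType: 'complete'; final: Final } }

// Minimal writer surface this sketch needs (hypothetical).
interface ForwardingWriter<Chunk, Final> {
  write(chunk: Chunk): Promise<void>
  close(final: Final): Promise<void>
}

// Hypothetical helper: re-emit every upstream chunk and close with the
// upstream final payload. Throws if the upstream ends without a final frame.
async function forwardUpstream<Chunk, Final>(
  upstream: AsyncIterable<UpstreamFrame<Chunk, Final>>,
  writer: ForwardingWriter<Chunk, Final>,
): Promise<void> {
  for await (const frame of upstream) {
    const { payload } = frame
    if (payload.frameType === 'chunk') {
      // Narrowed to the chunk variant by the discriminant check
      await writer.write(payload.chunk)
    } else {
      await writer.close(payload.final)
      return
    }
  }
  throw new Error('upstream stream ended without a final frame')
}
```

Because the helper only depends on write and close, it can be exercised with plain objects as well as the mock writer. Throwing on a missing final frame is a design choice for this sketch; a handler could instead report the problem through writer.fail.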

Runtime test

Use the harness when you need to verify the full stream runtime:

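import { test, expect } from 'vitest'
import { createStreamTestHarness } from '@purista/core'
import { liveMetricsStreamBuilder } from './liveMetricsStream.js'
import { pingV1Service } from '../pingV1Service.js'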

test('full runtime: emits frames through event bridge', async () => {
  const harness = await createStreamTestHarness(pingV1Service, liveMetricsStreamBuilder)

  try {
    const result = await harness.run({
      payload: { metricType: 'cpu' },
      parameter: { intervalMs: 100 },
    })

    expect(result.frames.length).toBeGreaterThan(0)
    expect(result.chunks.length).toBeGreaterThan(0)
    expect(result.final).toBeDefined()
  } finally {
    await harness.destroy()
  }
})

The runtime harness:

  • Starts a real service with the stream registered
  • Runs the full pipeline including validation and guards
  • Collects all frames, chunks, and the final payload
  • Must be cleaned up with await harness.destroy(), ideally in a finally block so a failing assertion does not leave the service running
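If several tests use a harness, the try/finally cleanup can be factored into a small helper. This is a generic sketch, not part of @purista/core: withDestroy and Destroyable are hypothetical names, and the only assumption about the harness is that it exposes an async destroy() method.

```typescript
// Anything with an async destroy(), such as a test harness (assumption).
interface Destroyable {
  destroy(): Promise<void>
}

// Hypothetical helper: create a resource, run the test body, and always
// call destroy(), even when the body throws (e.g. a failed assertion).
async function withDestroy<R extends Destroyable, T>(
  create: () => Promise<R>,
  body: (resource: R) => Promise<T>,
): Promise<T> {
  const resource = await create()
  try {
    return await body(resource)
  } finally {
    await resource.destroy()
  }
}

// Intended usage with the harness (not executed here):
// await withDestroy(
//   () => createStreamTestHarness(pingV1Service, liveMetricsStreamBuilder),
//   async (harness) => {
//     const result = await harness.run({ payload: { metricType: 'cpu' }, parameter: { intervalMs: 100 } })
//     expect(result.final).toBeDefined()
//   },
// )
```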

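Testing guards in isolation

A guard can also be unit tested on its own: fetch it from the builder and call it directly with a mocked context.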

test('before guard rejects unauthorized requests', async () => {
  const guard = liveMetricsStreamBuilder.getBeforeGuardHook('requireAuth')

  const { context } = createStreamContextMock(liveMetricsStreamBuilder, {
    payload: { metricType: 'cpu' },
  })

  // Simulate missing principalId
  context.message.principalId = undefined

  await expect(
    guard!(context, { metricType: 'cpu' }, {})
  ).rejects.toThrow('Authentication required')
})
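For reference, a guard that would satisfy this test can be very small. The sketch below is hypothetical: requireAuth, GuardContext and BeforeGuard are illustrative names, the context type only models message.principalId, and the call shape (context, payload, parameter) is taken from the test above rather than from the @purista/core type definitions.

```typescript
// Minimal context shape this sketch relies on; the real stream context has
// many more members.
interface GuardContext {
  message: { principalId?: string }
}

// Guard signature assumed from how the test calls it.
type BeforeGuard<Payload, Parameter> = (
  context: GuardContext,
  payload: Payload,
  parameter: Parameter,
) => Promise<void>

// Hypothetical requireAuth guard: rejects when no principal is attached.
const requireAuth: BeforeGuard<{ metricType: string }, Record<string, unknown>> = async (context) => {
  if (!context.message.principalId) {
    throw new Error('Authentication required')
  }
}
```

Because the guard only reads the context, the isolated test just needs to clear principalId; checking that it also lets authenticated requests through is a useful second case.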

Which level should you use?

| Scenario | Recommended level |
|---|---|
| Chunk generation logic | Handler test |
| Final payload calculation | Handler test |
| Cancellation handling | Handler test |
| Upstream stream consumption | Handler test |
| Validation behavior | Runtime test |
| Guard integration | Runtime test |
| SSE frame format | Runtime test |