Core Building Blocks / Stream
Testing
Test stream handlers with context mocks or validate runtime frame delivery with harnesses.
Streams have two testing levels:
| Level | API | Validates |
|---|---|---|
| Handler test | createStreamContextMock(...) | Chunk generation, final payload, branching, cancellation |
| Runtime test | createStreamTestHarness(...) | Before/after guards, frame emission, SSE behavior |
Handler test
Use the context mock to test your stream function in isolation:
import { describe, test, expect, vi } from 'vitest'
import { createStreamContextMock, safeBind } from '@purista/core'
import { liveMetricsStreamBuilder } from './liveMetricsStream.js'
import { pingV1Service } from '../pingV1Service.js'
describe('liveMetrics stream', () => {
test('emits chunks and a final frame', async () => {
const { context, writer } = createStreamContextMock(liveMetricsStreamBuilder, {
payload: { metricType: 'cpu' },
parameter: { intervalMs: 100 },
})
const streamFn = safeBind(liveMetricsStreamBuilder.getStreamFunction(), pingV1Service)
await streamFn(context, { metricType: 'cpu' }, { intervalMs: 100 }, writer)
// The mock writer records all calls
expect(writer.write.mock.calls.length).toBeGreaterThan(0)
expect(writer.close.mock.calls).toHaveLength(1)
const finalCall = writer.close.mock.calls[0][0]
expect(finalCall.dataPoints).toBeGreaterThan(0)
expect(finalCall.average).toBeDefined()
})
test('handles cancellation gracefully', async () => {
const { context, writer } = createStreamContextMock(liveMetricsStreamBuilder, {
payload: { metricType: 'cpu' },
parameter: { intervalMs: 100 },
})
// Simulate cancellation after first chunk
let callCount = 0
const originalWrite = writer.write
writer.write = vi.fn(async (chunk) => {
callCount++
if (callCount >= 1) {
writer.cancelled = true
}
return originalWrite(chunk)
})
const streamFn = safeBind(liveMetricsStreamBuilder.getStreamFunction(), pingV1Service)
await streamFn(context, { metricType: 'cpu' }, { intervalMs: 100 }, writer)
// Should have stopped early
expect(writer.write.mock.calls.length).toBe(1)
})
})
The mock writer provides:
writer.write— mock function recording all chunk writeswriter.close— mock function recording the final framewriter.fail— mock function recording errorswriter.onCancel— registers cancellation callbackswriter.cancelled— boolean you can set to simulate cancellation
Testing upstream stream consumption
When your stream consumes another stream, test the frame handling logic:
test('forwards upstream chunks and final', async () => {
const { context, writer, stubs } = createStreamContextMock(aiResponseStreamBuilder, {
payload: { prompt: 'Hello' },
parameter: { model: 'gpt-4' },
})
// Stub the upstream stream
stubs.stream.AiService['1'].generateTokens = vi.fn().mockReturnValue({
async *[Symbol.asyncIterator]() {
yield { payload: { frameType: 'chunk', chunk: { token: 'Hello' } } }
yield { payload: { frameType: 'chunk', chunk: { token: ' world' } } }
yield { payload: { frameType: 'complete', final: { totalTokens: 2 } } }
},
})
const streamFn = safeBind(aiResponseStreamBuilder.getStreamFunction(), chatV1Service)
await streamFn(context, { prompt: 'Hello' }, { model: 'gpt-4' }, writer)
expect(writer.write).toHaveBeenCalledTimes(2)
expect(writer.close).toHaveBeenCalledWith({ totalTokens: 2 })
})
Runtime test
Use the harness when you need to verify the full stream runtime:
import { createStreamTestHarness } from '@purista/core'
test('full runtime: emits frames through event bridge', async () => {
const harness = await createStreamTestHarness(pingV1Service, liveMetricsStreamBuilder)
try {
const result = await harness.run({
payload: { metricType: 'cpu' },
parameter: { intervalMs: 100 },
})
expect(result.frames.length).toBeGreaterThan(0)
expect(result.chunks.length).toBeGreaterThan(0)
expect(result.final).toBeDefined()
} finally {
await harness.destroy()
}
})
The runtime harness:
- Starts a real service with the stream registered
- Runs the full pipeline including validation and guards
- Collects all frames, chunks, and the final payload
- Requires
await harness.destroy()to clean up
Testing guards in isolation
test('before guard rejects unauthorized requests', async () => {
const guard = liveMetricsStreamBuilder.getBeforeGuardHook('requireAuth')
const { context } = createStreamContextMock(liveMetricsStreamBuilder, {
payload: { metricType: 'cpu' },
})
// Simulate missing principalId
context.message.principalId = undefined
await expect(
guard!(context, { metricType: 'cpu' }, {})
).rejects.toThrow('Authentication required')
})
Which level should you use?
| Scenario | Recommended level |
|---|---|
| Chunk generation logic | Handler test |
| Final payload calculation | Handler test |
| Cancellation handling | Handler test |
| Upstream stream consumption | Handler test |
| Validation behavior | Runtime test |
| Guard integration | Runtime test |
| SSE frame format | Runtime test |