Core Building Blocks / Queue & Worker
Testing
Test queue handlers and workers with mocks, harnesses, and manual scheduling.
Queues have two testing levels:
| Level | API | Validates |
|---|---|---|
| Handler test | createQueueWorkerContextMock(...) | Job processing logic, error handling, lifecycle calls |
| Runtime test | createQueueWorkerTestHarness(...) | Full pipeline including workers and bridge interactions |
Handler test
Use the context mock to test your worker handler in isolation. The handler is extracted from the definition and called directly with the mock context and message:
import { describe, test, expect, vi } from 'vitest'
import { createQueueWorkerContextMock } from '@purista/core'
import { imageWorker } from './workers/imageWorker.js'
describe('imageWorker', () => {
test('completes successfully', async () => {
const mock = createQueueWorkerContextMock(imageWorker, {
queueName: 'processImage',
payload: {
imageUrl: 'https://example.com/photo.jpg',
format: 'webp',
},
parameter: {},
})
// Retrieve the handler from the worker definition
const definition = await imageWorker.getDefinition()
await definition.handler(mock.context as never, mock.message as never)
expect(mock.stubs.job.complete.calledOnce).toBe(true)
expect(mock.stubs.job.complete.firstCall.args[0]).toMatchObject({
processedUrl: expect.any(String),
})
})
test('propagates thrown errors for queue retry', async () => {
const mock = createQueueWorkerContextMock(imageWorker, {
queueName: 'processImage',
payload: { imageUrl: 'bad-url', format: 'webp' },
parameter: {},
})
const definition = await imageWorker.getDefinition()
// Handler throws — the queue runtime (not the handler) schedules the retry
await expect(
definition.handler(mock.context as never, mock.message as never)
).rejects.toThrow()
})
})
The mock provides:
context.job.complete— mock function (acceptsoutput?and optionalheaders?)context.job.retry— mock function (accepts optionalQueueRetryRequest)context.job.fail— mock function (acceptsreason: string, fatal?: boolean)context.job.moveToDeadLetter— mock function (accepts optionalreason)context.job.extendLease— mock function (acceptsdurationMsas positional number)context.job.cancelRequested— mock function returning boolean
Testing cancellation
test('checks cancellation and stops early', async () => {
const mock = createQueueWorkerContextMock(longWorker, {
queueName: 'longJob',
payload: { items: ['a', 'b', 'c'] },
parameter: {},
})
// Simulate cancellation being requested
mock.stubs.job.cancelRequested.returns(true)
const definition = await longWorker.getDefinition()
await definition.handler(mock.context as never, mock.message as never)
// Should have stopped early without completing
expect(mock.stubs.job.complete.called).toBe(false)
})
Testing lifecycle configuration
import { imageQueue } from './queues/imageQueue.js'
test('has correct lifecycle config', () => {
const definition = imageQueue.getDefinition()
expect(definition.lifecycle.maxAttempts).toBe(4)
expect(definition.lifecycle.poisonMessageAction).toBe('pause-worker')
})
Runtime test
Use createQueueWorkerTestHarness when you need to verify the full worker runtime — guards, bridge interactions, and job control methods:
import { createQueueWorkerTestHarness } from '@purista/core'
test('full runtime: worker acknowledges a completed job', async () => {
const harness = await createQueueWorkerTestHarness(imageV1ServiceBuilder, imageWorker)
try {
const result = await harness.run({
id: 'job-1',
queueName: 'processImage',
payload: {
imageUrl: 'https://example.com/photo.jpg',
format: 'webp',
},
parameter: {},
attempt: 1,
})
// Verify the job was acknowledged (complete was called)
expect(result.ackCalls).toHaveLength(1)
expect(result.nackCalls).toHaveLength(0)
} finally {
await harness.destroy()
}
})
test('full runtime: worker nacks a failed job', async () => {
const harness = await createQueueWorkerTestHarness(imageV1ServiceBuilder, imageWorker)
try {
const result = await harness.run({
id: 'job-2',
queueName: 'processImage',
payload: { imageUrl: 'not-a-url', format: 'jpeg' },
parameter: {},
attempt: 1,
})
expect(result.nackCalls.length + result.deadLetterCalls.length).toBeGreaterThan(0)
} finally {
await harness.destroy()
}
})
The runtime harness:
- Starts a real service instance with an in-memory queue bridge mock
- Runs the worker’s full execution cycle for one message
- Returns
ackCalls,nackCalls,deadLetterCalls, andextendLeaseCallsfrom the queue bridge - Requires
await harness.destroy()to clean up
The harness does not simulate scheduling or poison-message thresholds — those require an integration test with a real queue bridge.
Which level should you use?
| Scenario | Recommended level |
|---|---|
| Job processing logic | Handler test |
| Error handling and error propagation | Handler test |
| Cancellation logic | Handler test |
| Lifecycle config | Unit test on definition |
| Worker mode behavior | Runtime test |
| Retry mechanics | Runtime test |
| Poison message handling | Runtime test |
| Scheduling | Runtime test |
| Full pipeline | Runtime test |