Testing

Test queue handlers and workers with mocks, harnesses, and manual scheduling.

Queues have two testing levels:

| Level | API | Validates |
| --- | --- | --- |
| Handler test | createQueueWorkerContextMock(...) | Job processing logic, error handling, lifecycle calls |
| Runtime test | createQueueWorkerTestHarness(...) | Full pipeline including workers and bridge interactions |

Handler test

Use the context mock to test your worker handler in isolation. The handler is extracted from the definition and called directly with the mock context and message:

import { describe, test, expect } from 'vitest'
import { createQueueWorkerContextMock } from '@purista/core'
import { imageWorker } from './workers/imageWorker.js'

describe('imageWorker', () => {
  test('completes successfully', async () => {
    const mock = createQueueWorkerContextMock(imageWorker, {
      queueName: 'processImage',
      payload: {
        imageUrl: 'https://example.com/photo.jpg',
        format: 'webp',
      },
      parameter: {},
    })

    // Retrieve the handler from the worker definition
    const definition = await imageWorker.getDefinition()
    await definition.handler(mock.context as never, mock.message as never)

    expect(mock.stubs.job.complete.calledOnce).toBe(true)
    expect(mock.stubs.job.complete.firstCall.args[0]).toMatchObject({
      processedUrl: expect.any(String),
    })
  })

  test('propagates thrown errors for queue retry', async () => {
    const mock = createQueueWorkerContextMock(imageWorker, {
      queueName: 'processImage',
      payload: { imageUrl: 'bad-url', format: 'webp' },
      parameter: {},
    })

    const definition = await imageWorker.getDefinition()

    // Handler throws — the queue runtime (not the handler) schedules the retry
    await expect(
      definition.handler(mock.context as never, mock.message as never)
    ).rejects.toThrow()
  })
})
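
To make the contract behind these two tests concrete, here is a minimal, self-contained sketch of a handler that calls complete on success and lets errors propagate on failure. The JobControls interface, processImageHandler, and createRecordingJob are hypothetical stand-ins written for illustration; they are not the @purista/core types, and the URL check is only an assumed validation rule.

```typescript
// Hypothetical, simplified stand-in for the job controls; NOT the @purista/core types.
interface JobControls {
  complete(output?: unknown): Promise<void>
}

interface ImagePayload {
  imageUrl: string
  format: string
}

// Hypothetical handler: completes on valid input, throws on invalid input.
async function processImageHandler(job: JobControls, payload: ImagePayload): Promise<void> {
  // Assumed validation rule for this sketch: only http(s) URLs are accepted.
  if (!/^https?:\/\//.test(payload.imageUrl)) {
    // The handler does not schedule a retry itself; it just lets the error propagate.
    throw new Error(`Invalid image URL: ${payload.imageUrl}`)
  }
  // Swap the file extension for the requested format.
  const processedUrl = payload.imageUrl.replace(/\.[^./]+$/, '') + '.' + payload.format
  await job.complete({ processedUrl })
}

// Hand-rolled recording fake so the sketch needs no test framework.
function createRecordingJob(): { job: JobControls; completeCalls: unknown[] } {
  const completeCalls: unknown[] = []
  const job: JobControls = {
    async complete(output?: unknown) {
      completeCalls.push(output)
    },
  }
  return { job, completeCalls }
}
```

In a real test the recording fake is replaced by the context mock, and the two assertions map onto the two cases: one recorded complete call for a valid payload, and a rejected promise with no complete call for an invalid one.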

The mock provides the following job controls on context.job. Each one is a mock function; the examples on this page reach the underlying stubs through mock.stubs.job (for example, mock.stubs.job.complete):

  • context.job.complete: accepts an optional output and optional headers
  • context.job.retry: accepts an optional QueueRetryRequest
  • context.job.fail: accepts reason (string) and an optional fatal flag (boolean)
  • context.job.moveToDeadLetter: accepts an optional reason
  • context.job.extendLease: accepts durationMs as a positional number
  • context.job.cancelRequested: returns a boolean

Testing cancellation

test('checks cancellation and stops early', async () => {
  const mock = createQueueWorkerContextMock(longWorker, {
    queueName: 'longJob',
    payload: { items: ['a', 'b', 'c'] },
    parameter: {},
  })

  // Simulate cancellation being requested
  mock.stubs.job.cancelRequested.returns(true)

  const definition = await longWorker.getDefinition()
  await definition.handler(mock.context as never, mock.message as never)

  // Should have stopped early without completing
  expect(mock.stubs.job.complete.called).toBe(false)
})
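
For context, a handler that passes this test might look roughly like the sketch below: it checks for cancellation before each item and returns without completing when cancellation is requested. The CancellableJob interface and processItems function are hypothetical and simplified for illustration; in particular, treating cancelRequested as a synchronous call is an assumption based on the stub returning a boolean.

```typescript
// Hypothetical, simplified job controls; NOT the @purista/core types.
interface CancellableJob {
  cancelRequested(): boolean
  complete(output?: unknown): Promise<void>
}

// Hypothetical long-running handler: checks for cancellation before each item.
async function processItems(
  job: CancellableJob,
  items: string[],
  processOne: (item: string) => Promise<void>,
): Promise<void> {
  for (const item of items) {
    if (job.cancelRequested()) {
      // Stop early and skip complete(), which is what the test above asserts.
      return
    }
    await processOne(item)
  }
  // Only reached when no cancellation was requested during the loop.
  await job.complete({ processed: items.length })
}
```

Checking before each item rather than once at the start keeps the amount of wasted work after a cancellation request to at most one item.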

Testing lifecycle configuration

import { imageQueue } from './queues/imageQueue.js'

test('has correct lifecycle config', () => {
  const definition = imageQueue.getDefinition()

  expect(definition.lifecycle.maxAttempts).toBe(4)
  expect(definition.lifecycle.poisonMessageAction).toBe('pause-worker')
})

Runtime test

Use createQueueWorkerTestHarness when you need to verify the full worker runtime — guards, bridge interactions, and job control methods:

import { createQueueWorkerTestHarness } from '@purista/core'

test('full runtime: worker acknowledges a completed job', async () => {
  const harness = await createQueueWorkerTestHarness(imageV1ServiceBuilder, imageWorker)

  try {
    const result = await harness.run({
      id: 'job-1',
      queueName: 'processImage',
      payload: {
        imageUrl: 'https://example.com/photo.jpg',
        format: 'webp',
      },
      parameter: {},
      attempt: 1,
    })

    // Verify the job was acknowledged (complete was called)
    expect(result.ackCalls).toHaveLength(1)
    expect(result.nackCalls).toHaveLength(0)
  } finally {
    await harness.destroy()
  }
})

test('full runtime: worker nacks a failed job', async () => {
  const harness = await createQueueWorkerTestHarness(imageV1ServiceBuilder, imageWorker)

  try {
    const result = await harness.run({
      id: 'job-2',
      queueName: 'processImage',
      payload: { imageUrl: 'not-a-url', format: 'jpeg' },
      parameter: {},
      attempt: 1,
    })

    expect(result.nackCalls.length + result.deadLetterCalls.length).toBeGreaterThan(0)
  } finally {
    await harness.destroy()
  }
})
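
The second test accepts either a nack or a dead-letter as a failure outcome. If that check recurs across tests, a small helper can make the intent explicit. The sketch below is an illustration only: HarnessRunResult is a hypothetical, reduced shape that assumes the call records are arrays, and the precedence order between outcomes is a choice made for this sketch, not documented behavior.

```typescript
// Hypothetical result shape: only the array lengths matter for this sketch.
interface HarnessRunResult {
  ackCalls: unknown[]
  nackCalls: unknown[]
  deadLetterCalls: unknown[]
}

type JobOutcome = 'acked' | 'nacked' | 'dead-lettered' | 'unsettled'

// Maps the recorded bridge calls to a single outcome label.
// Dead-letter takes precedence over nack, and nack over ack (an assumption of this sketch).
function classifyOutcome(result: HarnessRunResult): JobOutcome {
  if (result.deadLetterCalls.length > 0) return 'dead-lettered'
  if (result.nackCalls.length > 0) return 'nacked'
  if (result.ackCalls.length > 0) return 'acked'
  return 'unsettled'
}
```

With such a helper, the failing-job test could assert expect(['nacked', 'dead-lettered']).toContain(classifyOutcome(result)) instead of summing array lengths.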

The runtime harness:

  • Starts a real service instance with an in-memory queue bridge mock
  • Runs the worker’s full execution cycle for one message
  • Returns ackCalls, nackCalls, deadLetterCalls, and extendLeaseCalls from the queue bridge
  • Requires await harness.destroy() to clean up
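
Because every runtime test has to call destroy, the try/finally pattern can be factored into a generic helper. The following is a minimal sketch, not part of @purista/core: Destroyable and withDestroyable are hypothetical names, and the helper only assumes that the resource exposes an async destroy method.

```typescript
// Minimal shape this helper needs; any object with an async destroy() fits.
interface Destroyable {
  destroy(): Promise<void>
}

// Creates a resource, runs the callback, and always destroys the resource,
// even when the callback throws.
async function withDestroyable<T extends Destroyable, R>(
  create: () => Promise<T>,
  use: (resource: T) => Promise<R>,
): Promise<R> {
  const resource = await create()
  try {
    return await use(resource)
  } finally {
    await resource.destroy()
  }
}
```

A test could then pass () => createQueueWorkerTestHarness(imageV1ServiceBuilder, imageWorker) as the create argument and call harness.run(...) inside the callback, so cleanup cannot be forgotten.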

The harness does not simulate scheduling or poison-message thresholds — those require an integration test with a real queue bridge.

Which level should you use?

| Scenario | Recommended level |
| --- | --- |
| Job processing logic | Handler test |
| Error handling and error propagation | Handler test |
| Cancellation logic | Handler test |
| Lifecycle config | Unit test on definition |
| Worker mode behavior | Runtime test |
| Retry mechanics | Runtime test |
| Poison message handling | Integration test with a real queue bridge |
| Scheduling | Integration test with a real queue bridge |
| Full pipeline | Runtime test |