Skip to content

The Queue Worker Builder

Workers consume jobs produced by queue definitions. Use serviceBuilder.getQueueWorkerBuilder(queueId, description) to declare mode, concurrency, and handler logic.

CLI scaffolding

bash
purista add queue-worker

The wizard prompts for the queue, worker name, polling mode, and whether to reuse the existing resource wiring/tests.

Modes

modebehavior
sequential (default)Fetch the next job only after the handler resolves. Ideal for per-tenant serial workloads or long-running jobs.
intervalRun on a fixed interval (setInterval style). Suitable for scheduled scans or low-frequency background tasks.
continuousLong-poll the queue with a minimal delay between leases. Use for high-throughput worker pools.
ts
export const pingJobWorkerBuilder = pingV1ServiceBuilder
  .getQueueWorkerBuilder('pingJob', 'Ping job worker')
  .setMode('continuous')
  .setIntervalMs(250) // only for interval mode
  .setHandler(async function (context, job) {
    context.logger.info({ jobId: job.id }, 'processing async ping')
    // ... do work ...
    await context.job.complete()
  })

Handler utilities

Inside the handler you receive:

  • context.job.complete(result?) – acknowledge success. Optional payload is passed to metrics/telemetry.
  • context.job.retry({ delayMs?, reason? }) – release the job back to the queue with optional delay.
  • context.job.extendLease(extensionMs?) – extend the visibility timeout for long-running work.
  • context.job.moveToDeadLetter(reason?) – skip retries and push to the DLQ.
  • context.job.fail(error) – mark as failed (counts toward retry budget).

All helpers emit OpenTelemetry spans so you get timing and failure statistics automatically.

Error handling

Unhandled exceptions trigger context.job.retry() automatically until maxAttempts is exceeded. Use HandledError to control the reason/status stored alongside the job. For critical failures, call context.job.moveToDeadLetter() yourself to bypass retries.

Workers + resources

Workers share the same service resources defined through the ServiceBuilder. Inject DB clients, OpenAI SDKs, etc., via serviceBuilder.addResource(...) and they become available as context.resources.<name> inside the worker.

Testing workers

  • The CLI scaffolds Vitest suites that instantiate the worker handler with the default queue bridge.
  • Use DefaultQueueBridge for deterministic tests (no external dependencies).
  • Simulate retries by throwing from the handler and asserting that the queue metrics increment.