Queue Worker

Configure worker modes, intervals, parallel handlers, and poison message policies.

A queue worker defines how jobs are processed. While the queue definition describes what work exists, the worker configures execution mode, parallelism, error handling, and recovery.

Worker modes

Workers support three modes:

| Mode | Behavior | Use case |
| --- | --- | --- |
| continuous (default) | Worker runs constantly, picking up jobs as they arrive | High-throughput, always-on processing |
| interval | Worker polls at a fixed interval | Resource-conscious, batch-like processing |
| sequential | Worker processes one job at a time, then waits | Strict ordering, low-volume queues |

import { imageV1ServiceBuilder } from '../imageV1ServiceBuilder.js'

export const imageWorker = imageV1ServiceBuilder
  .getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
  .forQueue('processImage')
  .setMode('continuous')
  .setMaxParallelHandlers(4)
  .setWorkerFunction(async function (ctx, payload) {
    // payload is the typed job payload
    const result = await processImage(payload.imageUrl, {
      maxWidth: payload.maxWidth,
      maxHeight: payload.maxHeight,
      format: payload.format,
    })

    await ctx.resources.storage.save(result)
    await ctx.job.complete({ processedUrl: result.url })
  })

Interval mode

export const reportWorker = reportV1ServiceBuilder
  .getQueueWorkerBuilder('reportWorker', 'Process report generation jobs')
  .forQueue('generateReport')
  .setMode('interval')
  .setIntervalMs(60_000) // Check every minute
  .setWorkerFunction(async function (ctx, payload) {
    const report = await generateReport(payload.month, payload.year)
    await ctx.job.complete({ reportUrl: report.url })
  })
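
Interval mode trades pickup latency for fewer wake-ups. The sketch below works through that arithmetic under a simplifying assumption: polls fire at exact multiples of the interval, starting at time 0. The helper waitUntilNextPoll is hypothetical and not part of the framework, whose actual scheduling may differ.

```typescript
// Hypothetical helper, not a framework API.
// Assumes polls fire at exact multiples of intervalMs, starting at time 0.
function waitUntilNextPoll(enqueuedAtMs: number, intervalMs: number): number {
  if (intervalMs <= 0) {
    throw new RangeError('intervalMs must be positive')
  }
  const sinceLastPoll = enqueuedAtMs % intervalMs
  // A job that lands exactly on a poll tick is picked up immediately.
  return sinceLastPoll === 0 ? 0 : intervalMs - sinceLastPoll
}

// With a 60-second interval, a job enqueued 1 ms after a poll
// waits almost the full minute for the next one.
const worstCaseWaitMs = waitUntilNextPoll(1, 60_000)
```

Under this assumption the wait is always shorter than one interval, so the interval is effectively an upper bound on pickup latency for an idle worker.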

Sequential mode

export const importWorker = importV1ServiceBuilder
  .getQueueWorkerBuilder('importWorker', 'Process data import jobs')
  .forQueue('importData')
  .setMode('sequential')
  .setWorkerFunction(async function (ctx, payload) {
    // One at a time, no parallelism
    await importData(payload.fileUrl)
    await ctx.job.complete({ imported: true })
  })

Job lifecycle methods

The worker function's ctx argument exposes a ctx.job object with methods that control the job:

.setWorkerFunction(async function (ctx, payload) {
  try {
    const result = await processImage(payload.imageUrl)
    await ctx.job.complete({ processedUrl: result.url })
  } catch (error) {
    // Throw or handle — retry is managed by the queue lifecycle config
    throw error
  }
})

| Method | Description |
| --- | --- |
| complete(output?, headers?) | Mark job as completed successfully |
| retry(request?) | Schedule a retry |
| fail(reason, fatal?) | Mark job as failed; fatal: true skips further retries |
| moveToDeadLetter(reason?) | Move job directly to the dead-letter queue |
| extendLease(durationMs) | Extend the lease if processing takes longer than expected |
| cancelRequested() | Returns boolean: true if cancellation was requested |
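
One way to combine these methods is to fail permanently broken jobs with fatal: true and rethrow everything else, so the queue lifecycle config handles retries. The sketch below is a minimal illustration of that split. The JobControl interface is an assumed shape inferred from the method list, and settleJob and isPermanent are hypothetical names, not framework APIs.

```typescript
// Assumed slice of ctx.job, inferred from the method list; real types may differ.
interface JobControl {
  complete(output?: unknown): Promise<void>
  fail(reason: string, fatal?: boolean): Promise<void>
}

// Hypothetical helper: runs `work` and reports the outcome through the job.
async function settleJob(
  job: JobControl,
  work: () => Promise<unknown>,
  isPermanent: (error: unknown) => boolean,
): Promise<void> {
  let output: unknown
  try {
    output = await work()
  } catch (error) {
    // Transient errors are rethrown so the queue lifecycle config can retry.
    if (!isPermanent(error)) throw error
    const reason = error instanceof Error ? error.message : String(error)
    // fatal: true skips further retries for errors that can never succeed.
    await job.fail(reason, true)
    return
  }
  await job.complete(output)
}
```

Inside a worker function this would be called as settleJob(ctx.job, () => processImage(payload.imageUrl), isPermanent), with isPermanent supplied by the service, for example a check for validation errors.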

Parallel handler count

To control how many jobs a worker processes concurrently, chain .setMaxParallelHandlers(n) when defining the worker:

imageV1ServiceBuilder
  .getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
  .forQueue('processImage')
  .setMaxParallelHandlers(4) // process up to 4 images at once
  .setWorkerFunction(async function (ctx, payload) { /* ... */ })

Lifecycle behavior — retries, TTL, timeouts — is configured on the queue builder via setLifecycleConfig, not on the worker itself.
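
To make "up to 4 at once" concrete, here is a generic bounded-concurrency pattern in plain TypeScript. It is only an illustration of the idea, not the framework's scheduler, and runWithLimit is a hypothetical name.

```typescript
// Illustrative only: runs `handler` over `jobs` with at most `limit` in flight.
async function runWithLimit<T>(
  jobs: readonly T[],
  limit: number,
  handler: (job: T) => Promise<void>,
): Promise<void> {
  if (limit < 1 || Math.floor(limit) !== limit) {
    throw new RangeError('limit must be a positive integer')
  }
  let next = 0
  // Each lane claims the next unprocessed job until none remain,
  // so a lane never has more than one job in flight.
  const lane = async (): Promise<void> => {
    while (next < jobs.length) {
      const job = jobs[next++] as T
      await handler(job)
    }
  }
  const lanes: Promise<void>[] = []
  for (let i = 0; i < Math.min(limit, jobs.length); i++) {
    lanes.push(lane())
  }
  // Rejects as soon as any handler throws; other lanes keep draining.
  await Promise.all(lanes)
}
```

A higher limit raises throughput only while the handlers are waiting on I/O; CPU-bound handlers gain little from extra lanes on a single thread.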

Poison messages

A poison message is a job that fails repeatedly. Protect your workers by setting a failure threshold and an action in the queue's lifecycle config:

imageQueue.setLifecycleConfig({
  poisonMessageFailureThreshold: 10,
  poisonMessageAction: 'pause-worker',
})

| Action | Behavior |
| --- | --- |
| none | Keep retrying (default) |
| pause-worker | Pause the worker after threshold failures |

Once a worker has been paused, resume it with:

service.resumeQueueWorkers('processImage')
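
The threshold logic can be pictured as a failure counter. The sketch below assumes failures are counted per job id, which this page does not confirm, and PoisonTracker is a hypothetical class rather than anything the framework exposes.

```typescript
type PoisonAction = 'none' | 'pause-worker'

// Hypothetical model of the threshold check; assumes per-job failure counts.
class PoisonTracker {
  private readonly failures = new Map<string, number>()
  private readonly threshold: number
  private readonly action: PoisonAction
  private paused = false

  constructor(threshold: number, action: PoisonAction) {
    this.threshold = threshold
    this.action = action
  }

  // Records one failed attempt; returns true if the worker is now paused.
  recordFailure(jobId: string): boolean {
    const count = (this.failures.get(jobId) ?? 0) + 1
    this.failures.set(jobId, count)
    // With action 'none' the job simply keeps retrying.
    if (this.action === 'pause-worker' && count >= this.threshold) {
      this.paused = true
    }
    return this.paused
  }

  isPaused(): boolean {
    return this.paused
  }
}
```

With a threshold of 3 and pause-worker, the third failure of the same job pauses the worker in this model, while failures spread across different jobs do not.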

Cancelling jobs

Workers can check if cancellation was requested:

.setWorkerFunction(async function (ctx, payload) {
  for (const item of payload.items) {
    if (ctx.job.cancelRequested()) {
      // Stop processing — cancellation is handled by the queue system
      return
    }
    await processItem(item)
  }
  await ctx.job.complete()
})
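
To unit-test the cancellation check without a running queue, the loop can be pulled into a function that accepts a fake job. The sketch below assumes a minimal slice of ctx.job based on the methods listed on this page. CancellableJob and processUntilCancelled are hypothetical names.

```typescript
// Assumed minimal slice of ctx.job; the real type may carry more members.
interface CancellableJob {
  cancelRequested(): boolean
  complete(output?: unknown): Promise<void>
}

// Processes items until done or cancelled; returns how many were handled.
async function processUntilCancelled<T>(
  job: CancellableJob,
  items: readonly T[],
  processItem: (item: T) => Promise<void>,
): Promise<number> {
  let handled = 0
  for (const item of items) {
    // Check before each item so a long batch stops promptly.
    if (job.cancelRequested()) return handled
    await processItem(item)
    handled++
  }
  // Only reached when every item was processed without cancellation.
  await job.complete({ handled })
  return handled
}
```

A test can then pass an object whose cancelRequested() flips to true partway through and assert that complete() was never called.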

Worker function signature

The worker function receives two arguments, ctx and payload:

async function (ctx, payload) {
  // ctx contains:
  // - ctx.job: job lifecycle methods (complete, retry, fail, moveToDeadLetter, extendLease, cancelRequested)
  // - ctx.logger: scoped logger
  // - ctx.resources: typed service resources
  // payload: the typed job payload (from the queue's addPayloadSchema)
}

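Use a regular async function expression, not an arrow function, so that this stays bound to the service instance. Arrow functions capture this from the scope where they are defined and therefore lose the service instance binding.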
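
The binding rule can be seen in isolation with plain TypeScript. In this sketch, invokeOnService is a hypothetical stand-in for however the framework calls the worker function, and ServiceLike and Registrar are illustrative names only. The handlers are synchronous for brevity; the async keyword does not change how this is bound.

```typescript
interface ServiceLike {
  serviceName: string
}

// Hypothetical stand-in for a framework invoking a handler with `this` set.
function invokeOnService(
  service: ServiceLike,
  handler: (this: ServiceLike) => string,
): string {
  return handler.call(service)
}

// A regular function takes `this` from the caller.
const regularHandler = function (this: ServiceLike): string {
  return this.serviceName
}

class Registrar {
  serviceName = 'registrar'

  arrowHandler(): (this: ServiceLike) => string {
    // The arrow function captures `this` here (the Registrar)
    // and ignores whatever .call() passes later.
    return () => this.serviceName
  }
}

const service: ServiceLike = { serviceName: 'imageService' }
const fromRegular = invokeOnService(service, regularHandler)
const fromArrow = invokeOnService(service, new Registrar().arrowHandler())
```

Here fromRegular is 'imageService' while fromArrow is 'registrar': the arrow function kept the this of the place it was created, which is the failure mode the guidance above is meant to avoid.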

Full worker example

import { imageV1ServiceBuilder } from '../imageV1ServiceBuilder.js'

export const imageWorker = imageV1ServiceBuilder
  .getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
  .forQueue('processImage')
  .setMode('continuous')
  .setMaxParallelHandlers(4)
  .setWorkerFunction(async function (ctx, payload) {
    try {
      const result = await processImage(payload.imageUrl, {
        maxWidth: payload.maxWidth,
        maxHeight: payload.maxHeight,
        format: payload.format,
      })

      await ctx.job.complete({ processedUrl: result.url })
    } catch (error) {
      // Retry and dead-letter behavior is managed by queue lifecycle config
      throw error
    }
  })