Core Building Blocks / Queue & Worker
Queue Worker
Configure worker modes, intervals, parallel handlers, and poison message policies.
A queue worker defines how jobs are processed. While the queue definition describes what work exists, the worker configures execution mode, parallelism, error handling, and recovery.
Worker modes
Workers support three modes:
| Mode | Behavior | Use case |
|---|---|---|
continuous (default) | Worker runs constantly, picking up jobs as they arrive | High-throughput, always-on processing |
interval | Worker polls at a fixed interval | Resource-conscious, batch-like processing |
sequential | Worker processes one job at a time, then waits | Strict ordering, low-volume queues |
import { imageV1ServiceBuilder } from '../imageV1ServiceBuilder.js'
export const imageWorker = imageV1ServiceBuilder
.getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
.forQueue('processImage')
.setMode('continuous')
.setMaxParallelHandlers(4)
.setWorkerFunction(async function (ctx, payload) {
// payload is the typed job payload
const result = await processImage(payload.imageUrl, {
maxWidth: payload.maxWidth,
maxHeight: payload.maxHeight,
format: payload.format,
})
await ctx.resources.storage.save(result)
await ctx.job.complete({ processedUrl: result.url })
})
Interval mode
export const reportWorker = reportV1ServiceBuilder
.getQueueWorkerBuilder('reportWorker', 'Process report generation jobs')
.forQueue('generateReport')
.setMode('interval')
.setIntervalMs(60_000) // Check every minute
.setWorkerFunction(async function (ctx, payload) {
const report = await generateReport(payload.month, payload.year)
await ctx.job.complete({ reportUrl: report.url })
})
Sequential mode
export const importWorker = importV1ServiceBuilder
.getQueueWorkerBuilder('importWorker', 'Process data import jobs')
.forQueue('importData')
.setMode('sequential')
.setWorkerFunction(async function (ctx, payload) {
// One at a time, no parallelism
await importData(payload.fileUrl)
await ctx.job.complete({ imported: true })
})
Job lifecycle methods
The worker function receives a ctx.job object with methods to control the job:
.setWorkerFunction(async function (ctx, payload) {
try {
const result = await processImage(payload.imageUrl)
await ctx.job.complete({ processedUrl: result.url })
} catch (error) {
// Throw or handle — retry is managed by the queue lifecycle config
throw error
}
})
| Method | Description |
|---|---|
complete(output?, headers?) | Mark job as completed successfully |
retry(request?) | Schedule a retry |
fail(reason, fatal?) | Mark job as failed; fatal: true skips further retries |
moveToDeadLetter(reason?) | Move job directly to the dead-letter queue |
extendLease(durationMs) | Extend the lease if processing takes longer than expected |
cancelRequested() | Returns boolean — true if cancellation was requested |
Parallel handler count
Control how many jobs a worker processes concurrently:
Chain .setMaxParallelHandlers(4) when defining the worker:
imageV1ServiceBuilder
.getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
.forQueue('processImage')
.setMaxParallelHandlers(4) // process up to 4 images at once
.setWorkerFunction(async function (ctx, payload) { /* ... */ })
Lifecycle behavior — retries, TTL, timeouts — is configured on the queue builder via setLifecycleConfig, not on the worker itself.
Poison messages
A poison message is a job that repeatedly fails. Protect your workers with thresholds:
imageQueue.setLifecycleConfig({
poisonMessageFailureThreshold: 10,
poisonMessageAction: 'pause-worker',
})
| Action | Behavior |
|---|---|
none | Keep retrying (default) |
pause-worker | Pause the worker after threshold failures |
When paused, you can resume the worker:
service.resumeQueueWorkers('processImage')
Cancelling jobs
Workers can check if cancellation was requested:
.setWorkerFunction(async function (ctx, payload) {
for (const item of payload.items) {
if (ctx.job.cancelRequested()) {
// Stop processing — cancellation is handled by the queue system
return
}
await processItem(item)
}
await ctx.job.complete()
})
Worker function signature
The worker function receives:
async function (ctx, payload) {
// ctx contains:
// - ctx.job: job lifecycle methods (complete, retry, fail, moveToDeadLetter, extendLease, cancelRequested)
// - ctx.logger: scoped logger
// - ctx.resources: typed service resources
// payload: the typed job payload (from the queue's addPayloadSchema)
}
Use async function to preserve this context. Arrow functions will lose service instance binding.
Full worker example
import { imageV1ServiceBuilder } from '../imageV1ServiceBuilder.js'
export const imageWorker = imageV1ServiceBuilder
.getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
.forQueue('processImage')
.setMode('continuous')
.setMaxParallelHandlers(4)
.setWorkerFunction(async function (ctx, payload) {
try {
const result = await processImage(payload.imageUrl, {
maxWidth: payload.maxWidth,
maxHeight: payload.maxHeight,
format: payload.format,
})
await ctx.job.complete({ processedUrl: result.url })
} catch (error) {
// Retry and dead-letter behavior is managed by queue lifecycle config
throw error
}
})