# Queue Worker

Configure worker modes, intervals, parallel handlers, and poison message policies.

---
Canonical: /handbook/blocks/queue-pattern/queue-worker/
Source: web/src/content/handbook-cards/blocks/queue-pattern/queue-worker.mdx
Format: Markdown for agents
---

A queue worker defines **how** jobs are processed. While the queue definition describes **what** work exists, the worker configures execution mode, parallelism, error handling, and recovery.

## Worker modes

Workers support three modes:

| Mode | Behavior | Use case |
|---|---|---|
| `continuous` (default) | Worker runs constantly, picking up jobs as they arrive | High-throughput, always-on processing |
| `interval` | Worker polls at a fixed interval | Resource-conscious, batch-like processing |
| `sequential` | Worker processes one job at a time, then waits | Strict ordering, low-volume queues |

```typescript [imageWorker.ts]
import { imageV1ServiceBuilder } from '../imageV1ServiceBuilder.js'

export const imageWorker = imageV1ServiceBuilder
  .getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
  .forQueue('processImage')
  .setMode('continuous')
  .setMaxParallelHandlers(4)
  .setWorkerFunction(async function (ctx, payload) {
    // payload is the typed job payload
    const result = await processImage(payload.imageUrl, {
      maxWidth: payload.maxWidth,
      maxHeight: payload.maxHeight,
      format: payload.format,
    })

    await ctx.resources.storage.save(result)
    await ctx.job.complete({ processedUrl: result.url })
  })
```

## Interval mode

```typescript [reportWorker.ts]
export const reportWorker = reportV1ServiceBuilder
  .getQueueWorkerBuilder('reportWorker', 'Process report generation jobs')
  .forQueue('generateReport')
  .setMode('interval')
  .setIntervalMs(60_000) // Check every minute
  .setWorkerFunction(async function (ctx, payload) {
    const report = await generateReport(payload.month, payload.year)
    await ctx.job.complete({ reportUrl: report.url })
  })
```

## Sequential mode

```typescript [importWorker.ts]
export const importWorker = importV1ServiceBuilder
  .getQueueWorkerBuilder('importWorker', 'Process data import jobs')
  .forQueue('importData')
  .setMode('sequential')
  .setWorkerFunction(async function (ctx, payload) {
    // One at a time, no parallelism
    await importData(payload.fileUrl)
    await ctx.job.complete({ imported: true })
  })
```

## Job lifecycle methods

The worker function receives a `ctx.job` object with methods to control the job:

```typescript [imageWorker.ts]
.setWorkerFunction(async function (ctx, payload) {
  try {
    const result = await processImage(payload.imageUrl)
    await ctx.job.complete({ processedUrl: result.url })
  } catch (error) {
    // Throw or handle — retry is managed by the queue lifecycle config
    throw error
  }
})
```

| Method | Description |
|---|---|
| `complete(output?, headers?)` | Mark job as completed successfully |
| `retry(request?)` | Schedule a retry |
| `fail(reason, fatal?)` | Mark job as failed; `fatal: true` skips further retries |
| `moveToDeadLetter(reason?)` | Move job directly to the dead-letter queue |
| `extendLease(durationMs)` | Extend the lease if processing takes longer than expected |
| `cancelRequested()` | Returns boolean — true if cancellation was requested |

## Parallel handler count

To cap how many jobs a worker processes concurrently, chain `.setMaxParallelHandlers(4)` when defining the worker:

```typescript [imageWorker.ts]
imageV1ServiceBuilder
  .getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
  .forQueue('processImage')
  .setMaxParallelHandlers(4) // process up to 4 images at once
  .setWorkerFunction(async function (ctx, payload) { /* ... */ })
```

Lifecycle behavior — retries, TTL, timeouts — is configured on the queue builder via `setLifecycleConfig`, not on the worker itself.
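To illustrate what a cap on parallel handlers means in practice, here is a minimal sketch of the slot accounting such a limit implies. `HandlerSlots` is a hypothetical class written for this page. How the framework actually enforces the limit is not shown here, so treat this as a conceptual model only.

```typescript
// Conceptual model of a parallel-handler cap; not framework code.
class HandlerSlots {
  private active = 0
  private readonly max: number

  constructor(max: number) {
    if (max < 1 || Math.floor(max) !== max) {
      throw new RangeError('max must be a positive integer')
    }
    this.max = max
  }

  // Claim a slot for a new job; returns false when the cap is reached.
  tryAcquire(): boolean {
    if (this.active >= this.max) return false
    this.active += 1
    return true
  }

  // Free a slot when a handler finishes (successfully or not).
  release(): void {
    if (this.active > 0) this.active -= 1
  }

  // Number of handlers currently running.
  get inFlight(): number {
    return this.active
  }
}
```

With a cap of 4, a fifth job is not started until one of the running handlers releases its slot.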

## Poison messages

A poison message is a job that repeatedly fails. Protect your workers with thresholds:

```typescript [imageQueue.ts]
imageQueue.setLifecycleConfig({
  poisonMessageFailureThreshold: 10,
  poisonMessageAction: 'pause-worker',
})
```

| Action | Behavior |
|---|---|
| `none` | Keep retrying (default) |
| `pause-worker` | Pause the worker after threshold failures |

After a worker is paused, resume processing by passing the queue name to `resumeQueueWorkers`:

```typescript
service.resumeQueueWorkers('processImage')
```
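The threshold logic can be pictured with a small model. The sketch below is an assumption-heavy illustration: `PoisonTracker` is a hypothetical class, and it counts failures per job ID. This page does not say whether the framework counts per job or per worker, or whether a success resets the count.

```typescript
// The two actions listed in the table above.
type PoisonAction = 'none' | 'pause-worker'

// Conceptual model of poison-message detection; not framework code.
class PoisonTracker {
  private failures: Record<string, number> = {}
  private paused = false
  private readonly threshold: number
  private readonly action: PoisonAction

  constructor(threshold: number, action: PoisonAction) {
    this.threshold = threshold
    this.action = action
  }

  // Count one failed attempt and pause once a job reaches the threshold.
  // Returns the job's failure count so far.
  recordFailure(jobId: string): number {
    const count = (this.failures[jobId] || 0) + 1
    this.failures[jobId] = count
    if (this.action === 'pause-worker' && count >= this.threshold) {
      this.paused = true
    }
    return count
  }

  // Counterpart of resuming the worker after an operator has intervened.
  resume(): void {
    this.paused = false
  }

  isPaused(): boolean {
    return this.paused
  }
}
```

With `new PoisonTracker(10, 'pause-worker')`, the tenth failure of the same job flips `isPaused()` to `true`. With `'none'`, the tracker only counts.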

## Cancelling jobs

Long-running workers can poll `ctx.job.cancelRequested()` between units of work and stop early once cancellation has been requested:

```typescript [longWorker.ts]
.setWorkerFunction(async function (ctx, payload) {
  for (const item of payload.items) {
    if (ctx.job.cancelRequested()) {
      // Stop processing — cancellation is handled by the queue system
      return
    }
    await processItem(item)
  }
  await ctx.job.complete()
})
```
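To make the early-return behavior testable without a queue, the loop could be factored into a helper that takes the cancellation check as a callback. This is only a sketch: `processUntilCancelled` and `RunResult` are hypothetical names, and the helper is synchronous for brevity, whereas a real worker would await each item.

```typescript
// Outcome of a cancellable run.
interface RunResult {
  processed: number
  cancelled: boolean
}

// Process items in order, checking for cancellation before each one.
function processUntilCancelled<T>(
  items: T[],
  cancelRequested: () => boolean,
  processItem: (item: T) => void,
): RunResult {
  let processed = 0
  for (const item of items) {
    if (cancelRequested()) {
      // Stop before starting the next item; earlier items stay processed.
      return { processed, cancelled: true }
    }
    processItem(item)
    processed += 1
  }
  return { processed, cancelled: false }
}
```

In a worker, the callback would be `() => ctx.job.cancelRequested()`, and the returned `processed` count could be logged so that partial progress is visible after a cancellation.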

## Worker function signature

The worker function receives two arguments, `ctx` and `payload`:

```typescript
async function (ctx, payload) {
  // ctx contains:
  // - ctx.job: job lifecycle methods (complete, retry, fail, moveToDeadLetter, extendLease, cancelRequested)
  // - ctx.logger: scoped logger
  // - ctx.resources: typed service resources
  // payload: the typed job payload (from the queue's addPayloadSchema)
}
```

Declare the worker with `async function`, not an arrow function. An arrow function captures `this` from its enclosing scope and cannot be rebound, so it loses the service instance binding.
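The difference is plain JavaScript behavior and can be seen without the framework. In the sketch below, `Function.prototype.call` stands in for whatever mechanism the framework uses to bind the service instance (an assumption). `ServiceLike`, `Registry`, and the other names are illustrative.

```typescript
interface ServiceLike {
  name: string
}

// A regular function takes `this` from the call site.
function regular(this: ServiceLike): string {
  return this.name
}

class Registry {
  name = 'outer'

  // The arrow function captures `this` (the Registry instance) when created.
  makeArrow(): () => string {
    return () => this.name
  }
}

const arrow = new Registry().makeArrow()
const service: ServiceLike = { name: 'imageService' }

const fromRegular = regular.call(service) // 'imageService'
const fromArrow = arrow.call(service) // 'outer': the thisArg is ignored
```

The same rule applies to `async function` versus `async () => {}`.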

## Full worker example

```typescript [imageWorker.ts]
import { imageV1ServiceBuilder } from '../imageV1ServiceBuilder.js'

export const imageWorker = imageV1ServiceBuilder
  .getQueueWorkerBuilder('imageWorker', 'Process image queue jobs')
  .forQueue('processImage')
  .setMode('continuous')
  .setMaxParallelHandlers(4)
  .setWorkerFunction(async function (ctx, payload) {
    try {
      const result = await processImage(payload.imageUrl, {
        maxWidth: payload.maxWidth,
        maxHeight: payload.maxHeight,
        format: payload.format,
      })

      await ctx.job.complete({ processedUrl: result.url })
    } catch (error) {
      // Retry and dead-letter behavior is managed by queue lifecycle config
      throw error
    }
  })
```
