Skip to content

Async HTTP Exposure

Expose commands with .exposeAsHttpEndpoint(..., { mode: 'async' }) when a request should enqueue work and respond quickly. The pattern mirrors CQRS write → async read flows.

Producer command

ts
export const pingAsyncCommandBuilder = pingV1ServiceBuilder
  .getCommandBuilder('pingAsync', 'Async ping endpoint')
  .addPayloadSchema(pingAsyncPayloadSchema)
  .addParameterSchema(pingAsyncParameterSchema)
  .canEnqueue('pingJob', pingJobPayloadSchema, pingJobParameterSchema)
  .exposeAsHttpEndpoint('POST', 'ping/async', undefined, undefined, undefined, undefined, { mode: 'async' })
  .setCommandFunction(async function (context, payload, parameter) {
    const job = await context.queue.enqueue.pingJob(payload, parameter)
    return {
      queueId: job.queueName,
      jobId: job.jobId,
      statusUrl: `/api/v1/ping/status/${job.jobId}`,
    }
  })

The HTTP adapter returns 202 Accepted with the JSON payload above.

Status command

ts
export const pingStatusCommandBuilder = pingV1ServiceBuilder
  .getCommandBuilder('pingStatus', 'Check ping job status')
  .addParameterSchema(z.object({ jobId: z.string().uuid() }))
  .setCommandFunction(async function (context, _payload, parameter) {
    const job = await context.queue.metrics.pingJob(parameter.jobId)
    if (!job) {
      throw new HandledError(StatusCode.Gone, 'Job expired or unknown')
    }

    return {
      state: job.state, // pending | inFlight | completed | failed | deadLetter
      attempts: job.attempt,
      lastError: job.lastError,
      queueId: job.queueName,
      jobId: job.id,
    }
  })

Map queue states to HTTP codes:

queue stateHTTP responseguidance
pending / inFlight202 AcceptedAsk the client to poll later.
completed200 OK with final resultOptionally include output payload from context.job.complete(result).
deadLetter / failed500 Internal Server ErrorSurface error details or redirect to manual remediation.
expired / missing410 GoneTell clients the job was purged or never existed.
redirect (custom)303 See OtherInclude Location header to alternate status endpoint.

Delayed jobs

Use scheduleAt to defer execution:

ts
await context.queue.scheduleAt.pingJob(Date.now() + 5 * 60_000, payload, parameter)

This is ideal for backoff strategies, nightly batches, or cron replacements.

Client guidance

  • Poll the status endpoint with exponential backoff.
  • Cache the statusUrl in your frontend/API clients.
  • Handle 410 Gone by showing a “job expired” message rather than retrying indefinitely.