The subscription builder
A subscription reacts to messages/events from the event bridge and can optionally emit a new event.
Use the subscription builder from your service builder:
const mySubscriptionBuilder = myServiceBuilder.getSubscriptionBuilder(
'subscriptionName',
'some subscription description'
)Typical setup
import { z } from 'zod'
const inputPayloadSchema = z.object({
id: z.string(),
})
const inputParameterSchema = z.object({
source: z.string().optional(),
})
const outputSchema = z.object({
done: z.boolean(),
})
const mySubscriptionBuilder = myServiceBuilder
.getSubscriptionBuilder('userCreatedProjection', 'projects user-created events')
.addPayloadSchema(inputPayloadSchema)
.addParameterSchema(inputParameterSchema)
.addOutputSchema('userProjected', outputSchema)
.setSubscriptionFunction(async function (context, payload, parameter) {
context.logger.info({ payload, parameter }, 'projecting user')
return { done: true }
})addOutputSchema(eventName, schema) is optional. If set, a returned value is validated and emitted as a custom event using eventName.
Message filters
Subscriptions should be as specific as possible. The builder supports multiple filters:
import { EBMessageType } from '@purista/core'
const builder = myServiceBuilder
.getSubscriptionBuilder('onlyUserCreated', 'react on specific event')
.subscribeToEvent('userCreated', '1')
.filterSentFrom('UserService', '1', 'createUser', undefined)
.filterReceivedBy(undefined, undefined, undefined, undefined)
.filterForMessageType(EBMessageType.CommandSuccessResponse)
.filterPrincipalId('my-principal')
.filterTenantId('my-tenant')Use only the filters you need. Over-filtering can prevent subscriptions from matching messages.
Delivery behavior hints
You can advise broker behavior for this subscription:
const builder = myServiceBuilder
.getSubscriptionBuilder('mySub', '...')
.adviceDurable(true)
.adviceAutoacknowledgeMessage(false)
.adviceConsumerFailureHandling({
mode: 'strict',
maxAttempts: 5,
retryDelayMs: 1000,
deadLetterTarget: 'billing.userCreated.dead-letter',
})
.receiveMessageOnEveryInstance(true)adviceDurable(true)asks the broker to persist messages while subscriber is offline.adviceAutoacknowledgeMessage(false)prefers ack after successful execution.adviceConsumerFailureHandling(...)declares bounded retry semantics, strictness, and the dead-letter target that exhausted messages should use.receiveMessageOnEveryInstance(true)disables shared-consumer mode and fans out to each instance.
Support depends on the selected event bridge/broker.
Failure handling modes
mode: 'strict'is the recommended default. PURISTA validates the selected event bridge at startup and rejects unsupported retry/DLQ requirements.mode: 'best-effort'allows adapter-specific degradation when you explicitly accept weaker semantics.- Exhausted subscription messages are dead-lettered. Set
deadLetterTargetexplicitly when you want a stable operator inbox, or rely on the adapter default suffix when the selected bridge documents one.
Explicit handler outcomes
Subscription handlers can return explicit control outcomes instead of only throwing exceptions:
const builder = myServiceBuilder
.getSubscriptionBuilder('mySub', '...')
.setSubscriptionFunction(async function (context, payload) {
if (payload.temporaryUnavailable) {
return { status: 'retry', reason: 'temporary downstream outage', delayMs: 5_000 }
}
if (payload.invalidData) {
return { status: 'deadLetter', reason: 'invalid payload for projection' }
}
if (payload.ignoreForNow) {
return { status: 'drop', reason: 'known irrelevant event variant' }
}
if (payload.poisonSequenceDetected) {
return { status: 'stop-consumer', reason: 'manual operator review required' }
}
return { status: 'ack' }
})acksettles the delivery as successful.retrysignals transient failure with optional delay hint.deadLetterroutes immediately to dead-letter handling.dropacknowledges and discards the current delivery with warning logging.stop-consumerpauses the affected subscription consumer until explicitly resumed by an operator/runtime call.- Thrown errors still map through the configured retry/DLQ policy for backward-compatible behavior.
stop-consumer support is capability-gated per adapter. In mode: 'strict', unsupported adapters reject this outcome with a startup/runtime validation error instead of silently degrading behavior.
When to use subscription retries vs queues
Use subscription retry / dead-letter handling for reactive push workloads where the broker should make a bounded number of delivery attempts and then move the message aside for investigation.
Use a queue instead when you need:
- long backoff windows
- controlled replay / re-drive operations
- worker heartbeats / leases
- operator-visible backlog metrics and remediation flows
Queues are the stronger abstraction for workflow-style work. Subscription retries should stay focused on bounded event-consumer hardening.
Invoke and emit from subscriptions
Subscriptions can invoke commands and emit custom events with full type support.
const invokeResultSchema = z.object({ ok: z.boolean() })
const invokePayloadSchema = z.object({ id: z.string() })
const builder = myServiceBuilder
.getSubscriptionBuilder('mySub', '...')
.canInvoke('AuditService', '1', 'writeAudit', invokeResultSchema, invokePayloadSchema)
.canEmit('auditWritten', z.object({ id: z.string() }))
.setSubscriptionFunction(async function (context, payload) {
await context.service.AuditService['1'].writeAudit({ id: payload.id }, {})
context.emit('auditWritten', { id: payload.id })
})Subscriptions can also consume streams with typed contracts:
const builder = myServiceBuilder
.getSubscriptionBuilder('mySub', '...')
.canConsumeStream('SearchService', '1', 'searchUsers', chunkSchema, payloadSchema, parameterSchema, finalSchema)
.setSubscriptionFunction(async function (context, payload, parameter) {
const handle = await context.stream.SearchService['1'].searchUsers(payload, parameter)
for await (const frame of handle) {
if (frame.payload.frameType === 'chunk') {
// consume chunk
}
}
})Context
The subscription function context provides:
context.message: original event-bridge messagecontext.logger: logger with tracing contextcontext.resources: service resourcescontext.secrets/context.configs/context.states: store accesscontext.service: typed invoke API fromcanInvokecontext.stream: typed stream consume API fromcanConsumeStreamcontext.emit: typed custom event API fromcanEmit
Guards and transformers
Subscriptions support the same transformer and guard concepts as commands:
setTransformInput(...)setTransformOutput(...)setBeforeGuardHooks(...)setAfterGuardHooks(...)
Details and execution order are the same as in: Transformer & Guards.
Important
Use function and not arrow functions for builder hooks/functions to preserve this binding:
// good
async function (context, payload, parameter) {}
// bad
async (context, payload, parameter) => {}