The Subscription Builder

Define typed event subscriptions, filters, before guards, and explicit handler outcomes.

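The subscription builder, obtained via serviceBuilder.getSubscriptionBuilder(name, description), is a step-by-step API for declaring how your service reacts to events. Every method call returns a new builder instance with narrowed types, so TypeScript tracks what you’ve configured and what remains. The narrowed type lives on the returned value, so chain the calls as in the full example at the end of this page to keep the inferred types; the step-by-step snippets below are split up only for readability.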
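The idea of a builder whose type narrows with each call can be pictured with a miniature version. This is a minimal sketch and not the framework's implementation: MiniBuilder, subscribeTo, withPayload, and handle are hypothetical names chosen for illustration only.

```typescript
// Hypothetical miniature builder; names are illustrative, not the framework API.
class MiniBuilder<Payload = unknown> {
  private constructor(
    readonly eventName: string | undefined,
    readonly validate: (input: unknown) => Payload,
  ) {}

  static create(): MiniBuilder<unknown> {
    return new MiniBuilder<unknown>(undefined, (input) => input)
  }

  // Returns a new instance; the payload type is carried over unchanged.
  subscribeTo(eventName: string): MiniBuilder<Payload> {
    return new MiniBuilder<Payload>(eventName, this.validate)
  }

  // Returns a new instance whose Payload type parameter is narrowed to T.
  withPayload<T>(validate: (input: unknown) => T): MiniBuilder<T> {
    return new MiniBuilder<T>(this.eventName, validate)
  }

  // The handler's payload parameter is typed from the narrowed Payload.
  handle<R>(input: unknown, handler: (payload: Payload) => R): R {
    return handler(this.validate(input))
  }
}

const base = MiniBuilder.create()

// Chaining keeps the narrowed type: `typed` is MiniBuilder<{ orderId: string }>.
const typed = base.subscribeTo('orderCreated').withPayload((input) => {
  const orderId = (input as { orderId?: unknown } | null)?.orderId
  if (typeof orderId !== 'string') throw new Error('invalid payload')
  return { orderId }
})

// payload.orderId is known to be a string here without any cast.
const message = typed.handle({ orderId: 'o-1' }, (payload) => `Order ${payload.orderId}`)
```

In this sketch base is left untouched by the chained calls, which is why the return value has to be kept: calling a method and discarding the result would leave you holding the un-narrowed builder.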

Builder workflow

flowchart LR
    A[Create Builder] --> B[Set Filters]
    B --> C[Define Input Schema]
    C --> D[Set Handler]
    D --> E[Optional: Add Guards]
    E --> F[Optional: Bridge Advice]
    F --> G[Add to Service]

Step 1 — Create the builder

import { notificationV1ServiceBuilder } from './notificationV1ServiceBuilder.js'

const orderNotificationSubscription = notificationV1ServiceBuilder
  .getSubscriptionBuilder('sendOrderNotification', 'Send a notification when an order is created')

Step 2 — Filter which events to receive

orderNotificationSubscription
  // Listen to a specific event from a specific service
  .subscribeToEvent('OrderService', '1', 'orderCreated')
  // Alternatively, listen to ALL events from a service
  .subscribeToEvent('OrderService', '1', '*')
  // Only accept events from this service version
  .filterSentFrom('OrderService', '1')
  // Only accept events targeted at this service
  .filterReceivedBy('NotificationService', '1')
  // Filter by message type
  .filterForMessageType('event')

Filters are combined with AND logic. An event must match all specified filters to be delivered.
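The AND semantics can be sketched as a predicate that checks every configured filter field against the incoming message. The MessageMeta and SubscriptionFilter shapes and the matchesAll function below are assumptions made for illustration; they are not the framework's internal types, and wildcard handling is left out.

```typescript
// Hypothetical message metadata; field names are illustrative only.
type MessageMeta = {
  eventName: string
  senderService: string
  senderVersion: string
  messageType: string
}

// Every field is optional: an unset field means "no filter on this field".
type SubscriptionFilter = Partial<MessageMeta>

// A message matches only if every filter field that is set equals the message's value.
function matchesAll(filter: SubscriptionFilter, message: MessageMeta): boolean {
  return (Object.keys(filter) as (keyof MessageMeta)[]).every(
    (key) => filter[key] === undefined || filter[key] === message[key],
  )
}

const filter: SubscriptionFilter = { eventName: 'orderCreated', senderService: 'OrderService' }

const orderCreated: MessageMeta = {
  eventName: 'orderCreated',
  senderService: 'OrderService',
  senderVersion: '1',
  messageType: 'event',
}
const fromOtherService: MessageMeta = { ...orderCreated, senderService: 'BillingService' }

const delivered = matchesAll(filter, orderCreated) // true: both filter fields match
const skipped = matchesAll(filter, fromOtherService) // false: the sender does not match
```

Adding a filter can therefore only reduce the set of delivered events, never widen it.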

Step 3 — Define the payload schema

import { z } from 'zod'

const orderSchema = z.object({
  orderId: z.string(),
  customerId: z.string(),
  items: z.array(z.object({ productId: z.string(), qty: z.number() })),
})

orderNotificationSubscription.addPayloadSchema(orderSchema)

The payload schema drives TypeScript inference in the handler and is used for runtime validation.

Step 4 — Set the handler function

orderNotificationSubscription.setSubscriptionFunction(async function (context, payload, parameter) {
  // payload is typed from the payload schema defined in step 3
  const { orderId, customerId } = payload

  // sendPushNotification stands for your own notification helper
  await sendPushNotification(customerId, `Order ${orderId} confirmed`)

  return { status: 'ack' }
})

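Use async function rather than an arrow function so that this is bound to the service instance. Arrow functions take this from the enclosing scope, so the service instance is not available inside them.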
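The difference is plain JavaScript behaviour and can be reproduced without the framework. In the sketch below, ServiceLike, invokeOn, and HandlerFactory are hypothetical stand-ins; invokeOn mimics a caller that binds this to a service instance, and the handlers are synchronous only to keep the example short.

```typescript
// Hypothetical stand-in for a service instance; not a framework type.
type ServiceLike = { serviceName: string }
type Handler = (this: ServiceLike) => string

// Mimics a caller that invokes the handler with the service instance as `this`.
function invokeOn(service: ServiceLike, handler: Handler): string {
  return handler.call(service)
}

class HandlerFactory {
  readonly serviceName = 'HandlerFactory'

  // A function expression receives whatever `this` the caller binds.
  regular(): Handler {
    return function (this: ServiceLike) {
      return this.serviceName
    }
  }

  // An arrow function captures `this` where it is defined (the factory here)
  // and ignores the value passed to call().
  arrow(): Handler {
    return () => this.serviceName
  }
}

const service: ServiceLike = { serviceName: 'NotificationService' }
const factory = new HandlerFactory()

const fromFunction = invokeOn(service, factory.regular()) // 'NotificationService'
const fromArrow = invokeOn(service, factory.arrow()) // 'HandlerFactory'
```

The arrow handler never sees the service passed by the caller, which is the failure mode the advice above guards against.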

Step 5 — Add before guards

Before guards run in parallel (all guards execute simultaneously with Promise.all):

import { HandledError, StatusCode } from '@purista/core'

orderNotificationSubscription.setBeforeGuardHooks({
  requireAuth: async function (context, payload, parameter) {
    if (!context.message.principalId) {
      throw new HandledError(StatusCode.Unauthorized, 'Authentication required')
    }
  },
  validatePayload: async function (context, payload, parameter) {
    // The payload is already validated by the input schema
    // Use this for business logic validation
    if (payload.items.length === 0) {
      throw new HandledError(StatusCode.BadRequest, 'Order must have at least one item')
    }
  },
})

Since guards run in parallel, they should not depend on each other. Any guard that throws cancels the handler.
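The parallel behaviour can be sketched with Promise.all directly. This is an illustration under stated assumptions, not the framework's code: runWithGuards, the Guard type, and the Order shape are hypothetical names, and the real guards also receive context and parameter arguments.

```typescript
type Guard<P> = (payload: P) => Promise<void>

// Starts every guard at once. Promise.all rejects as soon as any guard rejects,
// so the handler is never called in that case.
async function runWithGuards<P, R>(
  guards: Record<string, Guard<P>>,
  payload: P,
  handler: (payload: P) => Promise<R>,
): Promise<R> {
  await Promise.all(Object.values(guards).map((guard) => guard(payload)))
  return handler(payload)
}

type Order = { items: string[] }

// Records which orders actually reached the handler.
const handledOrders: Order[] = []

const guards: Record<string, Guard<Order>> = {
  hasItems: async function (order) {
    if (order.items.length === 0) throw new Error('Order must have at least one item')
  },
  alwaysPasses: async function () {
    // No checks; present only to show that several guards run side by side.
  },
}

const handler = async function (order: Order) {
  handledOrders.push(order)
  return { status: 'ack' as const }
}
```

Calling runWithGuards(guards, { items: ['p-1'] }, handler) resolves with the handler result, while runWithGuards(guards, { items: [] }, handler) rejects with the guard's error and leaves handledOrders untouched. A guard that has already started is not aborted when another one fails; only its result is ignored.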

Step 6 — Bridge advice

orderNotificationSubscription
  // Subscription survives consumer restarts
  .adviceDurable(true)
  // Auto-acknowledge after handler success
  .adviceAutoacknowledgeMessage(true)
  // Best-effort handling (vs strict default)
  .adviceConsumerFailureHandling({ mode: 'best-effort' })
  // Every instance receives the message (broadcast)
  .receiveMessageOnEveryInstance(true)

Step 7 — Can invoke commands

orderNotificationSubscription
  .canInvoke('CustomerService', '1', 'getCustomer', {
    payloadSchema: z.object({ customerId: z.string() }),
    outputSchema: z.object({ id: z.string(), name: z.string() }),
  })
  .setSubscriptionFunction(async function (context, payload, parameter) {
    const customer = await context.invoke('CustomerService', '1', 'getCustomer', {
      customerId: payload.customerId,
    })

    await sendPushNotification(customer.id, `Order ${payload.orderId} confirmed`)

    return { status: 'ack' }
  })

Step 8 — Add to the service

Because the subscription builder is obtained from notificationV1ServiceBuilder.getSubscriptionBuilder(...), it is already associated with that service. The service builder lives in its own module, and the module that defines the subscription must be loaded before the service instance is created:

// Module imported as './notificationV1ServiceBuilder.js'
import { ServiceBuilder } from '@purista/core'

export const notificationV1ServiceBuilder = new ServiceBuilder({
  serviceName: 'NotificationService',
  serviceVersion: '1',
  serviceDescription: 'Handles notifications',
})

// Application entry point
// orderNotificationSubscription is built using notificationV1ServiceBuilder.getSubscriptionBuilder(...)
// and must be imported so the definition is registered before getInstance() is called.
import { notificationV1ServiceBuilder } from './notificationV1ServiceBuilder.js'
import './subscriptions/orderNotifications.js' // ensure the definition is registered

// eventBridge is an event bridge instance created elsewhere in the application
const notificationService = await notificationV1ServiceBuilder.getInstance(eventBridge)
await notificationService.start()

Full example

import { z } from 'zod'
import { HandledError, StatusCode } from '@purista/core'
import { notificationV1ServiceBuilder } from './notificationV1ServiceBuilder.js'

const orderSchema = z.object({
  orderId: z.string(),
  customerId: z.string(),
  items: z.array(z.object({ productId: z.string(), qty: z.number() })),
})

export const orderNotificationSubscription = notificationV1ServiceBuilder
  .getSubscriptionBuilder('sendOrderNotification', 'Send a notification when an order is created')
  .subscribeToEvent('OrderService', '1', 'orderCreated')
  .addPayloadSchema(orderSchema)
  .canInvoke('CustomerService', '1', 'getCustomer', {
    payloadSchema: z.object({ customerId: z.string() }),
    outputSchema: z.object({ id: z.string(), name: z.string() }),
  })
  .setBeforeGuardHooks({
    requireAuth: async function (context, payload, parameter) {
      if (!context.message.principalId) {
        throw new HandledError(StatusCode.Unauthorized, 'Authentication required')
      }
    },
  })
  .setSubscriptionFunction(async function (context, payload, parameter) {
    const customer = await context.invoke('CustomerService', '1', 'getCustomer', {
      customerId: payload.customerId,
    })

    await sendPushNotification(customer.id, `Order ${payload.orderId} confirmed`)
    return { status: 'ack' }
  })