# AmqpBridge API

EventBridge implementation for AMQP brokers such as RabbitMQ.

---
Canonical: /handbook/api/classes/_purista_amqpbridge.AmqpBridge/
Source: amqpbridge/src/AmqpBridge.impl.ts
Format: Markdown for agents
---

EventBridge implementation for AMQP brokers such as RabbitMQ.

Package: `@purista/amqpbridge`

## Signature

```typescript
class AmqpBridge
```

## Examples

```typescript
import { AmqpBridge } from '@purista/amqpbridge'

// create and init our eventbridge
const config = {
   url: 'amqp://localhost'
}

const eventBridge = new AmqpBridge(config)
await eventBridge.start()
```

## Members

### Constructors

- `new constructor(config?: { deadLetterExchangeName: string; deadLetterRoutingKey: string; defaultCommandTimeout: number; encoder: Encoder; encrypter: Encrypter; exchangeName: string; ... })` — Creates an AMQP bridge with defaults merged into the provided config.

### Properties

- `capabilities: EventBridgeCapabilities` — Runtime capability matrix used for strict startup validation.
- `channel: ConfirmChannel` — Shared publishing channel used for event and command publication.
- `config: Complete<EventBridgeConfig<ConfigType>>`
- `connection: ChannelModel` — Active AMQP connection, available after start.
- `consumerRegistrations: { channel: ConfirmChannel; tag: string }[]` — Consumer tags grouped with their channels for shutdown cleanup.
- `defaultCommandTimeout: number` — The default time until when a command invocation automatically returns a time out error
- `encoder: Encoder` — Content-type codecs used before encryption.
- `encrypter: Encrypter` — Content-encoding encryption handlers used after encoding.
- `healthy: boolean` — Last known broker/channel health state.
- `inFlightExecutions: InFlightExecutionTracker`
- `instanceId: string` — Stable runtime instance id used to distinguish bridge processes.
- `logger: Logger`
- `metricsRecorder: PuristaMetricsRecorder`
- `name: string` — Human-readable bridge name used in logs, traces, and metrics.
- `pausedSubscriptionConsumers: Map<string, PausedSubscriptionState>` — Subscription consumers paused by control signals.
- `pendingInvocations: PendingInvocationRegistry<unknown>` — Pending command invocations waiting for a correlated response.
- `ready: boolean` — Indicates whether startup completed and the bridge can handle traffic.
- `replyQueueName: string` — Broker-created exclusive queue that receives command responses.
- `serviceFunctions: Map<string, { cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<unknown | unknown>; channel: ConfirmChannel }>` — Registered command handlers keyed by generated AMQP queue name.
- `subscriptions: Map<string, RegisteredSubscription>` — Registered subscription consumers keyed by generated subscription queue name.
- `traceProvider: NodeTracerProvider`

### Methods

- `addConsumerRegistration(channel: ConfirmChannel, tag: string): void` — Tracks a broker consumer tag so shutdown can cancel it.
- `createPublishingChannel(): Promise<ConfirmChannel>` — Creates a publishing channel, preferring confirm channels when supported.
- `deadLetterSubscriptionMessage(channel: ConfirmChannel, subscription: Subscription, msg: ConsumeMessage, reason: string): Promise<void>` — Hands a failed subscription message to the configured dead-letter target.
- `decodeContent<T>(input: Buffer, contentType: string, contentEncoding: string): Promise<T>` — Decode buffer into given type
- `destroy(): Promise<void>` — Gracefully stops all consumers, waits for in-flight subscription handlers,
closes AMQP resources and rejects unresolved pending invocations.
- `emitMessage<T>(message: Omit<EBMessage, "id" | "timestamp" | "correlationId">, contentType: string, contentEncoding: string): Promise<Readonly<EBMessage>>` — Emits a message via AMQP headers exchange.
- `encodeContent<T>(input: T, contentType: string, contentEncoding: string): Promise<Buffer<ArrayBufferLike>>` — Encode given payload to buffer
- `ensureSubscriptionRetryQueue(channel: ConfirmChannel, sourceQueueName: string, retryQueueName: string, retryDelayMs: number): Promise<void>` — Declares the TTL retry queue that routes expired messages back to the source queue.
- `getConsumerAttempt(headers: unknown): number` — Reads the PURISTA retry attempt header from an AMQP message.
- `getInFlightExecutionCount(): number` — Number of currently running handlers across all work kinds.
- `getInFlightExecutionCounts(): InFlightExecutionCounts` — Number of currently running handlers grouped by work kind.
- `getPausedSubscriptionConsumers(): object` — Returns paused subscription consumer states keyed by adapter registration key.
- `getSubscriptionDeadLetterTarget(subscription: Subscription): string | undefined` — Resolves the dead-letter routing key for a subscription failure.
- `getSubscriptionFailureReason(error: unknown): string` — Converts subscription handler failures to a concise dead-letter reason.
- `getSubscriptionRetryQueueName(queueName: string, retryDelayMs: number): string` — Builds the queue name used for delayed subscription retry.
- `getTracer(): Tracer` — Returns open telemetry tracer of this service
- `invoke<T>(input: Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<T>` — Invokes a remote command and waits for a matching command response.
- `isHealthy(): Promise<boolean>` — Indicates if the bridge connection and channels are currently healthy.
- `isReady(): Promise<boolean>` — Indicates if the bridge finished startup and is ready to process traffic.
- `openStream<Chunk, Final>(_input: Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">, _ttl?: number): Promise<StreamHandle<Chunk, Final>>` — Open a stream invocation.
- `registerCommand(address: EBMessageAddress, cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; ... } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; ... }>, metadata: CommandDefinitionMetadataBase, eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string>` — Register a service function and ensure that there is a queue for all incoming command requests.
- `registerStream(_address: EBMessageAddress, _cb: (message: StreamMessage) => Promise<void>, _metadata: StreamDefinitionMetadataBase, _eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string>` — Register a service stream handler for a service target.
- `registerSubscription(subscription: Subscription, cb: (message: EBMessage) => Promise<Omit<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }, unknown | unknown> | undefined>): Promise<string>` — Registers a subscription consumer and returns its stable subscription key.
- `removeConsumerRegistration(channel: ConfirmChannel, tag: string): void` — Removes one tracked consumer tag.
- `removeConsumerRegistrationsForChannel(channel: ConfirmChannel): void` — Removes all tracked consumer tags for a closed channel.
- `resumeSubscriptionConsumer(registrationKey: string): Promise<void>` — Resumes a subscription consumer paused by a `stop-consumer` control signal.
- `retrySubscriptionMessage(channel: ConfirmChannel, queueName: string, msg: ConsumeMessage, nextAttempt: number, retryDelayMs: number, durable: boolean): Promise<void>` — Republishes a failed subscription message for immediate or delayed retry.
- `runInFlight<T>(fn: () => Promise<T>, kind?: "command" | "subscription" | "stream" | "generic"): Promise<T>`
- `sendToQueueAndConfirm(channel: ConfirmChannel, queueName: string, content: Buffer, options: Publish | undefined): Promise<void>` — Sends a message to a queue and waits for broker confirms when available.
- `start(): Promise<void>` — Connects to the AMQP broker and declares the exchange and reply queue.
- `startActiveSpan<F>(name: string, opts: SpanOptions, context: Context | undefined, fn: (span: Span) => Promise<F>): Promise<F>` — Start a child span for opentelemetry tracking
- `unregisterCommand(address: EBMessageAddress): Promise<void>` — Unregisters a command consumer and closes the dedicated command channel.
- `unregisterStream(_address: EBMessageAddress): Promise<void>` — Unregister a service stream
- `unregisterSubscription(address: EBMessageAddress): Promise<void>` — Unregisters a subscription consumer and closes its channel.
- `waitForInFlightDrain(timeoutMs?: number): Promise<boolean>`
- `wrapInSpan<F>(name: string, opts: SpanOptions, fn: (span: Span) => Promise<F>, context?: Context): Promise<F>` — Start span for opentelemetry tracking on same level.
The created span will not become the "active" span within opentelemetry!
