Classes · @purista/amqpbridge

AmqpBridge

EventBridge implementation for AMQP brokers such as RabbitMQ.

Signature

AmqpBridge.ts typescript
class AmqpBridge

Examples

example-1.ts typescript
import { AmqpBridge } from '@purista/amqpbridge'

// Create the event bridge and connect it to the broker.
const config = {
  url: 'amqp://localhost',
}

const eventBridge = new AmqpBridge(config)
// Top-level await requires an ES module context.
await eventBridge.start()

Constructors

1 entry

constructor

Constructor

constructor.ts typescript
new AmqpBridge(config?: { deadLetterExchangeName: string; deadLetterRoutingKey: string; defaultCommandTimeout: number; encoder: Encoder; encrypter: Encrypter; exchangeName: string; ... })

Creates an AMQP bridge with defaults merged into the provided config.
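
To illustrate what "defaults merged into the provided config" can look like, here is a minimal sketch. The option names are taken from the signature and the example above; the default values and the `mergeConfig` helper are assumptions made for this example, not the library's actual defaults or implementation.

```typescript
// Subset of the options visible in the constructor signature and example above.
interface BridgeConfigSketch {
  url: string
  exchangeName: string
  defaultCommandTimeout: number
}

// Hypothetical default values, chosen only for this example.
const assumedDefaults: BridgeConfigSketch = {
  url: 'amqp://localhost',
  exchangeName: 'example-exchange',
  defaultCommandTimeout: 30000,
}

// Hypothetical helper: user-supplied values override the defaults.
function mergeConfig(config: Partial<BridgeConfigSketch> = {}): BridgeConfigSketch {
  return { ...assumedDefaults, ...config }
}

const merged = mergeConfig({ url: 'amqp://broker.internal' })
console.log(merged)
```

Only `url` is overridden here; the remaining fields fall back to the assumed defaults.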

Properties

21 entries

capabilities

Property

capabilities.ts typescript
capabilities: EventBridgeCapabilities

Runtime capability matrix used for strict startup validation.

channel

Property

channel.ts typescript
channel: ConfirmChannel

Shared publishing channel used for event and command publication.

config

Property

config.ts typescript
config: Complete<EventBridgeConfig<ConfigType>>

connection

Property

connection.ts typescript
connection: ChannelModel

Active AMQP connection, available after start.

consumerRegistrations

Property

consumerRegistrations.ts typescript
consumerRegistrations: { channel: ConfirmChannel; tag: string }[]

Consumer tags grouped with their channels for shutdown cleanup.

defaultCommandTimeout

Property

defaultCommandTimeout.ts typescript
defaultCommandTimeout: number

Default time after which a command invocation automatically fails with a timeout error.

encoder

Property

encoder.ts typescript
encoder: Encoder

Content-type codecs used before encryption.

encrypter

Property

encrypter.ts typescript
encrypter: Encrypter

Content-encoding encryption handlers used after encoding.
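
The ordering described for `encoder` and `encrypter` (encode by content type first, then encrypt by content encoding, and the reverse on receipt) can be sketched as follows. This is a simplified, synchronous illustration on strings; the real methods work on buffers and return promises. The registries, the helper names, the `reversed` toy transform, and the treatment of `utf-8` as a pass-through are all assumptions.

```typescript
type Codec = { encode: (value: unknown) => string; decode: (text: string) => unknown }
type Cipher = { encrypt: (text: string) => string; decrypt: (text: string) => string }

// Hypothetical codec registry keyed by content type.
const codecs: Record<string, Codec> = {
  'application/json': { encode: (v) => JSON.stringify(v), decode: (t) => JSON.parse(t) },
}

// Hypothetical cipher registry keyed by content encoding.
const reverse = (text: string): string => [...text].reverse().join('')
const ciphers: Record<string, Cipher> = {
  'utf-8': { encrypt: (t) => t, decrypt: (t) => t }, // assumed pass-through
  reversed: { encrypt: reverse, decrypt: reverse }, // toy transform, NOT real encryption
}

function lookup(contentType: string, contentEncoding: string): { codec: Codec; cipher: Cipher } {
  const codec = codecs[contentType]
  const cipher = ciphers[contentEncoding]
  if (!codec) throw new Error(`unsupported content type: ${contentType}`)
  if (!cipher) throw new Error(`unsupported content encoding: ${contentEncoding}`)
  return { codec, cipher }
}

// Outgoing: encode first, then encrypt.
function encodeSketch(input: unknown, contentType: string, contentEncoding: string): string {
  const { codec, cipher } = lookup(contentType, contentEncoding)
  return cipher.encrypt(codec.encode(input))
}

// Incoming: decrypt first, then decode.
function decodeSketch<T>(input: string, contentType: string, contentEncoding: string): T {
  const { codec, cipher } = lookup(contentType, contentEncoding)
  return codec.decode(cipher.decrypt(input)) as T
}

const wire = encodeSketch({ a: 1 }, 'application/json', 'reversed')
const roundTrip = decodeSketch<{ a: number }>(wire, 'application/json', 'reversed')
console.log(wire, roundTrip)
```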

healthy

Property

healthy.ts typescript
healthy: boolean

Last known broker/channel health state.

inFlightExecutions

Property

inFlightExecutions.ts typescript
inFlightExecutions: InFlightExecutionTracker

instanceId

Property

instanceId.ts typescript
instanceId: string

Stable runtime instance id used to distinguish bridge processes.

logger

Property

logger.ts typescript
logger: Logger

metricsRecorder

Property

metricsRecorder.ts typescript
metricsRecorder: PuristaMetricsRecorder

name

Property

name.ts typescript
name: string

Human-readable bridge name used in logs, traces, and metrics.

pausedSubscriptionConsumers

Property

pausedSubscriptionConsumers.ts typescript
pausedSubscriptionConsumers: Map<string, PausedSubscriptionState>

Subscription consumers paused by control signals.

pendingInvocations

Property

pendingInvocations.ts typescript
pendingInvocations: PendingInvocationRegistry<unknown>

Pending command invocations waiting for a correlated response.
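
A registry of pending invocations is commonly a map from correlation id to a promise's settle functions plus a timeout timer. The sketch below shows that general pattern; `PendingRegistrySketch` and its method names are hypothetical and are not the API of `PendingInvocationRegistry`.

```typescript
type PendingEntry<T> = {
  resolve: (value: T) => void
  reject: (error: Error) => void
  timer: ReturnType<typeof setTimeout>
}

class PendingRegistrySketch<T> {
  private readonly pending = new Map<string, PendingEntry<T>>()

  // Registers a correlation id; the promise settles on response, timeout, or rejectAll.
  register(correlationId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId)
        reject(new Error(`invocation ${correlationId} timed out`))
      }, timeoutMs)
      this.pending.set(correlationId, { resolve, reject, timer })
    })
  }

  // Called when a response arrives. Returns false for unknown or expired ids.
  resolve(correlationId: string, value: T): boolean {
    const entry = this.pending.get(correlationId)
    if (!entry) return false
    clearTimeout(entry.timer)
    this.pending.delete(correlationId)
    entry.resolve(value)
    return true
  }

  // Rejects everything still waiting, for example during shutdown.
  rejectAll(reason: string): void {
    this.pending.forEach((entry) => {
      clearTimeout(entry.timer)
      entry.reject(new Error(reason))
    })
    this.pending.clear()
  }

  get size(): number {
    return this.pending.size
  }
}

const registry = new PendingRegistrySketch<string>()
const reply = registry.register('corr-1', 1000)
registry.resolve('corr-1', 'pong')
reply.then((value) => console.log(value))
```

Late responses whose correlation id is no longer registered are simply ignored, which is why `resolve` reports whether it found a match.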

ready

Property

ready.ts typescript
ready: boolean

Indicates whether startup completed and the bridge can handle traffic.

replyQueueName

Property

replyQueueName.ts typescript
replyQueueName: string

Broker-created exclusive queue that receives command responses.

serviceFunctions

Property

serviceFunctions.ts typescript
serviceFunctions: Map<string, { cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<unknown | unknown>; channel: ConfirmChannel }>

Registered command handlers keyed by generated AMQP queue name.

subscriptions

Property

subscriptions.ts typescript
subscriptions: Map<string, RegisteredSubscription>

Registered subscription consumers keyed by generated subscription queue name.

traceProvider

Property

traceProvider.ts typescript
traceProvider: NodeTracerProvider

Methods

36 entries

addConsumerRegistration

Method

addConsumerRegistration.ts typescript
addConsumerRegistration(channel: ConfirmChannel, tag: string): void

Tracks a broker consumer tag so shutdown can cancel it.

createPublishingChannel

Method

createPublishingChannel.ts typescript
createPublishingChannel(): Promise<ConfirmChannel>

Creates a publishing channel, preferring confirm channels when supported.

deadLetterSubscriptionMessage

Method

deadLetterSubscriptionMessage.ts typescript
deadLetterSubscriptionMessage(channel: ConfirmChannel, subscription: Subscription, msg: ConsumeMessage, reason: string): Promise<void>

Hands a failed subscription message to the configured dead-letter target.

decodeContent

Method

decodeContent.ts typescript
decodeContent<T>(input: Buffer, contentType: string, contentEncoding: string): Promise<T>

Decodes a buffer into the given type.

destroy

Method

destroy.ts typescript
destroy(): Promise<void>

Gracefully stops all consumers, waits for in-flight subscription handlers, closes AMQP resources and rejects unresolved pending invocations.

emitMessage

Method

emitMessage.ts typescript
emitMessage<T>(message: Omit<EBMessage, "id" | "timestamp" | "correlationId">, contentType: string, contentEncoding: string): Promise<Readonly<EBMessage>>

Emits a message via the AMQP headers exchange.

encodeContent

Method

encodeContent.ts typescript
encodeContent<T>(input: T, contentType: string, contentEncoding: string): Promise<Buffer<ArrayBufferLike>>

Encodes the given payload into a buffer.

ensureSubscriptionRetryQueue

Method

ensureSubscriptionRetryQueue.ts typescript
ensureSubscriptionRetryQueue(channel: ConfirmChannel, sourceQueueName: string, retryQueueName: string, retryDelayMs: number): Promise<void>

Declares the TTL retry queue that routes expired messages back to the source queue.
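
A TTL retry queue in RabbitMQ is typically declared with the queue arguments `x-message-ttl`, `x-dead-letter-exchange`, and `x-dead-letter-routing-key`. The sketch below builds such a declaration. Routing through the default exchange (empty string), the `durable` flag, and the `buildRetryQueueDeclaration` helper are assumptions; the bridge's actual declaration may differ.

```typescript
interface RetryQueueDeclaration {
  queue: string
  durable: boolean
  arguments: {
    'x-message-ttl': number
    'x-dead-letter-exchange': string
    'x-dead-letter-routing-key': string
  }
}

// Hypothetical helper that mirrors the parameters of ensureSubscriptionRetryQueue.
function buildRetryQueueDeclaration(
  sourceQueueName: string,
  retryQueueName: string,
  retryDelayMs: number,
): RetryQueueDeclaration {
  return {
    queue: retryQueueName,
    durable: true, // assumption: retry queues survive broker restarts
    arguments: {
      // Messages expire after the retry delay...
      'x-message-ttl': retryDelayMs,
      // ...are dead-lettered to the default exchange...
      'x-dead-letter-exchange': '',
      // ...which routes by queue name, back to the source queue.
      'x-dead-letter-routing-key': sourceQueueName,
    },
  }
}

// Queue names below are made up for the example.
const declaration = buildRetryQueueDeclaration('orders.created', 'orders.created.retry', 5000)
console.log(declaration)
```

No consumer is attached to the retry queue, so every message waits for the full TTL before returning to the source queue.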

getConsumerAttempt

Method

getConsumerAttempt.ts typescript
getConsumerAttempt(headers: unknown): number

Reads the PURISTA retry attempt header from an AMQP message.
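
Because `headers` is typed as `unknown`, the value has to be read defensively. A minimal sketch of that kind of parsing follows; the header name `x-example-attempt`, the fallback value of 0, and the `readAttempt` helper are assumptions, not the names or defaults the bridge uses.

```typescript
// Hypothetical header name, used only in this example.
const ATTEMPT_HEADER = 'x-example-attempt'

function readAttempt(headers: unknown): number {
  // Anything that is not an object carries no attempt information.
  if (typeof headers !== 'object' || headers === null) return 0
  const raw = (headers as Record<string, unknown>)[ATTEMPT_HEADER]
  // Brokers and clients may deliver the value as a number or a numeric string.
  const parsed = typeof raw === 'string' ? Number(raw) : raw
  return typeof parsed === 'number' && Number.isInteger(parsed) && parsed >= 0 ? parsed : 0
}

console.log(readAttempt({ 'x-example-attempt': 2 }))
console.log(readAttempt(undefined))
```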

getInFlightExecutionCount

Method

getInFlightExecutionCount.ts typescript
getInFlightExecutionCount(): number

Number of currently running handlers across all work kinds.

getInFlightExecutionCounts

Method

getInFlightExecutionCounts.ts typescript
getInFlightExecutionCounts(): InFlightExecutionCounts

Number of currently running handlers grouped by work kind.

getPausedSubscriptionConsumers

Method

getPausedSubscriptionConsumers.ts typescript
getPausedSubscriptionConsumers(): object

Returns paused subscription consumer states keyed by adapter registration key.

getSubscriptionDeadLetterTarget

Method

getSubscriptionDeadLetterTarget.ts typescript
getSubscriptionDeadLetterTarget(subscription: Subscription): string | undefined

Resolves the dead-letter routing key for a subscription failure.

getSubscriptionFailureReason

Method

getSubscriptionFailureReason.ts typescript
getSubscriptionFailureReason(error: unknown): string

Converts subscription handler failures to a concise dead-letter reason.
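
One way to turn an arbitrary thrown value into a short, single-line reason is sketched below. The `failureReason` helper, its output format, and the length limit of 120 characters are assumptions for illustration only.

```typescript
function failureReason(error: unknown, maxLength = 120): string {
  // Prefer the error name and message; accept plain strings; fall back otherwise.
  const text =
    error instanceof Error
      ? `${error.name}: ${error.message}`
      : typeof error === 'string'
        ? error
        : 'unknown error'
  // Collapse line breaks and repeated whitespace so the reason fits a header value.
  const singleLine = text.replace(/\s+/g, ' ').trim()
  return singleLine.length > maxLength ? `${singleLine.slice(0, maxLength - 3)}...` : singleLine
}

console.log(failureReason(new TypeError('bad\ninput')))
```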

getSubscriptionRetryQueueName

Method

getSubscriptionRetryQueueName.ts typescript
getSubscriptionRetryQueueName(queueName: string, retryDelayMs: number): string

Builds the queue name used for delayed subscription retry.

getTracer

Method

getTracer.ts typescript
getTracer(): Tracer

Returns the OpenTelemetry tracer of this service.

invoke

Method

invoke.ts typescript
invoke<T>(input: Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<T>

Invokes a remote command and waits for a matching command response.

isHealthy

Method

isHealthy.ts typescript
isHealthy(): Promise<boolean>

Indicates if the bridge connection and channels are currently healthy.

isReady

Method

isReady.ts typescript
isReady(): Promise<boolean>

Indicates if the bridge finished startup and is ready to process traffic.

openStream

Method

openStream.ts typescript
openStream<Chunk, Final>(_input: Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">, _ttl?: number): Promise<StreamHandle<Chunk, Final>>

Opens a stream invocation.

registerCommand

Method

registerCommand.ts typescript
registerCommand(address: EBMessageAddress, cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; ... } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; ... }>, metadata: CommandDefinitionMetadataBase, eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string>

Registers a service function and ensures that a queue exists for all incoming command requests.

registerStream

Method

registerStream.ts typescript
registerStream(_address: EBMessageAddress, _cb: (message: StreamMessage) => Promise<void>, _metadata: StreamDefinitionMetadataBase, _eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string>

Registers a service stream handler for a service target.

registerSubscription

Method

registerSubscription.ts typescript
registerSubscription(subscription: Subscription, cb: (message: EBMessage) => Promise<Omit<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }, unknown | unknown> | undefined>): Promise<string>

Registers a subscription consumer and returns its stable subscription key.

removeConsumerRegistration

Method

removeConsumerRegistration.ts typescript
removeConsumerRegistration(channel: ConfirmChannel, tag: string): void

Removes one tracked consumer tag.

removeConsumerRegistrationsForChannel

Method

removeConsumerRegistrationsForChannel.ts typescript
removeConsumerRegistrationsForChannel(channel: ConfirmChannel): void

Removes all tracked consumer tags for a closed channel.

resumeSubscriptionConsumer

Method

resumeSubscriptionConsumer.ts typescript
resumeSubscriptionConsumer(registrationKey: string): Promise<void>

Resumes a subscription consumer paused by a `stop-consumer` control signal.

retrySubscriptionMessage

Method

retrySubscriptionMessage.ts typescript
retrySubscriptionMessage(channel: ConfirmChannel, queueName: string, msg: ConsumeMessage, nextAttempt: number, retryDelayMs: number, durable: boolean): Promise<void>

Republishes a failed subscription message for immediate or delayed retry.

runInFlight

Method

runInFlight.ts typescript
runInFlight<T>(fn: () => Promise<T>, kind?: "command" | "subscription" | "stream" | "generic"): Promise<T>

sendToQueueAndConfirm

Method

sendToQueueAndConfirm.ts typescript
sendToQueueAndConfirm(channel: ConfirmChannel, queueName: string, content: Buffer, options: Publish | undefined): Promise<void>

Sends a message to a queue and waits for broker confirms when available.
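
Waiting for a broker confirm usually means wrapping the callback form of `sendToQueue` on a confirm channel in a promise. The sketch below shows that pattern against a minimal structural interface and a fake channel so it runs without a broker. `ConfirmChannelLike`, `sendAndConfirm`, and the queue name are hypothetical; this is not the bridge's implementation.

```typescript
// Minimal stand-in for the part of a confirm channel used here.
interface ConfirmChannelLike {
  sendToQueue(
    queue: string,
    content: Uint8Array,
    options: object | undefined,
    callback: (err: unknown) => void,
  ): boolean
}

// Resolves when the broker acknowledges the message, rejects when it does not.
function sendAndConfirm(
  channel: ConfirmChannelLike,
  queueName: string,
  content: Uint8Array,
  options?: object,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    channel.sendToQueue(queueName, content, options, (err) => {
      if (err) reject(err instanceof Error ? err : new Error(String(err)))
      else resolve()
    })
  })
}

// Fake channel that acknowledges every message asynchronously.
const sent: string[] = []
const fakeChannel: ConfirmChannelLike = {
  sendToQueue(queue, _content, _options, callback) {
    sent.push(queue)
    Promise.resolve().then(() => callback(null))
    return true
  },
}

const confirmation = sendAndConfirm(fakeChannel, 'example-queue', new Uint8Array([1, 2, 3]))
confirmation.then(() => console.log('confirmed'))
```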

start

Method

start.ts typescript
start(): Promise<void>

Connects to the AMQP broker and declares the exchange and reply queue.

startActiveSpan

Method

startActiveSpan.ts typescript
startActiveSpan<F>(name: string, opts: SpanOptions, context: Context | undefined, fn: (span: Span) => Promise<F>): Promise<F>

Starts a child span for OpenTelemetry tracing.

unregisterCommand

Method

unregisterCommand.ts typescript
unregisterCommand(address: EBMessageAddress): Promise<void>

Unregisters a command consumer and closes the dedicated command channel.

unregisterStream

Method

unregisterStream.ts typescript
unregisterStream(_address: EBMessageAddress): Promise<void>

Unregisters a service stream.

unregisterSubscription

Method

unregisterSubscription.ts typescript
unregisterSubscription(address: EBMessageAddress): Promise<void>

Unregisters a subscription consumer and closes its channel.

waitForInFlightDrain

Method

waitForInFlightDrain.ts typescript
waitForInFlightDrain(timeoutMs?: number): Promise<boolean>
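
`runInFlight` and `waitForInFlightDrain` carry no description here. A common way to implement this kind of tracking is a per-kind counter that is incremented around each handler, plus a drain promise that resolves when the total reaches zero. The sketch below shows that pattern under stated assumptions: the `InFlightTrackerSketch` class, the default kind `generic`, the default timeout, and the meaning of the boolean result (true when drained, false on timeout) are all guesses, not documented behavior.

```typescript
type WorkKind = 'command' | 'subscription' | 'stream' | 'generic'

class InFlightTrackerSketch {
  private readonly counts: Record<WorkKind, number> = { command: 0, subscription: 0, stream: 0, generic: 0 }
  private waiters: Array<() => void> = []

  // Runs fn while counting it as in-flight work of the given kind.
  async run<T>(fn: () => Promise<T>, kind: WorkKind = 'generic'): Promise<T> {
    this.counts[kind] += 1
    try {
      return await fn()
    } finally {
      this.counts[kind] -= 1
      if (this.total() === 0) {
        // Wake everyone waiting for the tracker to drain.
        const waiters = this.waiters
        this.waiters = []
        waiters.forEach((notify) => notify())
      }
    }
  }

  total(): number {
    return this.counts.command + this.counts.subscription + this.counts.stream + this.counts.generic
  }

  snapshot(): Record<WorkKind, number> {
    return { ...this.counts }
  }

  // Resolves true once nothing is running, or false if the timeout elapses first.
  waitForDrain(timeoutMs = 1000): Promise<boolean> {
    if (this.total() === 0) return Promise.resolve(true)
    return new Promise<boolean>((resolve) => {
      const timer = setTimeout(() => resolve(false), timeoutMs)
      this.waiters.push(() => {
        clearTimeout(timer)
        resolve(true)
      })
    })
  }
}

const tracker = new InFlightTrackerSketch()
let release: () => void = () => {}
const work = tracker.run(() => new Promise<void>((resolve) => { release = resolve }), 'subscription')
const during = tracker.snapshot() // taken while the handler is still running
const drained = tracker.waitForDrain(5000)
release() // let the handler finish
drained.then((ok) => console.log(during, ok))
```

A shutdown routine can stop accepting new work, await the drain with a timeout, and then close its resources.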

wrapInSpan

Method

wrapInSpan.ts typescript
wrapInSpan<F>(name: string, opts: SpanOptions, fn: (span: Span) => Promise<F>, context?: Context): Promise<F>

Starts a span for OpenTelemetry tracing on the same level. The created span does not become the "active" span in OpenTelemetry.