Classes · @purista/amqpbridge
AmqpBridge
EventBridge implementation for AMQP brokers such as RabbitMQ.
Signature
class AmqpBridge Examples
import { AmqpBridge } from '@purista/amqpbridge'
// create and init our eventbridge
const config = {
url: 39;amqp://localhost'
}
const eventBridge = new AmqpBridge(config)
await eventBridge.start() Constructors
1 entry
constructor
Constructor
new constructor(config?: { deadLetterExchangeName: string; deadLetterRoutingKey: string; defaultCommandTimeout: number; encoder: Encoder; encrypter: Encrypter; exchangeName: string; ... }) Creates an AMQP bridge with defaults merged into the provided config.
Properties
21 entries
capabilities
Property
capabilities: EventBridgeCapabilities Runtime capability matrix used for strict startup validation.
channel
Property
channel: ConfirmChannel Shared publishing channel used for event and command publication.
config
Property
config: Complete<EventBridgeConfig<ConfigType>> connection
Property
connection: ChannelModel Active AMQP connection, available after start.
consumerRegistrations
Property
consumerRegistrations: { channel: ConfirmChannel; tag: string }[] Consumer tags grouped with their channels for shutdown cleanup.
defaultCommandTimeout
Property
defaultCommandTimeout: number The default time until when a command invocation automatically returns a time out error
encoder
Property
encoder: Encoder Content-type codecs used before encryption.
encrypter
Property
encrypter: Encrypter Content-encoding encryption handlers used after encoding.
healthy
Property
healthy: boolean Last known broker/channel health state.
inFlightExecutions
Property
inFlightExecutions: InFlightExecutionTracker instanceId
Property
instanceId: string Stable runtime instance id used to distinguish bridge processes.
logger
Property
logger: Logger metricsRecorder
Property
metricsRecorder: PuristaMetricsRecorder name
Property
name: string Human-readable bridge name used in logs, traces, and metrics.
pausedSubscriptionConsumers
Property
pausedSubscriptionConsumers: Map<string, PausedSubscriptionState> Subscription consumers paused by control signals.
pendingInvocations
Property
pendingInvocations: PendingInvocationRegistry<unknown> Pending command invocations waiting for a correlated response.
ready
Property
ready: boolean Indicates whether startup completed and the bridge can handle traffic.
replyQueueName
Property
replyQueueName: string Broker-created exclusive queue that receives command responses.
serviceFunctions
Property
serviceFunctions: Map<string, { cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<unknown | unknown>; channel: ConfirmChannel }> Registered command handlers keyed by generated AMQP queue name.
subscriptions
Property
subscriptions: Map<string, RegisteredSubscription> Registered subscription consumers keyed by generated subscription queue name.
traceProvider
Property
traceProvider: NodeTracerProvider Methods
36 entries
addConsumerRegistration
Method
addConsumerRegistration(channel: ConfirmChannel, tag: string): void Tracks a broker consumer tag so shutdown can cancel it.
createPublishingChannel
Method
createPublishingChannel(): Promise<ConfirmChannel> Creates a publishing channel, preferring confirm channels when supported.
deadLetterSubscriptionMessage
Method
deadLetterSubscriptionMessage(channel: ConfirmChannel, subscription: Subscription, msg: ConsumeMessage, reason: string): Promise<void> Hands a failed subscription message to the configured dead-letter target.
decodeContent
Method
decodeContent<T>(input: Buffer, contentType: string, contentEncoding: string): Promise<T> Decode buffer into given type
destroy
Method
destroy(): Promise<void> Gracefully stops all consumers, waits for in-flight subscription handlers, closes AMQP resources and rejects unresolved pending invocations.
emitMessage
Method
emitMessage<T>(message: Omit<EBMessage, "id" | "timestamp" | "correlationId">, contentType: string, contentEncoding: string): Promise<Readonly<EBMessage>> Emits a message via AMQP headers exchange.
encodeContent
Method
encodeContent<T>(input: T, contentType: string, contentEncoding: string): Promise<Buffer<ArrayBufferLike>> Encode given payload to buffer
ensureSubscriptionRetryQueue
Method
ensureSubscriptionRetryQueue(channel: ConfirmChannel, sourceQueueName: string, retryQueueName: string, retryDelayMs: number): Promise<void> Declares the TTL retry queue that routes expired messages back to the source queue.
getConsumerAttempt
Method
getConsumerAttempt(headers: unknown): number Reads the PURISTA retry attempt header from an AMQP message.
getInFlightExecutionCount
Method
getInFlightExecutionCount(): number Number of currently running handlers across all work kinds.
getInFlightExecutionCounts
Method
getInFlightExecutionCounts(): InFlightExecutionCounts Number of currently running handlers grouped by work kind.
getPausedSubscriptionConsumers
Method
getPausedSubscriptionConsumers(): object Returns paused subscription consumer states keyed by adapter registration key.
getSubscriptionDeadLetterTarget
Method
getSubscriptionDeadLetterTarget(subscription: Subscription): string | undefined Resolves the dead-letter routing key for a subscription failure.
getSubscriptionFailureReason
Method
getSubscriptionFailureReason(error: unknown): string Converts subscription handler failures to a concise dead-letter reason.
getSubscriptionRetryQueueName
Method
getSubscriptionRetryQueueName(queueName: string, retryDelayMs: number): string Builds the queue name used for delayed subscription retry.
getTracer
Method
getTracer(): Tracer Returns open telemetry tracer of this service
invoke
Method
invoke<T>(input: Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<T> Invokes a remote command and waits for a matching command response.
isHealthy
Method
isHealthy(): Promise<boolean> Indicates if the bridge connection and channels are currently healthy.
isReady
Method
isReady(): Promise<boolean> Indicates if the bridge finished startup and is ready to process traffic.
openStream
Method
openStream<Chunk, Final>(_input: Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">, _ttl?: number): Promise<StreamHandle<Chunk, Final>> Open a stream invocation.
registerCommand
Method
registerCommand(address: EBMessageAddress, cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; ... } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; ... }>, metadata: CommandDefinitionMetadataBase, eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string> Register a service function and ensure that there is a queue for all incoming command requests.
registerStream
Method
registerStream(_address: EBMessageAddress, _cb: (message: StreamMessage) => Promise<void>, _metadata: StreamDefinitionMetadataBase, _eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string> Register a service stream handler for a service target.
registerSubscription
Method
registerSubscription(subscription: Subscription, cb: (message: EBMessage) => Promise<Omit<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }, unknown | unknown> | undefined>): Promise<string> Registers a subscription consumer and returns its stable subscription key.
removeConsumerRegistration
Method
removeConsumerRegistration(channel: ConfirmChannel, tag: string): void Removes one tracked consumer tag.
removeConsumerRegistrationsForChannel
Method
removeConsumerRegistrationsForChannel(channel: ConfirmChannel): void Removes all tracked consumer tags for a closed channel.
resumeSubscriptionConsumer
Method
resumeSubscriptionConsumer(registrationKey: string): Promise<void> Resumes a subscription consumer paused by a `stop-consumer` control signal.
retrySubscriptionMessage
Method
retrySubscriptionMessage(channel: ConfirmChannel, queueName: string, msg: ConsumeMessage, nextAttempt: number, retryDelayMs: number, durable: boolean): Promise<void> Republishes a failed subscription message for immediate or delayed retry.
runInFlight
Method
runInFlight<T>(fn: () => Promise<T>, kind?: "command" | "subscription" | "stream" | "generic"): Promise<T> sendToQueueAndConfirm
Method
sendToQueueAndConfirm(channel: ConfirmChannel, queueName: string, content: Buffer, options: Publish | undefined): Promise<void> Sends a message to a queue and waits for broker confirms when available.
start
Method
start(): Promise<void> Connects to the AMQP broker and declares the exchange and reply queue.
startActiveSpan
Method
startActiveSpan<F>(name: string, opts: SpanOptions, context: Context | undefined, fn: (span: Span) => Promise<F>): Promise<F> Start a child span for opentelemetry tracking
unregisterCommand
Method
unregisterCommand(address: EBMessageAddress): Promise<void> Unregisters a command consumer and closes the dedicated command channel.
unregisterStream
Method
unregisterStream(_address: EBMessageAddress): Promise<void> Unregister a service stream
unregisterSubscription
Method
unregisterSubscription(address: EBMessageAddress): Promise<void> Unregisters a subscription consumer and closes its channel.
waitForInFlightDrain
Method
waitForInFlightDrain(timeoutMs?: number): Promise<boolean> wrapInSpan
Method
wrapInSpan<F>(name: string, opts: SpanOptions, fn: (span: Span) => Promise<F>, context?: Context): Promise<F> Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!