PURISTA API


Class: AmqpBridge

Defined in: amqpbridge/src/AmqpBridge.impl.ts:92

The AMQP event bridge connects to an AMQP broker.

Example

typescript
import { AmqpBridge } from '@purista/amqpbridge'

// create and init our eventbridge
const config = {
   url: 'amqp://localhost'
}

const eventBridge = new AmqpBridge(config)
await eventBridge.start()

Extends

EventBridgeBaseClass

Implements

EventBridge

Constructors

Constructor

new AmqpBridge(config?): AmqpBridge

Defined in: amqpbridge/src/AmqpBridge.impl.ts:287

Parameters

config?
deadLetterExchangeName?

string

optional dead letter exchange name used for durable command/subscription queues

deadLetterRoutingKey?

string

optional dead letter routing key used for durable command/subscription queues

defaultCommandTimeout?

number

Overrides the hard-coded default timeout for command invocations

encoder?

Encoder

the encoder(s) to be used for AMQP messages

Default

jsonEncoder

encrypter?

Encrypter

the encrypter(s) to be used for AMQP messages

Default

plain

exchangeName?

string

the AMQP exchange name to be used

Default

purista

exchangeOptions?

AssertExchange

the AMQP exchange options

instanceId?

string

The instance ID of the event bridge. If not set, an ID is generated each time an instance is created. Set this if the instance ID must always be the same.

logger?

Logger

logLevel?

LogLevelName

metrics?

PuristaMetricsRuntimeOptions

metricsRecorder?

PuristaMetricsRecorderInterface

namePrefix?

string

the queue prefix to be used for all PURISTA queues, except short-lived queues created by the broker on request

Default

purista

prefetch?

number

maximum number of unacknowledged messages per consumer channel

socketOptions?

SocketOptions

socket options

spanProcessor?

SpanProcessor

url?

string | Connect

the AMQP broker url

Default

amqp://localhost

Returns

AmqpBridge

Overrides

EventBridgeBaseClass.constructor
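
The options above are combined into a single configuration object. The following is a minimal sketch, not the library's implementation: `BridgeConfigSketch` and `withDocumentedDefaults` are hypothetical names that mirror only a subset of the documented options and apply the defaults listed above (`purista` for `exchangeName` and `namePrefix`, `amqp://localhost` for `url`). The host name and instance ID are made-up values.

```typescript
// Hypothetical local type mirroring a subset of the documented options.
// The real configuration type is provided by @purista/amqpbridge.
type BridgeConfigSketch = {
  url?: string
  exchangeName?: string
  namePrefix?: string
  instanceId?: string
  prefetch?: number
  defaultCommandTimeout?: number
  deadLetterExchangeName?: string
  deadLetterRoutingKey?: string
}

// Defaults as listed in the parameter documentation above.
const documentedDefaults = {
  url: 'amqp://localhost',
  exchangeName: 'purista',
  namePrefix: 'purista',
}

// Merge user-supplied options over the documented defaults.
function withDocumentedDefaults(config: BridgeConfigSketch = {}) {
  return { ...documentedDefaults, ...config }
}

const exampleConfig = withDocumentedDefaults({
  url: 'amqp://rabbitmq.internal:5672', // assumed host name
  instanceId: 'orders-bridge-1', // fixed ID instead of a generated one
  prefetch: 10, // at most 10 unacknowledged messages per consumer channel
})
```

With the real package, an object of this shape would be passed to `new AmqpBridge(config)`, as in the example at the top of this page.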

Properties

capabilities

capabilities: EventBridgeCapabilities

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:28

Implementation of

EventBridge.capabilities

Inherited from

EventBridgeBaseClass.capabilities


channel?

protected optional channel?: ConfirmChannel

Defined in: amqpbridge/src/AmqpBridge.impl.ts:94


config

config: Complete<EventBridgeConfig<ConfigType>>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:26

Inherited from

EventBridgeBaseClass.config


connection?

protected optional connection?: ChannelModel

Defined in: amqpbridge/src/AmqpBridge.impl.ts:93


consumerRegistrations

protected consumerRegistrations: object[] = []

Defined in: amqpbridge/src/AmqpBridge.impl.ts:99

channel

channel: ConfirmChannel

tag

tag: string


defaultCommandTimeout

defaultCommandTimeout: number

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:30

The default time after which a command invocation automatically returns a timeout error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout


encoder

protected encoder: Encoder

Defined in: amqpbridge/src/AmqpBridge.impl.ts:119


encrypter

protected encrypter: Encrypter

Defined in: amqpbridge/src/AmqpBridge.impl.ts:123


healthy

protected healthy: boolean = false

Defined in: amqpbridge/src/AmqpBridge.impl.ts:96


inFlightExecutions

protected readonly inFlightExecutions: InFlightExecutionTracker

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:31

Inherited from

EventBridgeBaseClass.inFlightExecutions


instanceId

instanceId: string

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:29

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId


logger

logger: Logger

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:23

Inherited from

EventBridgeBaseClass.logger


metricsRecorder

protected metricsRecorder: PuristaMetricsRecorderInterface

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:25

Inherited from

EventBridgeBaseClass.metricsRecorder


name

name: string

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:27

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name


pausedSubscriptionConsumers

protected pausedSubscriptionConsumers: Map<string, PausedSubscriptionState>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:117


pendingInvocations

protected pendingInvocations: PendingInvocationRegistry<unknown>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:110


ready

protected ready: boolean = false

Defined in: amqpbridge/src/AmqpBridge.impl.ts:97


replyQueueName?

protected optional replyQueueName?: string

Defined in: amqpbridge/src/AmqpBridge.impl.ts:101


serviceFunctions

protected serviceFunctions: Map<string, { cb: (message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>; channel: ConfirmChannel; }>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:102


subscriptions

protected subscriptions: Map<string, RegisteredSubscription>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:116


traceProvider

traceProvider: NodeTracerProvider

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24

Inherited from

EventBridgeBaseClass.traceProvider

Methods

addConsumerRegistration()

protected addConsumerRegistration(channel, tag): void

Defined in: amqpbridge/src/AmqpBridge.impl.ts:127

Parameters

channel

ConfirmChannel

tag

string

Returns

void


createPublishingChannel()

protected createPublishingChannel(): Promise<ConfirmChannel>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:153

Returns

Promise<ConfirmChannel>


deadLetterSubscriptionMessage()

protected deadLetterSubscriptionMessage(channel, subscription, msg, reason): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:256

Parameters

channel

ConfirmChannel

subscription

Subscription

msg

ConsumeMessage

reason

string

Returns

Promise<void>


decodeContent()

protected decodeContent<T>(input, contentType, contentEncoding): Promise<T>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:1264

Decode buffer into given type

Type Parameters

T

T

Parameters

input

Buffer

the input buffer

contentType

string

the content type of buffer content

contentEncoding

string

the encoding type of buffer content

Returns

Promise<T>


destroy()

destroy(): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:1283

Gracefully stops all consumers, waits for in-flight subscription handlers, closes AMQP resources and rejects unresolved pending invocations.

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy
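
A typical place to call `destroy()` is a shutdown hook. The sketch below assumes only the two documented signatures `waitForInFlightDrain(timeoutMs?)` and `destroy()`. The `ShutdownCapable` interface and the `shutdownBridge` helper are hypothetical names, and reading the boolean result of `waitForInFlightDrain` as "all handlers finished before the timeout" is an assumption this reference does not confirm.

```typescript
// Hypothetical local interface: only the documented signatures are assumed.
interface ShutdownCapable {
  waitForInFlightDrain(timeoutMs?: number): Promise<boolean>
  destroy(): Promise<void>
}

// Give running handlers a bounded time to finish, then release the bridge resources.
// Assumption: `true` means all handlers finished before the timeout.
async function shutdownBridge(bridge: ShutdownCapable, drainTimeoutMs = 5000): Promise<boolean> {
  const drained = await bridge.waitForInFlightDrain(drainTimeoutMs)
  await bridge.destroy()
  return drained
}
```

With a real bridge instance, `shutdownBridge(eventBridge)` could be called from a process signal handler such as one for `SIGTERM`.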


emitMessage()

emitMessage<T>(message, contentType?, contentEncoding?): Promise<Readonly<EBMessage>>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:498

Emits a message via AMQP headers exchange. The message is encoded and encrypted according to configured codecs.

Type Parameters

T

T extends EBMessage

Parameters

message

Omit<EBMessage, "id" | "timestamp" | "correlationId">

contentType?

string = 'application/json'

contentEncoding?

string = 'utf-8'

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage


encodeContent()

protected encodeContent<T>(input, contentType, contentEncoding): Promise<Buffer<ArrayBufferLike>>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:1243

Encode given payload to buffer

Type Parameters

T

T

Parameters

input

T

contentType

string

contentEncoding

string

Returns

Promise<Buffer<ArrayBufferLike>>


ensureSubscriptionRetryQueue()

protected ensureSubscriptionRetryQueue(channel, sourceQueueName, retryQueueName, retryDelayMs): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:240

Parameters

channel

ConfirmChannel

sourceQueueName

string

retryQueueName

string

retryDelayMs

number

Returns

Promise<void>


getConsumerAttempt()

protected getConsumerAttempt(headers): number

Defined in: amqpbridge/src/AmqpBridge.impl.ts:163

Parameters

headers

unknown

Returns

number


getInFlightExecutionCount()

getInFlightExecutionCount(): number

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:69

Number of currently running handlers across all work kinds.

Returns

number

Implementation of

EventBridge.getInFlightExecutionCount

Inherited from

EventBridgeBaseClass.getInFlightExecutionCount


getInFlightExecutionCounts()

getInFlightExecutionCounts(): InFlightExecutionCounts

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:70

Number of currently running handlers grouped by work kind.

Returns

InFlightExecutionCounts

Implementation of

EventBridge.getInFlightExecutionCounts

Inherited from

EventBridgeBaseClass.getInFlightExecutionCounts
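
To make the two counters concrete, here is a small stand-alone sketch of counting running handlers per work kind. The kinds are taken from the `runInFlight` signature on this page. The `InFlightCounterSketch` class is hypothetical and is not the `InFlightExecutionTracker` used by the bridge; using `generic` as the default kind is also an assumption.

```typescript
type WorkKind = 'command' | 'subscription' | 'stream' | 'generic'

// Hypothetical counter illustrating "in-flight" bookkeeping.
class InFlightCounterSketch {
  private counts: Record<WorkKind, number> = { command: 0, subscription: 0, stream: 0, generic: 0 }

  // Run a handler and count it as in flight until its promise settles.
  async run<T>(fn: () => Promise<T>, kind: WorkKind = 'generic'): Promise<T> {
    this.counts[kind] += 1
    try {
      return await fn()
    } finally {
      this.counts[kind] -= 1
    }
  }

  // Running handlers grouped by work kind (returns a copy).
  getCounts(): Record<WorkKind, number> {
    return { ...this.counts }
  }

  // Running handlers across all work kinds.
  getCount(): number {
    const c = this.counts
    return c.command + c.subscription + c.stream + c.generic
  }
}
```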


getPausedSubscriptionConsumers()

getPausedSubscriptionConsumers(): object

Defined in: amqpbridge/src/AmqpBridge.impl.ts:1211

Returns paused subscription consumer states keyed by adapter registration key.

Returns

object

Implementation of

EventBridge.getPausedSubscriptionConsumers

Overrides

EventBridgeBaseClass.getPausedSubscriptionConsumers


getSubscriptionDeadLetterTarget()

protected getSubscriptionDeadLetterTarget(subscription): string | undefined

Defined in: amqpbridge/src/AmqpBridge.impl.ts:182

Parameters

subscription

Subscription

Returns

string | undefined


getSubscriptionFailureReason()

protected getSubscriptionFailureReason(error): string

Defined in: amqpbridge/src/AmqpBridge.impl.ts:186

Parameters

error

unknown

Returns

string


getSubscriptionRetryQueueName()

protected getSubscriptionRetryQueueName(queueName, retryDelayMs): string

Defined in: amqpbridge/src/AmqpBridge.impl.ts:236

Parameters

queueName

string

retryDelayMs

number

Returns

string


getTracer()

getTracer(): Tracer

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:38

Returns the OpenTelemetry tracer of this service

Returns

Tracer

Inherited from

EventBridgeBaseClass.getTracer


invoke()

invoke<T>(input, commandTimeout?): Promise<T>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:577

Invokes a remote command and waits for a matching command response. The call is rejected with timeout if no response is received in time.

Type Parameters

T

T

Parameters

input

Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">

commandTimeout?

number = ...

Returns

Promise<T>

Implementation of

EventBridge.invoke
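
The request/response pattern described above (wait for a matching response, reject on timeout) can be illustrated without a broker. This is a simplified sketch of the general technique, not the code in `AmqpBridge.impl.ts`; `PendingCallsSketch`, `waitFor`, and `handleResponse` are hypothetical names.

```typescript
// Simplified sketch: correlate responses to pending calls and time out unanswered ones.
type PendingEntrySketch = {
  resolve: (value: unknown) => void
  timer: ReturnType<typeof setTimeout>
}

class PendingCallsSketch {
  private pending = new Map<string, PendingEntrySketch>()

  // Register a call; the promise rejects if no response arrives in time.
  waitFor(correlationId: string, timeoutMs: number): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId)
        reject(new Error(`invocation ${correlationId} timed out after ${timeoutMs} ms`))
      }, timeoutMs)
      this.pending.set(correlationId, { resolve, timer })
    })
  }

  // Called when a response arrives; returns false if nobody is waiting for it.
  handleResponse(correlationId: string, payload: unknown): boolean {
    const entry = this.pending.get(correlationId)
    if (!entry) return false
    clearTimeout(entry.timer)
    this.pending.delete(correlationId)
    entry.resolve(payload)
    return true
  }
}
```

A response whose correlation ID is unknown, for example because the call already timed out, is simply reported as unmatched in this sketch.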


isHealthy()

isHealthy(): Promise<boolean>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:350

Indicates if the bridge connection and channels are currently healthy.

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy


isReady()

isReady(): Promise<boolean>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:343

Indicates if the bridge finished startup and is ready to process traffic.

Returns

Promise<boolean>

Implementation of

EventBridge.isReady
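
`isHealthy()` and `isReady()` map naturally onto liveness and readiness probes. The following sketch assumes only the two documented signatures; `ProbeSource`, `probeStatus`, and the choice of HTTP status codes 200 and 503 are illustrative assumptions.

```typescript
// Hypothetical local interface: only the documented signatures are assumed.
interface ProbeSource {
  isHealthy(): Promise<boolean>
  isReady(): Promise<boolean>
}

type ProbeKind = 'liveness' | 'readiness'

// Translate the bridge state into an HTTP status code for a probe endpoint.
async function probeStatus(bridge: ProbeSource, kind: ProbeKind): Promise<200 | 503> {
  const ok = kind === 'liveness' ? await bridge.isHealthy() : await bridge.isReady()
  return ok ? 200 : 503
}
```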


openStream()

openStream<Chunk, Final>(_input, _ttl?): Promise<StreamHandle<Chunk, Final>>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:73

Open a stream invocation. The returned handle can be consumed via async iteration and can be cancelled by caller.

Type Parameters

Chunk

Chunk = unknown

Final

Final = unknown

Parameters

_input

Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">

_ttl?

number

Returns

Promise<StreamHandle<Chunk, Final>>

Implementation of

EventBridge.openStream

Inherited from

EventBridgeBaseClass.openStream


registerCommand()

registerCommand(address, cb, metadata, eventBridgeConfig): Promise<string>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:678

Register a service function and ensure that there is a queue for all incoming command requests.

Parameters

address

EBMessageAddress

The service function address

cb

(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>

the function to call if a matching command message arrives

metadata

CommandDefinitionMetadataBase

eventBridgeConfig

DefinitionEventBridgeConfig

Returns

Promise<string>

the ID of the command function queue

Implementation of

EventBridge.registerCommand


registerStream()

registerStream(_address, _cb, _metadata, _eventBridgeConfig): Promise<string>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:74

Register a service stream.

Parameters

_address

EBMessageAddress

_cb

(message) => Promise<void>

_metadata

StreamDefinitionMetadataBase

_eventBridgeConfig

DefinitionEventBridgeConfig

Returns

Promise<string>

Implementation of

EventBridge.registerStream

Inherited from

EventBridgeBaseClass.registerStream


registerSubscription()

registerSubscription(subscription, cb): Promise<string>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:932

Registers a subscription consumer and returns its stable subscription key.

Parameters

subscription

Subscription

cb

(message) => Promise<Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }, "id" | "timestamp"> | undefined>

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription


removeConsumerRegistration()

protected removeConsumerRegistration(channel, tag): void

Defined in: amqpbridge/src/AmqpBridge.impl.ts:135

Parameters

channel

ConfirmChannel

tag

string

Returns

void


removeConsumerRegistrationsForChannel()

protected removeConsumerRegistrationsForChannel(channel): void

Defined in: amqpbridge/src/AmqpBridge.impl.ts:131

Parameters

channel

ConfirmChannel

Returns

void


resumeSubscriptionConsumer()

resumeSubscriptionConsumer(registrationKey): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:1215

Resumes a paused subscription consumer by registration key.

Parameters

registrationKey

string

Returns

Promise<void>

Implementation of

EventBridge.resumeSubscriptionConsumer

Overrides

EventBridgeBaseClass.resumeSubscriptionConsumer
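
Together with `getPausedSubscriptionConsumers()`, this method can be used to resume every paused consumer. The sketch below assumes only the two documented signatures and that the returned object's own keys are the registration keys; `PausedConsumerControl` and `resumeAllPaused` are hypothetical names.

```typescript
// Hypothetical local interface: only the documented signatures are assumed.
interface PausedConsumerControl {
  getPausedSubscriptionConsumers(): object
  resumeSubscriptionConsumer(registrationKey: string): Promise<void>
}

// Resume every paused consumer one after another and return the keys that were resumed.
async function resumeAllPaused(bridge: PausedConsumerControl): Promise<string[]> {
  // Assumption: the object's own enumerable keys are the registration keys.
  const registrationKeys = Object.keys(bridge.getPausedSubscriptionConsumers())
  for (const registrationKey of registrationKeys) {
    await bridge.resumeSubscriptionConsumer(registrationKey)
  }
  return registrationKeys
}
```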


retrySubscriptionMessage()

protected retrySubscriptionMessage(channel, queueName, msg, nextAttempt, retryDelayMs, durable): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:204

Parameters

channel

ConfirmChannel

queueName

string

msg

ConsumeMessage

nextAttempt

number

retryDelayMs

number

durable

boolean

Returns

Promise<void>


runInFlight()

runInFlight<T>(fn, kind?): Promise<T>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:67

Type Parameters

T

T

Parameters

fn

() => Promise<T>

kind?

"command" | "subscription" | "stream" | "generic"

Returns

Promise<T>

Inherited from

EventBridgeBaseClass.runInFlight


sendToQueueAndConfirm()

protected sendToQueueAndConfirm(channel, queueName, content, options): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:141

Parameters

channel

ConfirmChannel

queueName

string

content

Buffer

options

Publish | undefined

Returns

Promise<void>


start()

start(): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:357

Connects to the RabbitMQ broker and ensures that the exchange and the callback queue exist.

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start


startActiveSpan()

startActiveSpan<F>(name, opts, context, fn): Promise<F>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:47

Starts a child span for OpenTelemetry tracing

Type Parameters

F

F

Parameters

name

string

name of span

opts

SpanOptions

span options

context

Context | undefined

optional context

fn

(span) => Promise<F>

function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan


unregisterCommand()

unregisterCommand(address): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:908

Unregisters a command consumer and closes the dedicated command channel.

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand


unregisterStream()

unregisterStream(_address): Promise<void>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:75

Unregister a service stream

Parameters

_address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterStream

Inherited from

EventBridgeBaseClass.unregisterStream


unregisterSubscription()

unregisterSubscription(address): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:1189

Unregisters a subscription consumer and closes its channel.

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription


waitForInFlightDrain()

waitForInFlightDrain(timeoutMs?): Promise<boolean>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:68

Parameters

timeoutMs?

number

Returns

Promise<boolean>

Inherited from

EventBridgeBaseClass.waitForInFlightDrain


wrapInSpan()

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:64

Starts a span for OpenTelemetry tracing on the same level. The created span will not become the "active" span within OpenTelemetry!

This means that during logging and similar operations, the spanId of the parent span is logged.

Use wrapInSpan to mark points in the flow of one bigger function, but not to trace the program flow itself.

Type Parameters

F

F

Parameters

name

string

name of span

opts

SpanOptions

span options

fn

(span) => Promise<F>

function to be executed in the span

context?

Context

span context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan
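
As a usage illustration for `wrapInSpan`, the sketch below marks two steps inside one larger function. It is written against a hypothetical local interface (`SpanWrapperSketch`, `SpanLikeSketch`) that mirrors the documented parameter order `(name, opts, fn)`. The span names, the attribute key, and the `countNonEmptyRows` function are made up for the example.

```typescript
// Hypothetical minimal span shape; a real span offers more methods.
interface SpanLikeSketch {
  setAttribute(key: string, value: string | number | boolean): unknown
}

// Hypothetical local interface mirroring the documented wrapInSpan parameter order.
interface SpanWrapperSketch {
  wrapInSpan<F>(name: string, opts: Record<string, unknown>, fn: (span: SpanLikeSketch) => Promise<F>): Promise<F>
}

// One bigger function with two marked steps.
async function countNonEmptyRows(tracing: SpanWrapperSketch, rows: string[]): Promise<number> {
  const cleaned = await tracing.wrapInSpan('parse-rows', {}, async (span) => {
    span.setAttribute('rows.count', rows.length)
    return rows.map((row) => row.trim()).filter((row) => row.length > 0)
  })
  return tracing.wrapInSpan('count-rows', {}, async () => cleaned.length)
}
```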