Skip to content

PURISTA API / Modules / @purista/amqpbridge / AmqpBridge

Class: AmqpBridge

@purista/amqpbridge.AmqpBridge

The AMQP event bridge connects to a AMQP broker.

Example

typescript
import { AmqpBridge } from '@purista/amqpbridge'

// create and init our eventbridge
const config = {
   url: 'amqp://localhost'
}

const eventBridge = new AmqpBridge(config)
await eventBridge.start()

Hierarchy

Implements

Table of contents

Constructors

Properties

Methods

Constructors

constructor

new AmqpBridge(config?): AmqpBridge

Parameters

NameTypeDescription
config?Object-
config.defaultCommandTimeout?numberOverwrite the hardcoded default timeout of command invocations
config.encoder?Encoderthe encoder(s) to be used for AMQP messages Default ts jsonEncoder
config.encrypter?Encrypterthe encrypter(s) to be used for AMQP messages Default ts plain
config.exchangeName?stringthe AMQP exchage name to be used Default ts purista
config.exchangeOptions?AssertExchangethe AMQP exchange options
config.instanceId?stringThe instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id.
config.logLevel?LogLevelNameIf no logger instance is given, use this log level
config.logger?LoggerA logger instance
config.namePrefix?stringthe queue prefix to be used for all PURISTA queues except short living queues created by the broker on request Default ts purista
config.socketOptions?anysocket options
config.spanProcessor?SpanProcessorA OpenTelemetry span processor
config.url?string | Connectthe AMQP broker url Default ts amqp://localhost

Returns

AmqpBridge

Overrides

EventBridgeBaseClass.constructor

Defined in

amqpbridge/src/AmqpBridge.impl.ts:105

Properties

channel

Protected Optional channel: Channel

Defined in

amqpbridge/src/AmqpBridge.impl.ts:69


config

config: Complete<{ defaultCommandTimeout?: number ; encoder?: Encoder ; encrypter?: Encrypter ; exchangeName?: string ; exchangeOptions?: AssertExchange ; instanceId?: string ; logLevel?: LogLevelName ; logger?: Logger ; namePrefix?: string ; socketOptions?: any ; spanProcessor?: SpanProcessor ; url?: string | Connect }>

Inherited from

EventBridgeBaseClass.config

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:14


connection

Protected Optional connection: Connection

Defined in

amqpbridge/src/AmqpBridge.impl.ts:68


consumerTags

Protected consumerTags: string[] = []

Defined in

amqpbridge/src/AmqpBridge.impl.ts:74


defaultCommandTimeout

defaultCommandTimeout: number

The default time until when a command invocation automatically returns a time out error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:17


encoder

Protected encoder: Encoder

Defined in

amqpbridge/src/AmqpBridge.impl.ts:97


encrypter

Protected encrypter: Encrypter

Defined in

amqpbridge/src/AmqpBridge.impl.ts:101


healthy

Protected healthy: boolean = false

Defined in

amqpbridge/src/AmqpBridge.impl.ts:71


instanceId

instanceId: string

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16


logger

logger: Logger

Inherited from

EventBridgeBaseClass.logger

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:12


name

name: string

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15


pendingInvocations

Protected pendingInvocations: Map<string, PendigInvocation>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:85


ready

Protected ready: boolean = false

Defined in

amqpbridge/src/AmqpBridge.impl.ts:72


replyQueueName

Protected Optional replyQueueName: string

Defined in

amqpbridge/src/AmqpBridge.impl.ts:76


runningSubscriptionCount

Protected runningSubscriptionCount: number = 0

Defined in

amqpbridge/src/AmqpBridge.impl.ts:87


serviceFunctions

Protected serviceFunctions: Map<string, { cb: (message: { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<{ contentEncoding: "utf-8" ; contentType: "application/json" ; correlationId: string ; eventName?: string ; id: string ; isHandledError: boolean ; messageType: CommandErrorResponse ; otp?: string ; payload: { data?: unknown ; message: string ; status: StatusCode } ; principalId?: string ; receiver: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string } | { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: CommandSuccessResponse ; otp?: string ; payload: unknown ; principalId?: string ; receiver: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }> ; channel: Channel }>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:77


subscriptions

Protected subscriptions: Map<string, { cb: (message: { contentEncoding: string ; contentType: string ; correlationId?: string ; eventName: string ; id: string ; messageType: CustomMessage ; otp?: string ; payload?: unknown ; principalId?: string ; receiver?: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<undefined | Omit<{ contentEncoding: string ; contentType: string ; correlationId?: string ; eventName: string ; id: string ; messageType: CustomMessage ; otp?: string ; payload?: unknown ; principalId?: string ; receiver?: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }, "id" | "timestamp">> ; channel: Channel }>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:89


traceProvider

traceProvider: NodeTracerProvider

Inherited from

EventBridgeBaseClass.traceProvider

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:13

Methods

decodeContent

decodeContent<T>(input, contentType, contentEncoding): Promise<T>

Decode buffer into given type

Type parameters

Name
T

Parameters

NameTypeDescription
inputBufferthe input buffer
contentTypestringthe content type of buffer content
contentEncodingstringthe encoding type of buffer content

Returns

Promise<T>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:821


destroy

destroy(): Promise<void>

Shut down event bridge as gracefully as possible

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy

Defined in

amqpbridge/src/AmqpBridge.impl.ts:836


emit

emit<K>(eventName, parameter?): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
parameter?{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]

Returns

void

Inherited from

EventBridgeBaseClass.emit

Defined in

core/dist/commonjs/core/types/GenericEventEmitter.d.ts:13


emitMessage

emitMessage<T>(message, contentType?, contentEncoding?): Promise<Readonly<EBMessage>>

Emit a message to the eventbridge without awaiting a result

Type parameters

NameType
Textends EBMessage

Parameters

NameTypeDefault valueDescription
messageOmit<EBMessage, "id" | "timestamp" | "correlationId">undefinedthe message
contentTypestring'application/json'-
contentEncodingstring'utf-8'-

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage

Defined in

amqpbridge/src/AmqpBridge.impl.ts:279


encodeContent

encodeContent<T>(input, contentType, contentEncoding): Promise<Buffer>

Encode given payload to buffer

Type parameters

Name
T

Parameters

NameType
inputT
contentTypestring
contentEncodingstring

Returns

Promise<Buffer>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:800


getTracer

getTracer(): Tracer

Returns open telemetry tracer of this service

Returns

Tracer

Tracer

Inherited from

EventBridgeBaseClass.getTracer

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24


invoke

invoke<T>(input, commandTimeout?): Promise<T>

Call a command of a service and return the result of this command

Type parameters

Name
T

Parameters

NameTypeDescription
inputOmit<{ contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }, "id" | "timestamp" | "correlationId" | "messageType">a partial command message
commandTimeoutnumberthe time to live (timeout) of the invocation

Returns

Promise<T>

Implementation of

EventBridge.invoke

Defined in

amqpbridge/src/AmqpBridge.impl.ts:354


isHealthy

isHealthy(): Promise<boolean>

Indicates if the eventbridge is running and works correctly

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy

Defined in

amqpbridge/src/AmqpBridge.impl.ts:128


isReady

isReady(): Promise<boolean>

Indicates if the eventbridge has been started and is connected to underlaying message broker

Returns

Promise<boolean>

Implementation of

EventBridge.isReady

Defined in

amqpbridge/src/AmqpBridge.impl.ts:124


off

off<K>(eventName, fn): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
fnEventReceiver<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]>

Returns

void

Inherited from

EventBridgeBaseClass.off

Defined in

core/dist/commonjs/core/types/GenericEventEmitter.d.ts:12


on

on<K>(eventName, fn): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
fnEventReceiver<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]>

Returns

void

Inherited from

EventBridgeBaseClass.on

Defined in

core/dist/commonjs/core/types/GenericEventEmitter.d.ts:11


registerCommand

registerCommand(address, cb, metadata, eventBridgeConfig): Promise<string>

Register a service function and ensure that there is a queue for all incoming command requests.

Parameters

NameTypeDescription
addressEBMessageAddressThe service function address
cb(message: { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<{ contentEncoding: "utf-8" ; contentType: "application/json" ; correlationId: string ; eventName?: string ; id: string ; isHandledError: boolean ; messageType: CommandErrorResponse ; otp?: string ; payload: { data?: unknown ; message: string ; status: StatusCode } ; principalId?: string ; receiver: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string } | { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: CommandSuccessResponse ; otp?: string ; payload: unknown ; principalId?: string ; receiver: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }>the function to call if a matching command message arrives
metadataCommandDefinitionMetadataBase-
eventBridgeConfigDefinitionEventBridgeConfig-

Returns

Promise<string>

the id of command function queue

Implementation of

EventBridge.registerCommand

Defined in

amqpbridge/src/AmqpBridge.impl.ts:472


registerSubscription

registerSubscription(subscription, cb): Promise<string>

Register a new subscription

Parameters

NameTypeDescription
subscriptionSubscriptionthe subscription definition
cb(message: EBMessage) => Promise<undefined | Omit<{ contentEncoding: string ; contentType: string ; correlationId?: string ; eventName: string ; id: string ; messageType: CustomMessage ; otp?: string ; payload?: unknown ; principalId?: string ; receiver?: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }, "id" | "timestamp">>the function to be called if a matching message arrives

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription

Defined in

amqpbridge/src/AmqpBridge.impl.ts:654


removeAllListeners

removeAllListeners(): void

Returns

void

Inherited from

EventBridgeBaseClass.removeAllListeners

Defined in

core/dist/commonjs/core/types/GenericEventEmitter.d.ts:14


start

start(): Promise<void>

Connect to RabbitMQ broker, ensure exchange, call back queue

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start

Defined in

amqpbridge/src/AmqpBridge.impl.ts:135


startActiveSpan

startActiveSpan<F>(name, opts, context, fn): Promise<F>

Start a child span for opentelemetry tracking

Type parameters

Name
F

Parameters

NameTypeDescription
namestringname of span
optsSpanOptionsspan options
contextundefined | Contextoptional context
fn(span: Span) => Promise<F>function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:33


unregisterCommand

unregisterCommand(address): Promise<void>

Unregister a service command

Parameters

NameTypeDescription
addressEBMessageAddressThe address (service name, version and command name) of the command to be de-registered

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand

Defined in

amqpbridge/src/AmqpBridge.impl.ts:635


unregisterSubscription

unregisterSubscription(address): Promise<void>

Parameters

NameType
addressEBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription

Defined in

amqpbridge/src/AmqpBridge.impl.ts:774


wrapInSpan

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!

This means during logging and similar the spanId of parent span is logged.

Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself

Type parameters

Name
F

Parameters

NameTypeDescription
namestringname of span
optsSpanOptionsspan options
fn(span: Span) => Promise<F>function te be executed in the span
context?Contextspan context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan

Defined in

core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:49