PURISTA API


Class: AmqpBridge

Defined in: amqpbridge/src/AmqpBridge.impl.ts:71

The AMQP event bridge connects to an AMQP broker.

Example

typescript
import { AmqpBridge } from '@purista/amqpbridge'

// create and init our eventbridge
const config = {
   url: 'amqp://localhost'
}

const eventBridge = new AmqpBridge(config)
await eventBridge.start()

Extends

EventBridgeBaseClass

Implements

EventBridge

Constructors

Constructor

new AmqpBridge(config?): AmqpBridge

Defined in: amqpbridge/src/AmqpBridge.impl.ts:117

Parameters

config?

defaultCommandTimeout?

number

Overrides the hardcoded default timeout for command invocations

encoder?

Encoder

the encoder(s) to be used for AMQP messages

Default

ts
jsonEncoder

encrypter?

Encrypter

the encrypter(s) to be used for AMQP messages

Default

ts
plain

exchangeName?

string

the AMQP exchange name to be used

Default

ts
purista

exchangeOptions?

AssertExchange

the AMQP exchange options

instanceId?

string

The instance id of the event bridge. If not set, an id is generated each time an instance is created. Set this option if the instance id must always be the same.

logger?

Logger

A logger instance

logLevel?

LogLevelName

If no logger instance is given, use this log level

namePrefix?

string

the queue prefix to be used for all PURISTA queues, except short-lived queues created by the broker on request

Default

ts
purista

socketOptions?

unknown

socket options

spanProcessor?

SpanProcessor

An OpenTelemetry span processor

url?

string | Connect

the AMQP broker url

Default

ts
amqp://localhost

Returns

AmqpBridge

Overrides

EventBridgeBaseClass.constructor
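
As a sketch of how the options above fit together, the snippet below builds a config object from environment-style input. `AmqpBridgeConfigSketch`, `buildBridgeConfig` and the environment variable names are hypothetical; the local type only mirrors a subset of the documented options so that the snippet runs on its own. In a real project the resulting object would be passed to `new AmqpBridge(config)`.

```typescript
// Local stand-in mirroring a subset of the documented constructor options.
// In a real project the config type comes from '@purista/amqpbridge'.
type AmqpBridgeConfigSketch = {
  url?: string
  exchangeName?: string
  namePrefix?: string
  instanceId?: string
  defaultCommandTimeout?: number
}

// Hypothetical helper: derive a config from environment-style key/value pairs.
// Options that are left out fall back to the documented defaults
// (url 'amqp://localhost', exchangeName 'purista', namePrefix 'purista').
function buildBridgeConfig(env: Record<string, string | undefined>): AmqpBridgeConfigSketch {
  const config: AmqpBridgeConfigSketch = {}

  if (env['AMQP_URL']) config.url = env['AMQP_URL']
  if (env['AMQP_EXCHANGE']) config.exchangeName = env['AMQP_EXCHANGE']
  if (env['AMQP_PREFIX']) config.namePrefix = env['AMQP_PREFIX']
  // A fixed instance id is only needed if it must stay the same across restarts.
  if (env['INSTANCE_ID']) config.instanceId = env['INSTANCE_ID']

  // Only accept a positive, finite number; anything else keeps the default.
  const timeout = Number(env['COMMAND_TIMEOUT'])
  if (Number.isFinite(timeout) && timeout > 0) config.defaultCommandTimeout = timeout

  return config
}

const exampleConfig = buildBridgeConfig({ AMQP_URL: 'amqp://broker.internal', COMMAND_TIMEOUT: '5000' })
console.log(exampleConfig)
```

Leaving unset options out of the object, instead of writing `undefined` values, keeps the documented defaults in charge.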

Properties

channel?

protected optional channel: Channel

Defined in: amqpbridge/src/AmqpBridge.impl.ts:73


config

config: Complete<EventBridgeConfig<ConfigType>>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:17

Inherited from

EventBridgeBaseClass.config


connection?

protected optional connection: ChannelModel

Defined in: amqpbridge/src/AmqpBridge.impl.ts:72


consumerRegistrations

protected consumerRegistrations: object[] = []

Defined in: amqpbridge/src/AmqpBridge.impl.ts:78

channel

channel: Channel

tag

tag: string


defaultCommandTimeout

defaultCommandTimeout: number

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:20

The default time after which a command invocation automatically returns a timeout error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout


encoder

protected encoder: Encoder

Defined in: amqpbridge/src/AmqpBridge.impl.ts:101


encrypter

protected encrypter: Encrypter

Defined in: amqpbridge/src/AmqpBridge.impl.ts:105


healthy

protected healthy: boolean = false

Defined in: amqpbridge/src/AmqpBridge.impl.ts:75


instanceId

instanceId: string

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:19

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId


logger

logger: Logger

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15

Inherited from

EventBridgeBaseClass.logger


name

name: string

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:18

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name


pendingInvocations

protected pendingInvocations: Map<string, PendigInvocation>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:89


ready

protected ready: boolean = false

Defined in: amqpbridge/src/AmqpBridge.impl.ts:76


replyQueueName?

protected optional replyQueueName: string

Defined in: amqpbridge/src/AmqpBridge.impl.ts:80


runningSubscriptionCount

protected runningSubscriptionCount: number = 0

Defined in: amqpbridge/src/AmqpBridge.impl.ts:91


serviceFunctions

protected serviceFunctions: Map<string, { cb: (message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>; channel: Channel; }>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:81


subscriptions

protected subscriptions: Map<string, { cb: (message) => Promise<Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }, "id" | "timestamp"> | undefined>; channel: Channel; }>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:93


traceProvider

traceProvider: NodeTracerProvider

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16

Inherited from

EventBridgeBaseClass.traceProvider

Methods

addConsumerRegistration()

protected addConsumerRegistration(channel, tag): void

Defined in: amqpbridge/src/AmqpBridge.impl.ts:109

Parameters

channel

Channel

tag

string

Returns

void


decodeContent()

protected decodeContent<T>(input, contentType, contentEncoding): Promise<T>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:878

Decode buffer into given type

Type Parameters

T

T

Parameters

input

Buffer

the input buffer

contentType

string

the content type of buffer content

contentEncoding

string

the encoding type of buffer content

Returns

Promise<T>


destroy()

destroy(): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:897

Gracefully stops all consumers, waits for in-flight subscription handlers, closes AMQP resources and rejects unresolved pending invocations.

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy
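
A graceful shutdown usually should not be allowed to block forever. The following is a minimal sketch, not part of the library, that bounds the wait for `destroy()`. `Destroyable` and `shutdownBridge` are hypothetical names, and the stand-in object only exists so the snippet runs without a broker.

```typescript
// Structural stand-in: anything that offers the documented destroy() method.
type Destroyable = { destroy(): Promise<void> }

// Hypothetical helper: call destroy(), but stop waiting after `timeoutMs`.
// Resolves to 'closed' if destroy() finished in time, otherwise to 'timeout'.
// A rejection of destroy() is passed on to the caller.
function shutdownBridge(bridge: Destroyable, timeoutMs: number): Promise<'closed' | 'timeout'> {
  return new Promise<'closed' | 'timeout'>((resolve, reject) => {
    const timer = setTimeout(() => resolve('timeout'), timeoutMs)
    bridge.destroy().then(
      () => {
        clearTimeout(timer)
        resolve('closed')
      },
      (error: unknown) => {
        clearTimeout(timer)
        reject(error)
      },
    )
  })
}

// Stand-in object so the sketch runs without a broker.
const fakeBridge: Destroyable = { destroy: async () => {} }
shutdownBridge(fakeBridge, 1000).then((result) => console.log('shutdown result:', result))
```

In a service, a helper like this would typically be called from a termination signal handler before the process exits.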


emit()

emit<K>(eventName, parameter?): void

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:16

Type Parameters

K

K extends EventKey<{[key: `adapter-${string}`]: unknown; [key: `custom-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

parameter?

object[K]

Returns

void

Inherited from

EventBridgeBaseClass.emit


emitMessage()

emitMessage<T>(message, contentType?, contentEncoding?): Promise<Readonly<EBMessage>>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:301

Emits a message via AMQP headers exchange. The message is encoded and encrypted according to configured codecs.

Type Parameters

T

T extends EBMessage

Parameters

message

Omit<EBMessage, "id" | "timestamp" | "correlationId">

contentType?

string = 'application/json'

contentEncoding?

string = 'utf-8'

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage
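
The parameter type omits `id`, `timestamp` and `correlationId`, so a caller supplies only the remaining fields. The sketch below illustrates that calling pattern with simplified local types. `MessageSketch`, `EmitterSketch`, `emitUserCreated` and the in-memory stand-in are hypothetical; the real `EBMessage` has more fields (for example `messageType`), and the stand-in filling in `id` and `timestamp` is an assumption drawn from the signature.

```typescript
// Simplified stand-ins for the PURISTA message types; only the fields needed
// for this sketch are modelled.
type AddressSketch = { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }
type MessageSketch = {
  id: string
  timestamp: number
  correlationId?: string
  eventName: string
  payload?: unknown
  sender: AddressSketch
}
type EmitterSketch = {
  emitMessage(
    message: Omit<MessageSketch, 'id' | 'timestamp' | 'correlationId'>,
    contentType?: string,
    contentEncoding?: string,
  ): Promise<Readonly<MessageSketch>>
}

// Hypothetical helper: emit a custom event without setting id or timestamp.
function emitUserCreated(
  bridge: EmitterSketch,
  sender: AddressSketch,
  userId: string,
): Promise<Readonly<MessageSketch>> {
  return bridge.emitMessage({ eventName: 'userCreated', payload: { userId }, sender })
}

// In-memory stand-in: it fills in id and timestamp itself (assumption about
// what a bridge does, based on the documented signature).
let emittedCount = 0
const fakeEmitter: EmitterSketch = {
  emitMessage: async (message) => ({ ...message, id: `msg-${++emittedCount}`, timestamp: Date.now() }),
}

const exampleSender: AddressSketch = {
  instanceId: 'instance-1',
  serviceName: 'user',
  serviceTarget: 'signUp',
  serviceVersion: '1',
}
emitUserCreated(fakeEmitter, exampleSender, 'u-42').then((message) => console.log(message.id, message.eventName))
```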


encodeContent()

protected encodeContent<T>(input, contentType, contentEncoding): Promise<Buffer<ArrayBufferLike>>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:857

Encode given payload to buffer

Type Parameters

T

T

Parameters

input

T

contentType

string

contentEncoding

string

Returns

Promise<Buffer<ArrayBufferLike>>
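
To make the roles of `contentType` and `contentEncoding` concrete, here is a minimal round-trip sketch for JSON payloads on Node.js. `encodeJson` and `decodeJson` are hypothetical stand-ins and not the library's codecs: they are synchronous, support only `application/json`, and rely on the text encodings that Node.js `Buffer` knows.

```typescript
// Hypothetical stand-in for encodeContent(): JSON only, Node.js text encodings only.
function encodeJson<T>(input: T, contentType: string, contentEncoding: string): Buffer {
  if (contentType !== 'application/json') {
    throw new Error(`unsupported content type: ${contentType}`)
  }
  if (!Buffer.isEncoding(contentEncoding)) {
    throw new Error(`unsupported content encoding: ${contentEncoding}`)
  }
  // Serialize to a JSON string, then to bytes in the requested encoding.
  return Buffer.from(JSON.stringify(input), contentEncoding)
}

// Hypothetical stand-in for decodeContent(): the inverse of encodeJson().
function decodeJson<T>(input: Buffer, contentType: string, contentEncoding: string): T {
  if (contentType !== 'application/json') {
    throw new Error(`unsupported content type: ${contentType}`)
  }
  if (!Buffer.isEncoding(contentEncoding)) {
    throw new Error(`unsupported content encoding: ${contentEncoding}`)
  }
  // The caller chooses T; nothing is validated at runtime.
  return JSON.parse(input.toString(contentEncoding)) as T
}

const encoded = encodeJson({ orderId: 7, note: 'Grüße' }, 'application/json', 'utf-8')
const decoded = decodeJson<{ orderId: number; note: string }>(encoded, 'application/json', 'utf-8')
console.log(encoded.length, decoded)
```

Both sides must agree on the content type and encoding, which is why a message carries them alongside the payload.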


getTracer()

getTracer(): Tracer

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:27

Returns the OpenTelemetry tracer of this service

Returns

Tracer

Inherited from

EventBridgeBaseClass.getTracer


invoke()

invoke<T>(input, commandTimeout?): Promise<T>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:380

Invokes a remote command and waits for a matching command response. The call is rejected with a timeout error if no response is received in time.

Type Parameters

T

T

Parameters

input

Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">

commandTimeout?

number = ...

Returns

Promise<T>

Implementation of

EventBridge.invoke
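
Because `invoke()` rejects when no response arrives in time, callers need to decide what happens on rejection. The sketch below shows one option, falling back to a default value. `Invoker` and `invokeOrFallback` are hypothetical, the `input` type is simplified to `unknown`, and the stand-in bridge rejects immediately instead of waiting for a real timeout.

```typescript
// Structural stand-in for the documented invoke() signature; `input` is
// simplified to `unknown` for this sketch.
type Invoker = { invoke<T>(input: unknown, commandTimeout?: number): Promise<T> }

// Hypothetical helper: run a command and map any rejection (including a
// timeout) to a fallback value instead of letting it propagate.
async function invokeOrFallback<T>(
  bridge: Invoker,
  input: unknown,
  fallback: T,
  commandTimeout?: number,
): Promise<T> {
  try {
    return await bridge.invoke<T>(input, commandTimeout)
  } catch (error) {
    console.warn('command failed or timed out:', error)
    return fallback
  }
}

// Stand-in that always rejects, to exercise the fallback path without a broker.
const timingOutBridge: Invoker = {
  invoke: () => Promise.reject(new Error('simulated timeout')),
}

invokeOrFallback(timingOutBridge, { payload: { userId: 'u-42' } }, 'n/a', 100).then((result) =>
  console.log('result:', result),
)
```

Swallowing every error is a deliberate simplification here; real code would usually distinguish timeouts from handled command errors before choosing a fallback.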


isHealthy()

isHealthy(): Promise<boolean>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:146

Indicates if the bridge connection and channels are currently healthy.

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy


isReady()

isReady(): Promise<boolean>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:139

Indicates if the bridge finished startup and is ready to process traffic.

Returns

Promise<boolean>

Implementation of

EventBridge.isReady
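
`isReady()` and `isHealthy()` map naturally onto readiness and liveness checks. The following sketch, under the assumption that polling is acceptable for the use case, waits for readiness and collects both flags. `Probeable`, `waitUntilReady`, `probeStatus` and the stand-in are hypothetical names, not part of the library.

```typescript
// Structural stand-in for the two documented probe methods.
type Probeable = { isReady(): Promise<boolean>; isHealthy(): Promise<boolean> }

// Hypothetical helper: poll isReady() up to `attempts` times, waiting
// `delayMs` between attempts. Resolves to false if it never became ready.
async function waitUntilReady(bridge: Probeable, attempts: number, delayMs: number): Promise<boolean> {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    if (await bridge.isReady()) return true
    if (attempt < attempts) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
    }
  }
  return false
}

// Hypothetical helper: collect both flags, e.g. for a status endpoint.
async function probeStatus(bridge: Probeable): Promise<{ ready: boolean; healthy: boolean }> {
  const [ready, healthy] = await Promise.all([bridge.isReady(), bridge.isHealthy()])
  return { ready, healthy }
}

// Stand-in that reports ready from the third isReady() call on.
function createFakeProbe(): Probeable {
  let calls = 0
  return {
    isReady: async () => ++calls >= 3,
    isHealthy: async () => true,
  }
}

waitUntilReady(createFakeProbe(), 5, 10).then((ready) => console.log('ready:', ready))
```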


off()

off<K>(eventName, fn): void

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:15

Type Parameters

K

K extends EventKey<{[key: `adapter-${string}`]: unknown; [key: `custom-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

fn

EventReceiver<object[K]>

Returns

void

Inherited from

EventBridgeBaseClass.off


on()

on<K>(eventName, fn): void

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:14

Type Parameters

K

K extends EventKey<{[key: `adapter-${string}`]: unknown; [key: `custom-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

fn

EventReceiver<object[K]>

Returns

void

Inherited from

EventBridgeBaseClass.on
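
The event names in the type parameter above can be used to follow the connection state. Below is a minimal sketch using local types: `BridgeEvents` copies the listed event names, while the listener shape `(parameter) => void`, `trackConnection` and the in-memory emitter are assumptions made so that the snippet runs without a broker.

```typescript
// Event names as listed in the type parameter above. The listener shape is an
// assumption about EventReceiver.
type BridgeEvents = {
  'eventbridge-connected': never
  'eventbridge-disconnected': never
  'eventbridge-reconnecting': never
  'eventbridge-connection-error': unknown
  'eventbridge-error': unknown
}
type Listener<P> = (parameter: P) => void
type BridgeEmitter = {
  on<K extends keyof BridgeEvents>(eventName: K, fn: Listener<BridgeEvents[K]>): void
  off<K extends keyof BridgeEvents>(eventName: K, fn: Listener<BridgeEvents[K]>): void
}
type ConnectionState = 'unknown' | 'connected' | 'disconnected' | 'reconnecting'

// Hypothetical helper: mirror connection events into a simple state value.
// Returns a getter for the state and a stop() that removes the listeners again.
function trackConnection(bridge: BridgeEmitter): { state: () => ConnectionState; stop: () => void } {
  let current: ConnectionState = 'unknown'
  const onConnected = () => { current = 'connected' }
  const onDisconnected = () => { current = 'disconnected' }
  const onReconnecting = () => { current = 'reconnecting' }

  bridge.on('eventbridge-connected', onConnected)
  bridge.on('eventbridge-disconnected', onDisconnected)
  bridge.on('eventbridge-reconnecting', onReconnecting)

  return {
    state: () => current,
    stop: () => {
      // off() needs the same function references that were passed to on().
      bridge.off('eventbridge-connected', onConnected)
      bridge.off('eventbridge-disconnected', onDisconnected)
      bridge.off('eventbridge-reconnecting', onReconnecting)
    },
  }
}

// In-memory stand-in with an extra fire() method to simulate events.
function createFakeEmitter(): BridgeEmitter & { fire(eventName: keyof BridgeEvents): void } {
  const listeners = new Map<string, Set<Listener<never>>>()
  return {
    on: (eventName, fn) => {
      const set = listeners.get(eventName) ?? new Set<Listener<never>>()
      set.add(fn)
      listeners.set(eventName, set)
    },
    off: (eventName, fn) => {
      listeners.get(eventName)?.delete(fn)
    },
    fire: (eventName) => {
      listeners.get(eventName)?.forEach((fn) => fn(undefined as never))
    },
  }
}

const demoEmitter = createFakeEmitter()
const demoTracker = trackConnection(demoEmitter)
demoEmitter.fire('eventbridge-connected')
console.log(demoTracker.state())
demoTracker.stop()
```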


registerCommand()

registerCommand(address, cb, metadata, eventBridgeConfig): Promise<string>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:513

Register a service function and ensure that there is a queue for all incoming command requests.

Parameters

address

EBMessageAddress

The service function address

cb

(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>

the function to call if a matching command message arrives

metadata

CommandDefinitionMetadataBase

eventBridgeConfig

DefinitionEventBridgeConfig

Returns

Promise<string>

the id of the command function queue

Implementation of

EventBridge.registerCommand


registerSubscription()

registerSubscription(subscription, cb): Promise<string>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:704

Registers a subscription consumer and returns its stable subscription key.

Parameters

subscription

Subscription

cb

(message) => Promise<Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }, "id" | "timestamp"> | undefined>

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription


removeAllListeners()

removeAllListeners(): void

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:17

Returns

void

Inherited from

EventBridgeBaseClass.removeAllListeners


removeConsumerRegistrationsForChannel()

protected removeConsumerRegistrationsForChannel(channel): void

Defined in: amqpbridge/src/AmqpBridge.impl.ts:113

Parameters

channel

Channel

Returns

void


start()

start(): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:153

Connects to the RabbitMQ broker and ensures that the exchange and the callback queue exist.

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start


startActiveSpan()

startActiveSpan<F>(name, opts, context, fn): Promise<F>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:36

Starts a child span for OpenTelemetry tracking

Type Parameters

F

F

Parameters

name

string

name of span

opts

SpanOptions

span options

context

Context | undefined

optional context

fn

(span) => Promise<F>

function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan


unregisterCommand()

unregisterCommand(address): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:679

Unregisters a command consumer and closes the dedicated command channel.

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand


unregisterSubscription()

unregisterSubscription(address): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:828

Unregisters a subscription consumer and closes its channel.

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription


wrapInSpan()

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:52

Starts a span for OpenTelemetry tracking on the same level. The created span will not become the "active" span within OpenTelemetry!

This means that logging and similar output will carry the spanId of the parent span.

Use wrapInSpan to mark points in the flow of one bigger function, but not to trace the program flow itself.

Type Parameters

F

F

Parameters

name

string

name of span

opts

SpanOptions

span options

fn

(span) => Promise<F>

function to be executed in the span

context?

Context

span context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan
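
As an illustration of marking steps inside one bigger function, the sketch below wraps two steps with `wrapInSpan`. `SpanSketch`, `SpanOptionsSketch`, `SpanWrapper` and `normalizeIds` are hypothetical and simplified; the stand-in simply runs the callback with a no-op span, whereas a real bridge would record a span around it.

```typescript
// Simplified stand-ins for the OpenTelemetry Span and SpanOptions types.
type SpanSketch = { setAttribute(key: string, value: string | number | boolean): unknown }
type SpanOptionsSketch = { attributes?: Record<string, string | number | boolean> }
type SpanWrapper = {
  wrapInSpan<F>(name: string, opts: SpanOptionsSketch, fn: (span: SpanSketch) => Promise<F>): Promise<F>
}

// Hypothetical two-step function: each step is marked with its own span,
// without making that span the active one.
async function normalizeIds(bridge: SpanWrapper, ids: string[]): Promise<string[]> {
  const trimmed = await bridge.wrapInSpan('trim-ids', {}, async (span) => {
    span.setAttribute('ids.count', ids.length)
    return ids.map((id) => id.trim())
  })
  // The second step removes duplicates while keeping the original order.
  return bridge.wrapInSpan('dedupe-ids', {}, async () => Array.from(new Set(trimmed)))
}

// Stand-in that runs the callback with a no-op span.
const fakeWrapper: SpanWrapper = {
  wrapInSpan: (_name, _opts, fn) => fn({ setAttribute: () => undefined }),
}

normalizeIds(fakeWrapper, [' a', 'b ', 'a']).then((ids) => console.log(ids))
```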