Skip to content

@purista/amqpbridge v2.0.5


PURISTA API / @purista/amqpbridge / AmqpBridge

Class: AmqpBridge

Defined in: amqpbridge/src/AmqpBridge.impl.ts:67

The AMQP event bridge connects to a AMQP broker.

Example

typescript
import { AmqpBridge } from '@purista/amqpbridge'

// create and init our eventbridge
const config = {
   url: 'amqp://localhost'
}

const eventBridge = new AmqpBridge(config)
await eventBridge.start()

Extends

Implements

Constructors

new AmqpBridge()

new AmqpBridge(config?): AmqpBridge

Defined in: amqpbridge/src/AmqpBridge.impl.ts:105

Parameters

config?
defaultCommandTimeout?

number

Overwrite the hardcoded default timeout of command invocations

encoder?

Encoder

the encoder(s) to be used for AMQP messages

Default

ts
jsonEncoder
encrypter?

Encrypter

the encrypter(s) to be used for AMQP messages

Default

ts
plain
exchangeName?

string

the AMQP exchage name to be used

Default

ts
purista
exchangeOptions?

AssertExchange

the AMQP exchange options

instanceId?

string

The instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id.

logger?

Logger

A logger instance

logLevel?

LogLevelName

If no logger instance is given, use this log level

namePrefix?

string

the queue prefix to be used for all PURISTA queues except short living queues created by the broker on request

Default

ts
purista
socketOptions?

any

socket options

spanProcessor?

SpanProcessor

A OpenTelemetry span processor

url?

string | Connect

the AMQP broker url

Default

ts
amqp://localhost

Returns

AmqpBridge

Overrides

EventBridgeBaseClass.constructor

Properties

channel?

protected optional channel: Channel

Defined in: amqpbridge/src/AmqpBridge.impl.ts:69


config

config: Complete<{ defaultCommandTimeout: number; encoder: Encoder; encrypter: Encrypter; exchangeName: string; exchangeOptions: AssertExchange; instanceId: string; logger: Logger; logLevel: LogLevelName; namePrefix: string; socketOptions: any; spanProcessor: SpanProcessor; url: string | Connect; }>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:14

Inherited from

EventBridgeBaseClass.config


connection?

protected optional connection: Connection

Defined in: amqpbridge/src/AmqpBridge.impl.ts:68


consumerTags

protected consumerTags: string[] = []

Defined in: amqpbridge/src/AmqpBridge.impl.ts:74


defaultCommandTimeout

defaultCommandTimeout: number

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:17

The default time until when a command invocation automatically returns a time out error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout


encoder

protected encoder: Encoder

Defined in: amqpbridge/src/AmqpBridge.impl.ts:97


encrypter

protected encrypter: Encrypter

Defined in: amqpbridge/src/AmqpBridge.impl.ts:101


healthy

protected healthy: boolean = false

Defined in: amqpbridge/src/AmqpBridge.impl.ts:71


instanceId

instanceId: string

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId


logger

logger: Logger

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:12

Inherited from

EventBridgeBaseClass.logger


name

name: string

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name


pendingInvocations

protected pendingInvocations: Map<string, PendigInvocation>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:85


ready

protected ready: boolean = false

Defined in: amqpbridge/src/AmqpBridge.impl.ts:72


replyQueueName?

protected optional replyQueueName: string

Defined in: amqpbridge/src/AmqpBridge.impl.ts:76


runningSubscriptionCount

protected runningSubscriptionCount: number = 0

Defined in: amqpbridge/src/AmqpBridge.impl.ts:87


serviceFunctions

protected serviceFunctions: Map<string, { cb: (message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp: string; payload: { data: unknown; message: string; status: StatusCode; }; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; otp: string; payload: unknown; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }>; channel: Channel; }>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:77


subscriptions

protected subscriptions: Map<string, { cb: (message) => Promise<undefined | Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CustomMessage; otp: string; payload: unknown; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "id" | "timestamp">>; channel: Channel; }>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:89


traceProvider

traceProvider: NodeTracerProvider

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:13

Inherited from

EventBridgeBaseClass.traceProvider

Methods

decodeContent()

protected decodeContent<T>(input, contentType, contentEncoding): Promise<T>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:821

Decode buffer into given type

Type Parameters

T

Parameters

input

Buffer

the input buffer

contentType

string

the content type of buffer content

contentEncoding

string

the encoding type of buffer content

Returns

Promise<T>


destroy()

destroy(): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:836

Shut down event bridge as gracefully as possible

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy


emit()

emit<K>(eventName, parameter?): void

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:13

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

parameter?

object[K]

Returns

void

Inherited from

EventBridgeBaseClass.emit


emitMessage()

emitMessage<T>(message, contentType, contentEncoding): Promise<Readonly<EBMessage>>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:279

Emit a message to the eventbridge without awaiting a result

Type Parameters

T extends EBMessage

Parameters

message

Omit<EBMessage, "id" | "timestamp" | "correlationId">

the message

contentType

string = 'application/json'

contentEncoding

string = 'utf-8'

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage


encodeContent()

protected encodeContent<T>(input, contentType, contentEncoding): Promise<Buffer<ArrayBufferLike>>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:800

Encode given payload to buffer

Type Parameters

T

Parameters

input

T

contentType

string

contentEncoding

string

Returns

Promise<Buffer<ArrayBufferLike>>


getTracer()

getTracer(): Tracer

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24

Returns open telemetry tracer of this service

Returns

Tracer

Tracer

Inherited from

EventBridgeBaseClass.getTracer


invoke()

invoke<T>(input, commandTimeout): Promise<T>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:354

Call a command of a service and return the result of this command

Type Parameters

T

Parameters

input

Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; otp: string; payload: { parameter: unknown; payload: unknown; }; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "id" | "timestamp" | "correlationId" | "messageType">

a partial command message

commandTimeout

number = ...

the time to live (timeout) of the invocation

Returns

Promise<T>

Implementation of

EventBridge.invoke


isHealthy()

isHealthy(): Promise<boolean>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:128

Indicates if the eventbridge is running and works correctly

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy


isReady()

isReady(): Promise<boolean>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:124

Indicates if the eventbridge has been started and is connected to underlaying message broker

Returns

Promise<boolean>

Implementation of

EventBridge.isReady


off()

off<K>(eventName, fn): void

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:12

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

fn

EventReceiver<object[K]>

Returns

void

Inherited from

EventBridgeBaseClass.off


on()

on<K>(eventName, fn): void

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:11

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

fn

EventReceiver<object[K]>

Returns

void

Inherited from

EventBridgeBaseClass.on


registerCommand()

registerCommand(address, cb, metadata, eventBridgeConfig): Promise<string>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:472

Register a service function and ensure that there is a queue for all incoming command requests.

Parameters

address

EBMessageAddress

The service function address

cb

(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp: string; payload: { data: unknown; message: string; status: StatusCode; }; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; otp: string; payload: unknown; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }>

the function to call if a matching command message arrives

metadata

CommandDefinitionMetadataBase

eventBridgeConfig

DefinitionEventBridgeConfig

Returns

Promise<string>

the id of command function queue

Implementation of

EventBridge.registerCommand


registerSubscription()

registerSubscription(subscription, cb): Promise<string>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:654

Register a new subscription

Parameters

subscription

Subscription

the subscription definition

cb

(message) => Promise<undefined | Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CustomMessage; otp: string; payload: unknown; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "id" | "timestamp">>

the function to be called if a matching message arrives

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription


removeAllListeners()

removeAllListeners(): void

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:14

Returns

void

Inherited from

EventBridgeBaseClass.removeAllListeners


start()

start(): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:135

Connect to RabbitMQ broker, ensure exchange, call back queue

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start


startActiveSpan()

startActiveSpan<F>(name, opts, context, fn): Promise<F>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:33

Start a child span for opentelemetry tracking

Type Parameters

F

Parameters

name

string

name of span

opts

SpanOptions

span options

context

optional context

undefined | Context

fn

(span) => Promise<F>

function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan


unregisterCommand()

unregisterCommand(address): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:635

Unregister a service command

Parameters

address

EBMessageAddress

The address (service name, version and command name) of the command to be de-registered

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand


unregisterSubscription()

unregisterSubscription(address): Promise<void>

Defined in: amqpbridge/src/AmqpBridge.impl.ts:774

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription


wrapInSpan()

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:49

Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!

This means during logging and similar the spanId of parent span is logged.

Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself

Type Parameters

F

Parameters

name

string

name of span

opts

SpanOptions

span options

fn

(span) => Promise<F>

function te be executed in the span

context?

Context

span context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan