Skip to main content

Class: AmqpBridge


PURISTA API / Modules / @purista/amqpbridge / AmqpBridge

Class: AmqpBridge

@purista/amqpbridge.AmqpBridge

The AMQP event bridge connects to a AMQP broker.

Example

import { AmqpBridge } from '@purista/amqpbridge'

// create and init our eventbridge
const config = {
   url: 'amqp://localhost'
}

const eventBridge = new AmqpBridge(config)
await eventBridge.start()

Hierarchy

Implements

  • EventBridge

Table of contents

Constructors

Properties

Methods

Constructors

constructor

new AmqpBridge(config?): AmqpBridge

Parameters

NameTypeDescription
config?Object-
config.defaultCommandTimeout?numberOverwrite the hardcoded default timeout of command invocations
config.encoder?Encoderthe encoder(s) to be used for AMQP messages Default ts jsonEncoder
config.encrypter?Encrypterthe encrypter(s) to be used for AMQP messages Default ts plain
config.exchangeName?stringthe AMQP exchage name to be used Default ts purista
config.exchangeOptions?AssertExchangethe AMQP exchange options
config.instanceId?stringThe instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id.
config.logLevel?LogLevelNameIf no logger instance is given, use this log level
config.logger?LoggerA logger instance
config.namePrefix?stringthe queue prefix to be used for all PURISTA queues except short living queues created by the broker on request Default ts purista
config.socketOptions?anysocket options
config.spanProcessor?SpanProcessorA OpenTelemetry span processor
config.url?string | Connectthe AMQP broker url Default ts amqp://localhost

Returns

AmqpBridge

Overrides

EventBridgeBaseClass<AmqpBridgeConfig>.constructor

Defined in

amqpbridge/src/AmqpBridge.impl.ts:103open in new window

Properties

channel

Protected Optional channel: Channel

Defined in

amqpbridge/src/AmqpBridge.impl.ts:67open in new window


config

config: Complete<{ defaultCommandTimeout?: number ; encoder?: Encoder ; encrypter?: Encrypter ; exchangeName?: string ; exchangeOptions?: AssertExchange ; instanceId?: string ; logLevel?: LogLevelName ; logger?: Logger ; namePrefix?: string ; socketOptions?: any ; spanProcessor?: SpanProcessor ; url?: string | Connect }>

Inherited from

EventBridgeBaseClass.config

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:13


connection

Protected Optional connection: Connection

Defined in

amqpbridge/src/AmqpBridge.impl.ts:66open in new window


consumerTags

Protected consumerTags: string[] = []

Defined in

amqpbridge/src/AmqpBridge.impl.ts:72open in new window


defaultCommandTimeout

defaultCommandTimeout: number

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16


encoder

Protected encoder: Encoder

Defined in

amqpbridge/src/AmqpBridge.impl.ts:95open in new window


encrypter

Protected encrypter: Encrypter

Defined in

amqpbridge/src/AmqpBridge.impl.ts:99open in new window


healthy

Protected healthy: boolean = false

Defined in

amqpbridge/src/AmqpBridge.impl.ts:69open in new window


instanceId

instanceId: string

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15


logger

logger: Logger

Inherited from

EventBridgeBaseClass.logger

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:11


name

name: string

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:14


pendingInvocations

Protected pendingInvocations: Map<string, PendigInvocation>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:83open in new window


ready

Protected ready: boolean = false

Defined in

amqpbridge/src/AmqpBridge.impl.ts:70open in new window


replyQueueName

Protected Optional replyQueueName: string

Defined in

amqpbridge/src/AmqpBridge.impl.ts:74open in new window


runningSubscriptionCount

Protected runningSubscriptionCount: number = 0

Defined in

amqpbridge/src/AmqpBridge.impl.ts:85open in new window


serviceFunctions

Protected serviceFunctions: Map<string, { cb: (message: { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<{ contentEncoding: "utf-8" ; contentType: "application/json" ; correlationId: string ; eventName?: string ; id: string ; isHandledError: boolean ; messageType: CommandErrorResponse ; otp?: string ; payload: { data?: unknown ; message: string ; status: StatusCode } ; principalId?: string ; receiver: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string } | { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: CommandSuccessResponse ; otp?: string ; payload: unknown ; principalId?: string ; receiver: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }> ; channel: Channel }>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:75open in new window


subscriptions

Protected subscriptions: Map<string, { cb: (message: { contentEncoding: string ; contentType: string ; correlationId?: string ; eventName: string ; id: string ; messageType: CustomMessage ; otp?: string ; payload?: unknown ; principalId?: string ; receiver?: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<undefined | Omit<{ contentEncoding: string ; contentType: string ; correlationId?: string ; eventName: string ; id: string ; messageType: CustomMessage ; otp?: string ; payload?: unknown ; principalId?: string ; receiver?: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }, "id" | "timestamp">> ; channel: Channel }>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:87open in new window


traceProvider

traceProvider: NodeTracerProvider

Inherited from

EventBridgeBaseClass.traceProvider

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:12

Methods

decodeContent

decodeContent<T>(input, contentType, contentEncoding): Promise<T>

Decode buffer into given type

Type parameters

Name
T

Parameters

NameTypeDescription
inputBufferthe input buffer
contentTypestringthe content type of buffer content
contentEncodingstringthe encoding type of buffer content

Returns

Promise<T>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:819open in new window


destroy

destroy(): Promise<void>

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy

Defined in

amqpbridge/src/AmqpBridge.impl.ts:834open in new window


emit

emit<K>(eventName, parameter?): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
parameter?{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]

Returns

void

Inherited from

EventBridgeBaseClass.emit

Defined in

core/lib/types/core/types/GenericEventEmitter.d.ts:13


emitMessage

emitMessage<T>(message, contentType?, contentEncoding?): Promise<Readonly<EBMessage>>

Type parameters

NameType
Textends EBMessage

Parameters

NameTypeDefault value
messageOmit<EBMessage, "id" | "timestamp" | "correlationId">undefined
contentTypestring'application/json'
contentEncodingstring'utf-8'

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage

Defined in

amqpbridge/src/AmqpBridge.impl.ts:277open in new window


encodeContent

encodeContent<T>(input, contentType, contentEncoding): Promise<Buffer>

Encode given payload to buffer

Type parameters

Name
T

Parameters

NameType
inputT
contentTypestring
contentEncodingstring

Returns

Promise<Buffer>

Defined in

amqpbridge/src/AmqpBridge.impl.ts:798open in new window


getTracer

getTracer(): Tracer

Returns open telemetry tracer of this service

Returns

Tracer

Tracer

Inherited from

EventBridgeBaseClass.getTracer

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:23


invoke

invoke<T>(input, commandTimeout?): Promise<T>

Type parameters

Name
T

Parameters

NameType
inputOmit<{ contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }, "id" | "timestamp" | "correlationId" | "messageType">
commandTimeoutnumber

Returns

Promise<T>

Implementation of

EventBridge.invoke

Defined in

amqpbridge/src/AmqpBridge.impl.ts:352open in new window


isHealthy

isHealthy(): Promise<boolean>

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy

Defined in

amqpbridge/src/AmqpBridge.impl.ts:126open in new window


isReady

isReady(): Promise<boolean>

Returns

Promise<boolean>

Implementation of

EventBridge.isReady

Defined in

amqpbridge/src/AmqpBridge.impl.ts:122open in new window


off

off<K>(eventName, fn): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
fnEventReceiver<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]>

Returns

void

Inherited from

EventBridgeBaseClass.off

Defined in

core/lib/types/core/types/GenericEventEmitter.d.ts:12


on

on<K>(eventName, fn): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
fnEventReceiver<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]>

Returns

void

Inherited from

EventBridgeBaseClass.on

Defined in

core/lib/types/core/types/GenericEventEmitter.d.ts:11


registerCommand

registerCommand(address, cb, metadata, eventBridgeConfig): Promise<string>

Register a service function and ensure that there is a queue for all incoming command requests.

Parameters

NameTypeDescription
addressEBMessageAddressThe service function address
cb(message: { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<{ contentEncoding: "utf-8" ; contentType: "application/json" ; correlationId: string ; eventName?: string ; id: string ; isHandledError: boolean ; messageType: CommandErrorResponse ; otp?: string ; payload: { data?: unknown ; message: string ; status: StatusCode } ; principalId?: string ; receiver: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string } | { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: CommandSuccessResponse ; otp?: string ; payload: unknown ; principalId?: string ; receiver: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }>the function to call if a matching command message arrives
metadataCommandDefinitionMetadataBase-
eventBridgeConfigDefinitionEventBridgeConfig-

Returns

Promise<string>

the id of command function queue

Implementation of

EventBridge.registerCommand

Defined in

amqpbridge/src/AmqpBridge.impl.ts:470open in new window


registerSubscription

registerSubscription(subscription, cb): Promise<string>

Parameters

NameType
subscriptionSubscription
cb(message: EBMessage) => Promise<undefined | Omit<{ contentEncoding: string ; contentType: string ; correlationId?: string ; eventName: string ; id: string ; messageType: CustomMessage ; otp?: string ; payload?: unknown ; principalId?: string ; receiver?: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }, "id" | "timestamp">>

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription

Defined in

amqpbridge/src/AmqpBridge.impl.ts:652open in new window


removeAllListeners

removeAllListeners(): void

Returns

void

Inherited from

EventBridgeBaseClass.removeAllListeners

Defined in

core/lib/types/core/types/GenericEventEmitter.d.ts:14


start

start(): Promise<void>

Connect to RabbitMQ broker, ensure exchange, call back queue

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start

Defined in

amqpbridge/src/AmqpBridge.impl.ts:133open in new window


startActiveSpan

startActiveSpan<F>(name, opts, context, fn): Promise<F>

Start a child span for opentelemetry tracking

Type parameters

Name
F

Parameters

NameTypeDescription
namestringname of span
optsSpanOptionsspan options
contextundefined | Contextoptional context
fn(span: Span) => Promise<F>function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:32


unregisterCommand

unregisterCommand(address): Promise<void>

Parameters

NameType
addressEBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand

Defined in

amqpbridge/src/AmqpBridge.impl.ts:633open in new window


unregisterSubscription

unregisterSubscription(address): Promise<void>

Parameters

NameType
addressEBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription

Defined in

amqpbridge/src/AmqpBridge.impl.ts:772open in new window


wrapInSpan

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!

This means during logging and similar the spanId of parent span is logged.

Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself

Type parameters

Name
F

Parameters

NameTypeDescription
namestringname of span
optsSpanOptionsspan options
fn(span: Span) => Promise<F>function te be executed in the span
context?Contextspan context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan

Defined in

core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:48

Last update:
Contributors: Sebastian Wessel