PURISTA API / @purista/amqpbridge / AmqpBridge
Class: AmqpBridge
Defined in: amqpbridge/src/AmqpBridge.impl.ts:92
The AMQP event bridge connects to a AMQP broker.
Example
import { AmqpBridge } from '@purista/amqpbridge'
// create and init our eventbridge
const config = {
url: 'amqp://localhost'
}
const eventBridge = new AmqpBridge(config)
await eventBridge.start()Extends
Implements
Constructors
Constructor
new AmqpBridge(
config?):AmqpBridge
Defined in: amqpbridge/src/AmqpBridge.impl.ts:287
Parameters
config?
deadLetterExchangeName?
string
optional dead letter exchange name used for durable command/subscription queues
deadLetterRoutingKey?
string
optional dead letter routing key used for durable command/subscription queues
defaultCommandTimeout?
number
Overwrite the hardcoded default timeout of command invocations
encoder?
the encoder(s) to be used for AMQP messages
Default
jsonEncoderencrypter?
the encrypter(s) to be used for AMQP messages
Default
plainexchangeName?
string
the AMQP exchage name to be used
Default
puristaexchangeOptions?
AssertExchange
the AMQP exchange options
instanceId?
string
The instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id.
logger?
logLevel?
metrics?
metricsRecorder?
PuristaMetricsRecorderInterface
namePrefix?
string
the queue prefix to be used for all PURISTA queues except short living queues created by the broker on request
Default
puristaprefetch?
number
max unacked messages per consumer channel
socketOptions?
SocketOptions
socket options
spanProcessor?
SpanProcessor
url?
string | Connect
the AMQP broker url
Default
amqp://localhostReturns
AmqpBridge
Overrides
EventBridgeBaseClass.constructor
Properties
capabilities
capabilities:
EventBridgeCapabilities
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:28
Implementation of
Inherited from
EventBridgeBaseClass.capabilities
channel?
protectedoptionalchannel?:ConfirmChannel
Defined in: amqpbridge/src/AmqpBridge.impl.ts:94
config
config:
Complete<EventBridgeConfig<ConfigType>>
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:26
Inherited from
connection?
protectedoptionalconnection?:ChannelModel
Defined in: amqpbridge/src/AmqpBridge.impl.ts:93
consumerRegistrations
protectedconsumerRegistrations:object[] =[]
Defined in: amqpbridge/src/AmqpBridge.impl.ts:99
channel
channel:
ConfirmChannel
tag
tag:
string
defaultCommandTimeout
defaultCommandTimeout:
number
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:30
The default time until when a command invocation automatically returns a time out error
Implementation of
EventBridge.defaultCommandTimeout
Inherited from
EventBridgeBaseClass.defaultCommandTimeout
encoder
protectedencoder:Encoder
Defined in: amqpbridge/src/AmqpBridge.impl.ts:119
encrypter
protectedencrypter:Encrypter
Defined in: amqpbridge/src/AmqpBridge.impl.ts:123
healthy
protectedhealthy:boolean=false
Defined in: amqpbridge/src/AmqpBridge.impl.ts:96
inFlightExecutions
protectedreadonlyinFlightExecutions:InFlightExecutionTracker
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:31
Inherited from
EventBridgeBaseClass.inFlightExecutions
instanceId
instanceId:
string
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:29
Implementation of
Inherited from
EventBridgeBaseClass.instanceId
logger
logger:
Logger
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:23
Inherited from
AmqpBridge.logger
metricsRecorder
protectedmetricsRecorder:PuristaMetricsRecorderInterface
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:25
Inherited from
EventBridgeBaseClass.metricsRecorder
name
name:
string
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:27
Implementation of
Inherited from
pausedSubscriptionConsumers
protectedpausedSubscriptionConsumers:Map<string,PausedSubscriptionState>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:117
pendingInvocations
protectedpendingInvocations:PendingInvocationRegistry<unknown>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:110
ready
protectedready:boolean=false
Defined in: amqpbridge/src/AmqpBridge.impl.ts:97
replyQueueName?
protectedoptionalreplyQueueName?:string
Defined in: amqpbridge/src/AmqpBridge.impl.ts:101
serviceFunctions
protectedserviceFunctions:Map<string, {cb: (message) =>Promise<{contentEncoding:"utf-8";contentType:"application/json";correlationId:string;eventName?:string;id:string;isHandledError:boolean;messageType:CommandErrorResponse;otp?:string;payload: {data?:unknown;message:string;status:StatusCode; };principalId?:string;receiver: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };sender: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };tenantId?:string;timestamp:number;traceId?:string; } | {contentEncoding:string;contentType:string;correlationId:string;eventName?:string;id:string;messageType:CommandSuccessResponse;otp?:string;payload:unknown;principalId?:string;receiver: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };sender: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };tenantId?:string;timestamp:number;traceId?:string; }>;channel:ConfirmChannel; }>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:102
subscriptions
protectedsubscriptions:Map<string,RegisteredSubscription>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:116
traceProvider
traceProvider:
NodeTracerProvider
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24
Inherited from
EventBridgeBaseClass.traceProvider
Methods
addConsumerRegistration()
protectedaddConsumerRegistration(channel,tag):void
Defined in: amqpbridge/src/AmqpBridge.impl.ts:127
Parameters
channel
ConfirmChannel
tag
string
Returns
void
createPublishingChannel()
protectedcreatePublishingChannel():Promise<ConfirmChannel>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:153
Returns
Promise<ConfirmChannel>
deadLetterSubscriptionMessage()
protecteddeadLetterSubscriptionMessage(channel,subscription,msg,reason):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:256
Parameters
channel
ConfirmChannel
subscription
msg
ConsumeMessage
reason
string
Returns
Promise<void>
decodeContent()
protecteddecodeContent<T>(input,contentType,contentEncoding):Promise<T>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:1264
Decode buffer into given type
Type Parameters
T
T
Parameters
input
Buffer
the input buffer
contentType
string
the content type of buffer content
contentEncoding
string
the encoding type of buffer content
Returns
Promise<T>
destroy()
destroy():
Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:1283
Gracefully stops all consumers, waits for in-flight subscription handlers, closes AMQP resources and rejects unresolved pending invocations.
Returns
Promise<void>
Implementation of
Overrides
emitMessage()
emitMessage<
T>(message,contentType?,contentEncoding?):Promise<Readonly<EBMessage>>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:498
Emits a message via AMQP headers exchange. The message is encoded and encrypted according to configured codecs.
Type Parameters
T
T extends EBMessage
Parameters
message
Omit<EBMessage, "id" | "timestamp" | "correlationId">
contentType?
string = 'application/json'
contentEncoding?
string = 'utf-8'
Returns
Promise<Readonly<EBMessage>>
Implementation of
encodeContent()
protectedencodeContent<T>(input,contentType,contentEncoding):Promise<Buffer<ArrayBufferLike>>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:1243
Encode given payload to buffer
Type Parameters
T
T
Parameters
input
T
contentType
string
contentEncoding
string
Returns
Promise<Buffer<ArrayBufferLike>>
ensureSubscriptionRetryQueue()
protectedensureSubscriptionRetryQueue(channel,sourceQueueName,retryQueueName,retryDelayMs):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:240
Parameters
channel
ConfirmChannel
sourceQueueName
string
retryQueueName
string
retryDelayMs
number
Returns
Promise<void>
getConsumerAttempt()
protectedgetConsumerAttempt(headers):number
Defined in: amqpbridge/src/AmqpBridge.impl.ts:163
Parameters
headers
unknown
Returns
number
getInFlightExecutionCount()
getInFlightExecutionCount():
number
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:69
Number of currently running handlers across all work kinds.
Returns
number
Implementation of
EventBridge.getInFlightExecutionCount
Inherited from
EventBridgeBaseClass.getInFlightExecutionCount
getInFlightExecutionCounts()
getInFlightExecutionCounts():
InFlightExecutionCounts
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:70
Number of currently running handlers grouped by work kind.
Returns
Implementation of
EventBridge.getInFlightExecutionCounts
Inherited from
EventBridgeBaseClass.getInFlightExecutionCounts
getPausedSubscriptionConsumers()
getPausedSubscriptionConsumers():
object
Defined in: amqpbridge/src/AmqpBridge.impl.ts:1211
Returns paused subscription consumer states keyed by adapter registration key.
Returns
object
Implementation of
EventBridge.getPausedSubscriptionConsumers
Overrides
EventBridgeBaseClass.getPausedSubscriptionConsumers
getSubscriptionDeadLetterTarget()
protectedgetSubscriptionDeadLetterTarget(subscription):string|undefined
Defined in: amqpbridge/src/AmqpBridge.impl.ts:182
Parameters
subscription
Returns
string | undefined
getSubscriptionFailureReason()
protectedgetSubscriptionFailureReason(error):string
Defined in: amqpbridge/src/AmqpBridge.impl.ts:186
Parameters
error
unknown
Returns
string
getSubscriptionRetryQueueName()
protectedgetSubscriptionRetryQueueName(queueName,retryDelayMs):string
Defined in: amqpbridge/src/AmqpBridge.impl.ts:236
Parameters
queueName
string
retryDelayMs
number
Returns
string
getTracer()
getTracer():
Tracer
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:38
Returns open telemetry tracer of this service
Returns
Tracer
Tracer
Inherited from
EventBridgeBaseClass.getTracer
invoke()
invoke<
T>(input,commandTimeout?):Promise<T>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:577
Invokes a remote command and waits for a matching command response. The call is rejected with timeout if no response is received in time.
Type Parameters
T
T
Parameters
input
Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">
commandTimeout?
number = ...
Returns
Promise<T>
Implementation of
isHealthy()
isHealthy():
Promise<boolean>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:350
Indicates if the bridge connection and channels are currently healthy.
Returns
Promise<boolean>
Implementation of
isReady()
isReady():
Promise<boolean>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:343
Indicates if the bridge finished startup and is ready to process traffic.
Returns
Promise<boolean>
Implementation of
openStream()
openStream<
Chunk,Final>(_input,_ttl?):Promise<StreamHandle<Chunk,Final>>
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:73
Open a stream invocation. The returned handle can be consumed via async iteration and can be cancelled by caller.
Type Parameters
Chunk
Chunk = unknown
Final
Final = unknown
Parameters
_input
Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">
_ttl?
number
Returns
Promise<StreamHandle<Chunk, Final>>
Implementation of
Inherited from
EventBridgeBaseClass.openStream
registerCommand()
registerCommand(
address,cb,metadata,eventBridgeConfig):Promise<string>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:678
Register a service function and ensure that there is a queue for all incoming command requests.
Parameters
address
The service function address
cb
(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>
the function to call if a matching command message arrives
metadata
eventBridgeConfig
Returns
Promise<string>
the id of command function queue
Implementation of
registerStream()
registerStream(
_address,_cb,_metadata,_eventBridgeConfig):Promise<string>
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:74
Register a service stream.
Parameters
_address
_cb
(message) => Promise<void>
_metadata
_eventBridgeConfig
Returns
Promise<string>
Implementation of
Inherited from
EventBridgeBaseClass.registerStream
registerSubscription()
registerSubscription(
subscription,cb):Promise<string>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:932
Registers a subscription consumer and returns its stable subscription key.
Parameters
subscription
cb
(message) => Promise<Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }, "id" | "timestamp"> | undefined>
Returns
Promise<string>
Implementation of
EventBridge.registerSubscription
removeConsumerRegistration()
protectedremoveConsumerRegistration(channel,tag):void
Defined in: amqpbridge/src/AmqpBridge.impl.ts:135
Parameters
channel
ConfirmChannel
tag
string
Returns
void
removeConsumerRegistrationsForChannel()
protectedremoveConsumerRegistrationsForChannel(channel):void
Defined in: amqpbridge/src/AmqpBridge.impl.ts:131
Parameters
channel
ConfirmChannel
Returns
void
resumeSubscriptionConsumer()
resumeSubscriptionConsumer(
registrationKey):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:1215
Resumes a paused subscription consumer by registration key.
Parameters
registrationKey
string
Returns
Promise<void>
Implementation of
EventBridge.resumeSubscriptionConsumer
Overrides
EventBridgeBaseClass.resumeSubscriptionConsumer
retrySubscriptionMessage()
protectedretrySubscriptionMessage(channel,queueName,msg,nextAttempt,retryDelayMs,durable):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:204
Parameters
channel
ConfirmChannel
queueName
string
msg
ConsumeMessage
nextAttempt
number
retryDelayMs
number
durable
boolean
Returns
Promise<void>
runInFlight()
runInFlight<
T>(fn,kind?):Promise<T>
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:67
Type Parameters
T
T
Parameters
fn
() => Promise<T>
kind?
"command" | "subscription" | "stream" | "generic"
Returns
Promise<T>
Inherited from
EventBridgeBaseClass.runInFlight
sendToQueueAndConfirm()
protectedsendToQueueAndConfirm(channel,queueName,content,options):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:141
Parameters
channel
ConfirmChannel
queueName
string
content
Buffer
options
Publish | undefined
Returns
Promise<void>
start()
start():
Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:357
Connect to RabbitMQ broker, ensure exchange, call back queue
Returns
Promise<void>
Implementation of
Overrides
startActiveSpan()
startActiveSpan<
F>(name,opts,context,fn):Promise<F>
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:47
Start a child span for opentelemetry tracking
Type Parameters
F
F
Parameters
name
string
name of span
opts
SpanOptions
span options
context
Context | undefined
optional context
fn
(span) => Promise<F>
function to be executed within the span
Returns
Promise<F>
return value of fn
Inherited from
EventBridgeBaseClass.startActiveSpan
unregisterCommand()
unregisterCommand(
address):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:908
Unregisters a command consumer and closes the dedicated command channel.
Parameters
address
Returns
Promise<void>
Implementation of
unregisterStream()
unregisterStream(
_address):Promise<void>
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:75
Unregister a service stream
Parameters
_address
Returns
Promise<void>
Implementation of
Inherited from
EventBridgeBaseClass.unregisterStream
unregisterSubscription()
unregisterSubscription(
address):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:1189
Unregisters a subscription consumer and closes its channel.
Parameters
address
Returns
Promise<void>
Implementation of
EventBridge.unregisterSubscription
waitForInFlightDrain()
waitForInFlightDrain(
timeoutMs?):Promise<boolean>
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:68
Parameters
timeoutMs?
number
Returns
Promise<boolean>
Inherited from
EventBridgeBaseClass.waitForInFlightDrain
wrapInSpan()
wrapInSpan<
F>(name,opts,fn,context?):Promise<F>
Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:64
Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!
This means during logging and similar the spanId of parent span is logged.
Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself
Type Parameters
F
F
Parameters
name
string
name of span
opts
SpanOptions
span options
fn
(span) => Promise<F>
function te be executed in the span
context?
Context
span context
Returns
Promise<F>
return value of fn
