PURISTA API / @purista/amqpbridge / AmqpBridge
Class: AmqpBridge
Defined in: amqpbridge/src/AmqpBridge.impl.ts:71
The AMQP event bridge connects to a AMQP broker.
Example
import { AmqpBridge } from '@purista/amqpbridge'
// create and init our eventbridge
const config = {
url: 'amqp://localhost'
}
const eventBridge = new AmqpBridge(config)
await eventBridge.start()Extends
Implements
Constructors
Constructor
new AmqpBridge(
config?):AmqpBridge
Defined in: amqpbridge/src/AmqpBridge.impl.ts:117
Parameters
config?
defaultCommandTimeout?
number
Overwrite the hardcoded default timeout of command invocations
encoder?
the encoder(s) to be used for AMQP messages
Default
jsonEncoderencrypter?
the encrypter(s) to be used for AMQP messages
Default
plainexchangeName?
string
the AMQP exchage name to be used
Default
puristaexchangeOptions?
AssertExchange
the AMQP exchange options
instanceId?
string
The instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id.
logger?
A logger instance
logLevel?
If no logger instance is given, use this log level
namePrefix?
string
the queue prefix to be used for all PURISTA queues except short living queues created by the broker on request
Default
puristasocketOptions?
unknown
socket options
spanProcessor?
SpanProcessor
A OpenTelemetry span processor
url?
string | Connect
the AMQP broker url
Default
amqp://localhostReturns
AmqpBridge
Overrides
EventBridgeBaseClass.constructor
Properties
channel?
protectedoptionalchannel:Channel
Defined in: amqpbridge/src/AmqpBridge.impl.ts:73
config
config:
Complete<EventBridgeConfig<ConfigType>>
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:17
Inherited from
connection?
protectedoptionalconnection:ChannelModel
Defined in: amqpbridge/src/AmqpBridge.impl.ts:72
consumerRegistrations
protectedconsumerRegistrations:object[] =[]
Defined in: amqpbridge/src/AmqpBridge.impl.ts:78
channel
channel:
Channel
tag
tag:
string
defaultCommandTimeout
defaultCommandTimeout:
number
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:20
The default time until when a command invocation automatically returns a time out error
Implementation of
EventBridge.defaultCommandTimeout
Inherited from
EventBridgeBaseClass.defaultCommandTimeout
encoder
protectedencoder:Encoder
Defined in: amqpbridge/src/AmqpBridge.impl.ts:101
encrypter
protectedencrypter:Encrypter
Defined in: amqpbridge/src/AmqpBridge.impl.ts:105
healthy
protectedhealthy:boolean=false
Defined in: amqpbridge/src/AmqpBridge.impl.ts:75
instanceId
instanceId:
string
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:19
Implementation of
Inherited from
EventBridgeBaseClass.instanceId
logger
logger:
Logger
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15
Inherited from
AmqpBridge.logger
name
name:
string
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:18
Implementation of
Inherited from
pendingInvocations
protectedpendingInvocations:Map<string,PendigInvocation>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:89
ready
protectedready:boolean=false
Defined in: amqpbridge/src/AmqpBridge.impl.ts:76
replyQueueName?
protectedoptionalreplyQueueName:string
Defined in: amqpbridge/src/AmqpBridge.impl.ts:80
runningSubscriptionCount
protectedrunningSubscriptionCount:number=0
Defined in: amqpbridge/src/AmqpBridge.impl.ts:91
serviceFunctions
protectedserviceFunctions:Map<string, {cb: (message) =>Promise<{contentEncoding:"utf-8";contentType:"application/json";correlationId:string;eventName?:string;id:string;isHandledError:boolean;messageType:CommandErrorResponse;otp?:string;payload: {data?:unknown;message:string;status:StatusCode; };principalId?:string;receiver: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };sender: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };tenantId?:string;timestamp:number;traceId?:string; } | {contentEncoding:string;contentType:string;correlationId:string;eventName?:string;id:string;messageType:CommandSuccessResponse;otp?:string;payload:unknown;principalId?:string;receiver: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };sender: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };tenantId?:string;timestamp:number;traceId?:string; }>;channel:Channel; }>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:81
subscriptions
protectedsubscriptions:Map<string, {cb: (message) =>Promise<Omit<{contentEncoding:string;contentType:string;correlationId?:string;eventName:string;id:string;messageType:CustomMessage;otp?:string;payload?:unknown;principalId?:string;receiver?:EBMessageAddress;sender: {instanceId:string;serviceName:string;serviceTarget:string;serviceVersion:string; };tenantId?:string;timestamp:number;traceId?:string; },"id"|"timestamp"> |undefined>;channel:Channel; }>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:93
traceProvider
traceProvider:
NodeTracerProvider
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16
Inherited from
EventBridgeBaseClass.traceProvider
Methods
addConsumerRegistration()
protectedaddConsumerRegistration(channel,tag):void
Defined in: amqpbridge/src/AmqpBridge.impl.ts:109
Parameters
channel
Channel
tag
string
Returns
void
decodeContent()
protecteddecodeContent<T>(input,contentType,contentEncoding):Promise<T>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:878
Decode buffer into given type
Type Parameters
T
T
Parameters
input
Buffer
the input buffer
contentType
string
the content type of buffer content
contentEncoding
string
the encoding type of buffer content
Returns
Promise<T>
destroy()
destroy():
Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:897
Gracefully stops all consumers, waits for in-flight subscription handlers, closes AMQP resources and rejects unresolved pending invocations.
Returns
Promise<void>
Implementation of
Overrides
emit()
emit<
K>(eventName,parameter?):void
Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:16
Type Parameters
K
K extends EventKey<{[key: `adapter-${string}`]: unknown; [key: `custom-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>
Parameters
eventName
K
parameter?
object[K]
Returns
void
Inherited from
emitMessage()
emitMessage<
T>(message,contentType?,contentEncoding?):Promise<Readonly<EBMessage>>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:301
Emits a message via AMQP headers exchange. The message is encoded and encrypted according to configured codecs.
Type Parameters
T
T extends EBMessage
Parameters
message
Omit<EBMessage, "id" | "timestamp" | "correlationId">
contentType?
string = 'application/json'
contentEncoding?
string = 'utf-8'
Returns
Promise<Readonly<EBMessage>>
Implementation of
encodeContent()
protectedencodeContent<T>(input,contentType,contentEncoding):Promise<Buffer<ArrayBufferLike>>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:857
Encode given payload to buffer
Type Parameters
T
T
Parameters
input
T
contentType
string
contentEncoding
string
Returns
Promise<Buffer<ArrayBufferLike>>
getTracer()
getTracer():
Tracer
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:27
Returns open telemetry tracer of this service
Returns
Tracer
Tracer
Inherited from
EventBridgeBaseClass.getTracer
invoke()
invoke<
T>(input,commandTimeout?):Promise<T>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:380
Invokes a remote command and waits for a matching command response. The call is rejected with timeout if no response is received in time.
Type Parameters
T
T
Parameters
input
Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">
commandTimeout?
number = ...
Returns
Promise<T>
Implementation of
isHealthy()
isHealthy():
Promise<boolean>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:146
Indicates if the bridge connection and channels are currently healthy.
Returns
Promise<boolean>
Implementation of
isReady()
isReady():
Promise<boolean>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:139
Indicates if the bridge finished startup and is ready to process traffic.
Returns
Promise<boolean>
Implementation of
off()
off<
K>(eventName,fn):void
Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:15
Type Parameters
K
K extends EventKey<{[key: `adapter-${string}`]: unknown; [key: `custom-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>
Parameters
eventName
K
fn
EventReceiver<object[K]>
Returns
void
Inherited from
on()
on<
K>(eventName,fn):void
Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:14
Type Parameters
K
K extends EventKey<{[key: `adapter-${string}`]: unknown; [key: `custom-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>
Parameters
eventName
K
fn
EventReceiver<object[K]>
Returns
void
Inherited from
registerCommand()
registerCommand(
address,cb,metadata,eventBridgeConfig):Promise<string>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:513
Register a service function and ensure that there is a queue for all incoming command requests.
Parameters
address
The service function address
cb
(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>
the function to call if a matching command message arrives
metadata
eventBridgeConfig
Returns
Promise<string>
the id of command function queue
Implementation of
registerSubscription()
registerSubscription(
subscription,cb):Promise<string>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:704
Registers a subscription consumer and returns its stable subscription key.
Parameters
subscription
cb
(message) => Promise<Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }, "id" | "timestamp"> | undefined>
Returns
Promise<string>
Implementation of
EventBridge.registerSubscription
removeAllListeners()
removeAllListeners():
void
Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:17
Returns
void
Inherited from
EventBridgeBaseClass.removeAllListeners
removeConsumerRegistrationsForChannel()
protectedremoveConsumerRegistrationsForChannel(channel):void
Defined in: amqpbridge/src/AmqpBridge.impl.ts:113
Parameters
channel
Channel
Returns
void
start()
start():
Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:153
Connect to RabbitMQ broker, ensure exchange, call back queue
Returns
Promise<void>
Implementation of
Overrides
startActiveSpan()
startActiveSpan<
F>(name,opts,context,fn):Promise<F>
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:36
Start a child span for opentelemetry tracking
Type Parameters
F
F
Parameters
name
string
name of span
opts
SpanOptions
span options
context
optional context
Context | undefined
fn
(span) => Promise<F>
function to be executed within the span
Returns
Promise<F>
return value of fn
Inherited from
EventBridgeBaseClass.startActiveSpan
unregisterCommand()
unregisterCommand(
address):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:679
Unregisters a command consumer and closes the dedicated command channel.
Parameters
address
Returns
Promise<void>
Implementation of
unregisterSubscription()
unregisterSubscription(
address):Promise<void>
Defined in: amqpbridge/src/AmqpBridge.impl.ts:828
Unregisters a subscription consumer and closes its channel.
Parameters
address
Returns
Promise<void>
Implementation of
EventBridge.unregisterSubscription
wrapInSpan()
wrapInSpan<
F>(name,opts,fn,context?):Promise<F>
Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:52
Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!
This means during logging and similar the spanId of parent span is logged.
Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself
Type Parameters
F
F
Parameters
name
string
name of span
opts
SpanOptions
span options
fn
(span) => Promise<F>
function te be executed in the span
context?
Context
span context
Returns
Promise<F>
return value of fn
