Class: AmqpBridge
PURISTA API / Modules / @purista/amqpbridge / AmqpBridge
Class: AmqpBridge
@purista/amqpbridge.AmqpBridge
The AMQP event bridge connects to a AMQP broker.
Example
import { AmqpBridge } from '@purista/amqpbridge'
// create and init our eventbridge
const config = {
url: 'amqp://localhost'
}
const eventBridge = new AmqpBridge(config)
await eventBridge.start()
Hierarchy
EventBridgeBaseClass
<AmqpBridgeConfig
>↳
AmqpBridge
Implements
EventBridge
Table of contents
Constructors
Properties
- channel
- config
- connection
- consumerTags
- defaultCommandTimeout
- encoder
- encrypter
- healthy
- instanceId
- logger
- name
- pendingInvocations
- ready
- replyQueueName
- runningSubscriptionCount
- serviceFunctions
- subscriptions
- traceProvider
Methods
- decodeContent
- destroy
- emit
- emitMessage
- encodeContent
- getTracer
- invoke
- isHealthy
- isReady
- off
- on
- registerCommand
- registerSubscription
- removeAllListeners
- start
- startActiveSpan
- unregisterCommand
- unregisterSubscription
- wrapInSpan
Constructors
constructor
• new AmqpBridge(config?
): AmqpBridge
Parameters
Name | Type | Description |
---|---|---|
config? | Object | - |
config.defaultCommandTimeout? | number | Overwrite the hardcoded default timeout of command invocations |
config.encoder? | Encoder | the encoder(s) to be used for AMQP messages Default ts jsonEncoder |
config.encrypter? | Encrypter | the encrypter(s) to be used for AMQP messages Default ts plain |
config.exchangeName? | string | the AMQP exchage name to be used Default ts purista |
config.exchangeOptions? | AssertExchange | the AMQP exchange options |
config.instanceId? | string | The instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id. |
config.logLevel? | LogLevelName | If no logger instance is given, use this log level |
config.logger? | Logger | A logger instance |
config.namePrefix? | string | the queue prefix to be used for all PURISTA queues except short living queues created by the broker on request Default ts purista |
config.socketOptions? | any | socket options |
config.spanProcessor? | SpanProcessor | A OpenTelemetry span processor |
config.url? | string | Connect | the AMQP broker url Default ts amqp://localhost |
Returns
Overrides
EventBridgeBaseClass<AmqpBridgeConfig>.constructor
Defined in
amqpbridge/src/AmqpBridge.impl.ts:103
Properties
channel
• Protected
Optional
channel: Channel
Defined in
amqpbridge/src/AmqpBridge.impl.ts:67
config
• config: Complete
<{ defaultCommandTimeout?
: number
; encoder?
: Encoder
; encrypter?
: Encrypter
; exchangeName?
: string
; exchangeOptions?
: AssertExchange
; instanceId?
: string
; logLevel?
: LogLevelName
; logger?
: Logger
; namePrefix?
: string
; socketOptions?
: any
; spanProcessor?
: SpanProcessor
; url?
: string
| Connect
}>
Inherited from
EventBridgeBaseClass.config
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:13
connection
• Protected
Optional
connection: Connection
Defined in
amqpbridge/src/AmqpBridge.impl.ts:66
consumerTags
• Protected
consumerTags: string
[] = []
Defined in
amqpbridge/src/AmqpBridge.impl.ts:72
defaultCommandTimeout
• defaultCommandTimeout: number
Implementation of
EventBridge.defaultCommandTimeout
Inherited from
EventBridgeBaseClass.defaultCommandTimeout
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16
encoder
• Protected
encoder: Encoder
Defined in
amqpbridge/src/AmqpBridge.impl.ts:95
encrypter
• Protected
encrypter: Encrypter
Defined in
amqpbridge/src/AmqpBridge.impl.ts:99
healthy
• Protected
healthy: boolean
= false
Defined in
amqpbridge/src/AmqpBridge.impl.ts:69
instanceId
• instanceId: string
Implementation of
EventBridge.instanceId
Inherited from
EventBridgeBaseClass.instanceId
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15
logger
• logger: Logger
Inherited from
EventBridgeBaseClass.logger
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:11
name
• name: string
Implementation of
EventBridge.name
Inherited from
EventBridgeBaseClass.name
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:14
pendingInvocations
• Protected
pendingInvocations: Map
<string
, PendigInvocation
>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:83
ready
• Protected
ready: boolean
= false
Defined in
amqpbridge/src/AmqpBridge.impl.ts:70
replyQueueName
• Protected
Optional
replyQueueName: string
Defined in
amqpbridge/src/AmqpBridge.impl.ts:74
runningSubscriptionCount
• Protected
runningSubscriptionCount: number
= 0
Defined in
amqpbridge/src/AmqpBridge.impl.ts:85
serviceFunctions
• Protected
serviceFunctions: Map
<string
, { cb
: (message
: { contentEncoding
: string
; contentType
: string
; correlationId
: string
; eventName?
: string
; id
: string
; messageType
: Command
; otp?
: string
; payload
: { parameter
: unknown
; payload
: unknown
} ; principalId?
: string
; receiver
: EBMessageAddress
; sender
: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?
: string
; timestamp
: number
; traceId?
: string
}) => Promise
<{ contentEncoding
: "utf-8"
; contentType
: "application/json"
; correlationId
: string
; eventName?
: string
; id
: string
; isHandledError
: boolean
; messageType
: CommandErrorResponse
; otp?
: string
; payload
: { data?
: unknown
; message
: string
; status
: StatusCode
} ; principalId?
: string
; receiver
: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender
: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?
: string
; timestamp
: number
; traceId?
: string
} | { contentEncoding
: string
; contentType
: string
; correlationId
: string
; eventName?
: string
; id
: string
; messageType
: CommandSuccessResponse
; otp?
: string
; payload
: unknown
; principalId?
: string
; receiver
: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender
: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?
: string
; timestamp
: number
; traceId?
: string
}> ; channel
: Channel
}>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:75
subscriptions
• Protected
subscriptions: Map
<string
, { cb
: (message
: { contentEncoding
: string
; contentType
: string
; correlationId?
: string
; eventName
: string
; id
: string
; messageType
: CustomMessage
; otp?
: string
; payload?
: unknown
; principalId?
: string
; receiver?
: EBMessageAddress
; sender
: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?
: string
; timestamp
: number
; traceId?
: string
}) => Promise
<undefined
| Omit
<{ contentEncoding
: string
; contentType
: string
; correlationId?
: string
; eventName
: string
; id
: string
; messageType
: CustomMessage
; otp?
: string
; payload?
: unknown
; principalId?
: string
; receiver?
: EBMessageAddress
; sender
: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?
: string
; timestamp
: number
; traceId?
: string
}, "id"
| "timestamp"
>> ; channel
: Channel
}>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:87
traceProvider
• traceProvider: NodeTracerProvider
Inherited from
EventBridgeBaseClass.traceProvider
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:12
Methods
decodeContent
▸ decodeContent<T
>(input
, contentType
, contentEncoding
): Promise
<T
>
Decode buffer into given type
Type parameters
Name |
---|
T |
Parameters
Name | Type | Description |
---|---|---|
input | Buffer | the input buffer |
contentType | string | the content type of buffer content |
contentEncoding | string | the encoding type of buffer content |
Returns
Promise
<T
>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:819
destroy
▸ destroy(): Promise
<void
>
Returns
Promise
<void
>
Implementation of
EventBridge.destroy
Overrides
EventBridgeBaseClass.destroy
Defined in
amqpbridge/src/AmqpBridge.impl.ts:834
emit
▸ emit<K
>(eventName
, parameter?
): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
parameter? | { eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ] |
Returns
void
Inherited from
EventBridgeBaseClass.emit
Defined in
core/lib/types/core/types/GenericEventEmitter.d.ts:13
emitMessage
▸ emitMessage<T
>(message
, contentType?
, contentEncoding?
): Promise
<Readonly
<EBMessage
>>
Type parameters
Name | Type |
---|---|
T | extends EBMessage |
Parameters
Name | Type | Default value |
---|---|---|
message | Omit <EBMessage , "id" | "timestamp" | "correlationId" > | undefined |
contentType | string | 'application/json' |
contentEncoding | string | 'utf-8' |
Returns
Promise
<Readonly
<EBMessage
>>
Implementation of
EventBridge.emitMessage
Defined in
amqpbridge/src/AmqpBridge.impl.ts:277
encodeContent
▸ encodeContent<T
>(input
, contentType
, contentEncoding
): Promise
<Buffer
>
Encode given payload to buffer
Type parameters
Name |
---|
T |
Parameters
Name | Type |
---|---|
input | T |
contentType | string |
contentEncoding | string |
Returns
Promise
<Buffer
>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:798
getTracer
▸ getTracer(): Tracer
Returns open telemetry tracer of this service
Returns
Tracer
Tracer
Inherited from
EventBridgeBaseClass.getTracer
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:23
invoke
▸ invoke<T
>(input
, commandTimeout?
): Promise
<T
>
Type parameters
Name |
---|
T |
Parameters
Name | Type |
---|---|
input | Omit <{ contentEncoding : string ; contentType : string ; correlationId : string ; eventName? : string ; id : string ; messageType : Command ; otp? : string ; payload : { parameter : unknown ; payload : unknown } ; principalId? : string ; receiver : EBMessageAddress ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string }, "id" | "timestamp" | "correlationId" | "messageType" > |
commandTimeout | number |
Returns
Promise
<T
>
Implementation of
EventBridge.invoke
Defined in
amqpbridge/src/AmqpBridge.impl.ts:352
isHealthy
▸ isHealthy(): Promise
<boolean
>
Returns
Promise
<boolean
>
Implementation of
EventBridge.isHealthy
Defined in
amqpbridge/src/AmqpBridge.impl.ts:126
isReady
▸ isReady(): Promise
<boolean
>
Returns
Promise
<boolean
>
Implementation of
EventBridge.isReady
Defined in
amqpbridge/src/AmqpBridge.impl.ts:122
off
▸ off<K
>(eventName
, fn
): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
fn | EventReceiver <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ]> |
Returns
void
Inherited from
EventBridgeBaseClass.off
Defined in
core/lib/types/core/types/GenericEventEmitter.d.ts:12
on
▸ on<K
>(eventName
, fn
): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
fn | EventReceiver <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ]> |
Returns
void
Inherited from
EventBridgeBaseClass.on
Defined in
core/lib/types/core/types/GenericEventEmitter.d.ts:11
registerCommand
▸ registerCommand(address
, cb
, metadata
, eventBridgeConfig
): Promise
<string
>
Register a service function and ensure that there is a queue for all incoming command requests.
Parameters
Name | Type | Description |
---|---|---|
address | EBMessageAddress | The service function address |
cb | (message : { contentEncoding : string ; contentType : string ; correlationId : string ; eventName? : string ; id : string ; messageType : Command ; otp? : string ; payload : { parameter : unknown ; payload : unknown } ; principalId? : string ; receiver : EBMessageAddress ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string }) => Promise <{ contentEncoding : "utf-8" ; contentType : "application/json" ; correlationId : string ; eventName? : string ; id : string ; isHandledError : boolean ; messageType : CommandErrorResponse ; otp? : string ; payload : { data? : unknown ; message : string ; status : StatusCode } ; principalId? : string ; receiver : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string } | { contentEncoding : string ; contentType : string ; correlationId : string ; eventName? : string ; id : string ; messageType : CommandSuccessResponse ; otp? : string ; payload : unknown ; principalId? : string ; receiver : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string }> | the function to call if a matching command message arrives |
metadata | CommandDefinitionMetadataBase | - |
eventBridgeConfig | DefinitionEventBridgeConfig | - |
Returns
Promise
<string
>
the id of command function queue
Implementation of
EventBridge.registerCommand
Defined in
amqpbridge/src/AmqpBridge.impl.ts:470
registerSubscription
▸ registerSubscription(subscription
, cb
): Promise
<string
>
Parameters
Name | Type |
---|---|
subscription | Subscription |
cb | (message : EBMessage ) => Promise <undefined | Omit <{ contentEncoding : string ; contentType : string ; correlationId? : string ; eventName : string ; id : string ; messageType : CustomMessage ; otp? : string ; payload? : unknown ; principalId? : string ; receiver? : EBMessageAddress ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string }, "id" | "timestamp" >> |
Returns
Promise
<string
>
Implementation of
EventBridge.registerSubscription
Defined in
amqpbridge/src/AmqpBridge.impl.ts:652
removeAllListeners
▸ removeAllListeners(): void
Returns
void
Inherited from
EventBridgeBaseClass.removeAllListeners
Defined in
core/lib/types/core/types/GenericEventEmitter.d.ts:14
start
▸ start(): Promise
<void
>
Connect to RabbitMQ broker, ensure exchange, call back queue
Returns
Promise
<void
>
Implementation of
EventBridge.start
Overrides
EventBridgeBaseClass.start
Defined in
amqpbridge/src/AmqpBridge.impl.ts:133
startActiveSpan
▸ startActiveSpan<F
>(name
, opts
, context
, fn
): Promise
<F
>
Start a child span for opentelemetry tracking
Type parameters
Name |
---|
F |
Parameters
Name | Type | Description |
---|---|---|
name | string | name of span |
opts | SpanOptions | span options |
context | undefined | Context | optional context |
fn | (span : Span ) => Promise <F > | function to be executed within the span |
Returns
Promise
<F
>
return value of fn
Inherited from
EventBridgeBaseClass.startActiveSpan
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:32
unregisterCommand
▸ unregisterCommand(address
): Promise
<void
>
Parameters
Name | Type |
---|---|
address | EBMessageAddress |
Returns
Promise
<void
>
Implementation of
EventBridge.unregisterCommand
Defined in
amqpbridge/src/AmqpBridge.impl.ts:633
unregisterSubscription
▸ unregisterSubscription(address
): Promise
<void
>
Parameters
Name | Type |
---|---|
address | EBMessageAddress |
Returns
Promise
<void
>
Implementation of
EventBridge.unregisterSubscription
Defined in
amqpbridge/src/AmqpBridge.impl.ts:772
wrapInSpan
▸ wrapInSpan<F
>(name
, opts
, fn
, context?
): Promise
<F
>
Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!
This means during logging and similar the spanId of parent span is logged.
Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself
Type parameters
Name |
---|
F |
Parameters
Name | Type | Description |
---|---|---|
name | string | name of span |
opts | SpanOptions | span options |
fn | (span : Span ) => Promise <F > | function te be executed in the span |
context? | Context | span context |
Returns
Promise
<F
>
return value of fn
Inherited from
EventBridgeBaseClass.wrapInSpan
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:48