Class: AmqpBridge
@purista/amqpbridge.AmqpBridge
The AMQP event bridge connects to an AMQP broker.
Example
import { AmqpBridge } from '@purista/amqpbridge'
// create and init our event bridge
const config = {
  url: 'amqp://localhost'
}
const eventBridge = new AmqpBridge(config)
await eventBridge.start()
Hierarchy
EventBridgeBaseClass<AmqpBridgeConfig>
↳ AmqpBridge
Implements
Table of contents
Constructors
Properties
- channel
- config
- connection
- consumerTags
- defaultCommandTimeout
- encoder
- encrypter
- healthy
- instanceId
- logger
- name
- pendingInvocations
- ready
- replyQueueName
- runningSubscriptionCount
- serviceFunctions
- subscriptions
- traceProvider
Methods
- decodeContent
- destroy
- emit
- emitMessage
- encodeContent
- getTracer
- invoke
- isHealthy
- isReady
- off
- on
- registerCommand
- registerSubscription
- removeAllListeners
- start
- startActiveSpan
- unregisterCommand
- unregisterSubscription
- wrapInSpan
Constructors
constructor
• new AmqpBridge(config?): AmqpBridge
Parameters
Name | Type | Description |
---|---|---|
config? | Object | - |
config.defaultCommandTimeout? | number | Overrides the hardcoded default timeout of command invocations |
config.encoder? | Encoder | The encoder(s) to be used for AMQP messages. Default: jsonEncoder |
config.encrypter? | Encrypter | The encrypter(s) to be used for AMQP messages. Default: plain |
config.exchangeName? | string | The AMQP exchange name to be used. Default: purista |
config.exchangeOptions? | AssertExchange | The AMQP exchange options |
config.instanceId? | string | The instance id of the event bridge. If not set, an id is generated each time an instance is created. Set this if the instance id must always be the same. |
config.logLevel? | LogLevelName | The log level to use if no logger instance is given |
config.logger? | Logger | A logger instance |
config.namePrefix? | string | The queue prefix to be used for all PURISTA queues, except short-lived queues created by the broker on request. Default: purista |
config.socketOptions? | any | Socket options |
config.spanProcessor? | SpanProcessor | An OpenTelemetry span processor |
config.url? | string \| Connect | The AMQP broker URL. Default: amqp://localhost |
Returns
AmqpBridge
Overrides
EventBridgeBaseClass.constructor
Defined in
amqpbridge/src/AmqpBridge.impl.ts:105
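The defaults named in the parameter table can be summarised in a small sketch. `BridgeConfigSketch` and `withDocumentedDefaults` are hypothetical names used only for illustration and are not part of `@purista/amqpbridge`; only the three defaults the table states as plain strings (`url`, `exchangeName`, `namePrefix`) are filled in, and the broker URL in the usage line is an invented example value.

```typescript
// Hypothetical, trimmed-down stand-in for the documented config object.
type BridgeConfigSketch = {
  url?: string
  exchangeName?: string
  namePrefix?: string
  instanceId?: string
  defaultCommandTimeout?: number
}

// Defaults as stated in the parameter table.
const documentedDefaults = {
  url: 'amqp://localhost',
  exchangeName: 'purista',
  namePrefix: 'purista',
}

// Merge user-supplied values over the documented defaults.
function withDocumentedDefaults(config: BridgeConfigSketch = {}) {
  return { ...documentedDefaults, ...config }
}

// Example values below are made up for the illustration.
const resolved = withDocumentedDefaults({ url: 'amqp://broker.internal', instanceId: 'bridge-1' })
console.log(resolved.exchangeName) // purista
```

Only keys that are actually passed override a default, so a partial config such as `{ instanceId: 'bridge-1' }` still resolves to the documented URL and exchange name.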
Properties
channel
• Protected Optional channel: Channel
Defined in
amqpbridge/src/AmqpBridge.impl.ts:69
config
• config: Complete<{ defaultCommandTimeout?: number; encoder?: Encoder; encrypter?: Encrypter; exchangeName?: string; exchangeOptions?: AssertExchange; instanceId?: string; logLevel?: LogLevelName; logger?: Logger; namePrefix?: string; socketOptions?: any; spanProcessor?: SpanProcessor; url?: string | Connect }>
Inherited from
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:14
connection
• Protected Optional connection: Connection
Defined in
amqpbridge/src/AmqpBridge.impl.ts:68
consumerTags
• Protected consumerTags: string[] = []
Defined in
amqpbridge/src/AmqpBridge.impl.ts:74
defaultCommandTimeout
• defaultCommandTimeout: number
The default time after which a command invocation automatically returns a timeout error
Implementation of
EventBridge.defaultCommandTimeout
Inherited from
EventBridgeBaseClass.defaultCommandTimeout
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:17
encoder
• Protected encoder: Encoder
Defined in
amqpbridge/src/AmqpBridge.impl.ts:97
encrypter
• Protected encrypter: Encrypter
Defined in
amqpbridge/src/AmqpBridge.impl.ts:101
healthy
• Protected healthy: boolean = false
Defined in
amqpbridge/src/AmqpBridge.impl.ts:71
instanceId
• instanceId: string
Implementation of
Inherited from
EventBridgeBaseClass.instanceId
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16
logger
• logger: Logger
Inherited from
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:12
name
• name: string
Implementation of
Inherited from
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15
pendingInvocations
• Protected pendingInvocations: Map<string, PendigInvocation>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:85
ready
• Protected ready: boolean = false
Defined in
amqpbridge/src/AmqpBridge.impl.ts:72
replyQueueName
• Protected Optional replyQueueName: string
Defined in
amqpbridge/src/AmqpBridge.impl.ts:76
runningSubscriptionCount
• Protected runningSubscriptionCount: number = 0
Defined in
amqpbridge/src/AmqpBridge.impl.ts:87
serviceFunctions
• Protected serviceFunctions: Map<string, {
  cb: (message: {
    contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string;
    messageType: Command; otp?: string;
    payload: { parameter: unknown; payload: unknown };
    principalId?: string;
    receiver: EBMessageAddress;
    sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string };
    tenantId?: string; timestamp: number; traceId?: string
  }) => Promise<{
    contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string;
    isHandledError: boolean; messageType: CommandErrorResponse; otp?: string;
    payload: { data?: unknown; message: string; status: StatusCode };
    principalId?: string;
    receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string };
    sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string };
    tenantId?: string; timestamp: number; traceId?: string
  } | {
    contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string;
    messageType: CommandSuccessResponse; otp?: string;
    payload: unknown;
    principalId?: string;
    receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string };
    sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string };
    tenantId?: string; timestamp: number; traceId?: string
  }>;
  channel: Channel
}>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:77
subscriptions
• Protected subscriptions: Map<string, {
  cb: (message: {
    contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string;
    messageType: CustomMessage; otp?: string;
    payload?: unknown;
    principalId?: string;
    receiver?: EBMessageAddress;
    sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string };
    tenantId?: string; timestamp: number; traceId?: string
  }) => Promise<undefined | Omit<{
    contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string;
    messageType: CustomMessage; otp?: string;
    payload?: unknown;
    principalId?: string;
    receiver?: EBMessageAddress;
    sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string };
    tenantId?: string; timestamp: number; traceId?: string
  }, "id" | "timestamp">>;
  channel: Channel
}>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:89
traceProvider
• traceProvider: NodeTracerProvider
Inherited from
EventBridgeBaseClass.traceProvider
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:13
Methods
decodeContent
▸ decodeContent<T>(input, contentType, contentEncoding): Promise<T>
Decodes a buffer into the given type
Type parameters
Name |
---|
T |
Parameters
Name | Type | Description |
---|---|---|
input | Buffer | the input buffer |
contentType | string | the content type of buffer content |
contentEncoding | string | the encoding type of buffer content |
Returns
Promise<T>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:821
destroy
▸ destroy(): Promise<void>
Shuts down the event bridge as gracefully as possible
Returns
Promise<void>
Implementation of
Overrides
Defined in
amqpbridge/src/AmqpBridge.impl.ts:836
emit
▸ emit<K>(eventName, parameter?): void
Type parameters
Name | Type |
---|---|
K | extends EventKey<{ eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never }> |
Parameters
Name | Type |
---|---|
eventName | K |
parameter? | { eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never }[K] |
Returns
void
Inherited from
Defined in
core/dist/commonjs/core/types/GenericEventEmitter.d.ts:13
emitMessage
▸ emitMessage<T>(message, contentType?, contentEncoding?): Promise<Readonly<EBMessage>>
Emits a message to the event bridge without awaiting a result
Type parameters
Name | Type |
---|---|
T | extends EBMessage |
Parameters
Name | Type | Default value | Description |
---|---|---|---|
message | Omit<EBMessage, "id" \| "timestamp" \| "correlationId"> | undefined | the message |
contentType | string | 'application/json' | - |
contentEncoding | string | 'utf-8' | - |
Returns
Promise<Readonly<EBMessage>>
Implementation of
Defined in
amqpbridge/src/AmqpBridge.impl.ts:279
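The `Omit<EBMessage, "id" | "timestamp" | "correlationId">` parameter type suggests that the caller leaves those fields out and receives a completed, read-only message back. The following is a minimal sketch of that idea only, not the bridge's implementation: `MessageSketch`, `completeMessage` and the `msg-N` id format are assumptions made for illustration, and the real `EBMessage` carries more fields.

```typescript
// Hypothetical, reduced message shape; the real EBMessage has more fields.
type MessageSketch = {
  id: string
  timestamp: number
  correlationId?: string
  eventName: string
  payload?: unknown
}

// The caller may not set id, timestamp or correlationId.
type EmitInput = Omit<MessageSketch, 'id' | 'timestamp' | 'correlationId'>

let counter = 0
// Placeholder id generator; the id format PURISTA uses is not documented here.
const nextId = (): string => `msg-${++counter}`

// Add the omitted id and timestamp and freeze the result so it is read-only.
function completeMessage(input: EmitInput, now: number = Date.now()): Readonly<MessageSketch> {
  return Object.freeze({ ...input, id: nextId(), timestamp: now })
}

const sent = completeMessage({ eventName: 'userCreated', payload: { userId: 42 } })
console.log(sent.id, sent.timestamp)
```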
encodeContent
▸ encodeContent<T>(input, contentType, contentEncoding): Promise<Buffer>
Encodes the given payload to a buffer
Type parameters
Name |
---|
T |
Parameters
Name | Type |
---|---|
input | T |
contentType | string |
contentEncoding | string |
Returns
Promise<Buffer>
Defined in
amqpbridge/src/AmqpBridge.impl.ts:800
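For the default combination of `application/json` and `utf-8`, encoding and decoding amount to a JSON round trip through a Node.js `Buffer`. The sketch below illustrates that round trip with two hypothetical helpers, `toJsonBuffer` and `fromJsonBuffer`. They are synchronous for brevity, whereas the documented methods return promises and delegate to the configured encoder and encrypter.

```typescript
// Hypothetical JSON-only stand-in for encodeContent (the real method is async).
function toJsonBuffer<T>(input: T): Buffer {
  return Buffer.from(JSON.stringify(input), 'utf-8')
}

// Hypothetical JSON-only stand-in for decodeContent (the real method is async).
function fromJsonBuffer<T>(input: Buffer): T {
  return JSON.parse(input.toString('utf-8')) as T
}

const original = { orderId: 'A-1', items: 3 }
const roundTripped = fromJsonBuffer<typeof original>(toJsonBuffer(original))
console.log(roundTripped.items) // 3
```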
getTracer
▸ getTracer(): Tracer
Returns the OpenTelemetry tracer of this service
Returns
Tracer
Tracer
Inherited from
EventBridgeBaseClass.getTracer
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24
invoke
▸ invoke<T>(input, commandTimeout?): Promise<T>
Calls a command of a service and returns the result of this command
Type parameters
Name |
---|
T |
Parameters
Name | Type | Description |
---|---|---|
input | Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: Command; otp?: string; payload: { parameter: unknown; payload: unknown }; principalId?: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }, "id" \| "timestamp" \| "correlationId" \| "messageType"> | a partial command message |
commandTimeout | number | the time to live (timeout) of the invocation |
Returns
Promise<T>
Implementation of
Defined in
amqpbridge/src/AmqpBridge.impl.ts:354
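The `pendingInvocations` map and the command timeout hint at a common request/response pattern: each invocation is stored under its correlation id and rejected if no response arrives in time. The sketch below shows that pattern in isolation. It is an assumption about the general technique, not the code in `AmqpBridge.impl.ts`; `PendingEntry`, `PendingInvocationsSketch`, `waitFor` and `settle` are hypothetical names.

```typescript
// Hypothetical bookkeeping entry; the real PendigInvocation type is not shown in this reference.
type PendingEntry = {
  resolve: (value: unknown) => void
  reject: (error: Error) => void
  timer: ReturnType<typeof setTimeout>
}

class PendingInvocationsSketch {
  readonly pending = new Map<string, PendingEntry>()

  // Register an invocation and reject it if no response arrives within timeoutMs.
  waitFor<T>(correlationId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId)
        reject(new Error(`invocation ${correlationId} timed out after ${timeoutMs} ms`))
      }, timeoutMs)
      this.pending.set(correlationId, {
        resolve: (value) => resolve(value as T),
        reject,
        timer,
      })
    })
  }

  // Called when a response with a matching correlation id arrives.
  settle(correlationId: string, value: unknown): boolean {
    const entry = this.pending.get(correlationId)
    if (!entry) return false
    clearTimeout(entry.timer)
    this.pending.delete(correlationId)
    entry.resolve(value)
    return true
  }
}

const invocations = new PendingInvocationsSketch()
invocations.waitFor<string>('corr-1', 30_000).then((result) => console.log(result))
invocations.settle('corr-1', 'pong')
```

Clearing the timer in `settle` matters: otherwise every answered invocation would keep a timer alive until its timeout elapses.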
isHealthy
▸ isHealthy(): Promise<boolean>
Indicates whether the event bridge is running and works correctly
Returns
Promise<boolean>
Implementation of
Defined in
amqpbridge/src/AmqpBridge.impl.ts:128
isReady
▸ isReady(): Promise<boolean>
Indicates whether the event bridge has been started and is connected to the underlying message broker
Returns
Promise<boolean>
Implementation of
Defined in
amqpbridge/src/AmqpBridge.impl.ts:124
off
▸ off<K>(eventName, fn): void
Type parameters
Name | Type |
---|---|
K | extends EventKey<{ eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never }> |
Parameters
Name | Type |
---|---|
eventName | K |
fn | EventReceiver<{ eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never }[K]> |
Returns
void
Inherited from
Defined in
core/dist/commonjs/core/types/GenericEventEmitter.d.ts:12
on
▸ on<K>(eventName, fn): void
Type parameters
Name | Type |
---|---|
K | extends EventKey<{ eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never }> |
Parameters
Name | Type |
---|---|
eventName | K |
fn | EventReceiver<{ eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never }[K]> |
Returns
void
Inherited from
Defined in
core/dist/commonjs/core/types/GenericEventEmitter.d.ts:11
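The `emit`, `off` and `on` signatures describe a typed event emitter over five connection events. The sketch below reproduces that surface with a hypothetical `EmitterSketch` class so the typing can be tried without a broker. The event names and payload types are taken from the tables in this reference; `EmitterSketch`, `Receiver` and `BridgeEvents` are illustrative names, not PURISTA exports.

```typescript
// Event names and payload types as listed in the type parameter tables.
type BridgeEvents = {
  'eventbridge-connected': never
  'eventbridge-connection-error': unknown
  'eventbridge-disconnected': never
  'eventbridge-error': unknown
  'eventbridge-reconnecting': never
}

type Receiver<T> = (parameter: T) => void

// Hypothetical minimal emitter with the same on/off/emit surface.
class EmitterSketch<E extends Record<string, unknown>> {
  private listeners = new Map<keyof E, Set<Receiver<any>>>()

  on<K extends keyof E>(eventName: K, fn: Receiver<E[K]>): void {
    const set = this.listeners.get(eventName) ?? new Set<Receiver<any>>()
    set.add(fn)
    this.listeners.set(eventName, set)
  }

  off<K extends keyof E>(eventName: K, fn: Receiver<E[K]>): void {
    this.listeners.get(eventName)?.delete(fn)
  }

  emit<K extends keyof E>(eventName: K, parameter?: E[K]): void {
    this.listeners.get(eventName)?.forEach((fn) => fn(parameter))
  }
}

const bridgeEvents = new EmitterSketch<BridgeEvents>()
const errors: unknown[] = []
const onError = (error: unknown) => { errors.push(error) }

bridgeEvents.on('eventbridge-error', onError)
bridgeEvents.emit('eventbridge-error', new Error('channel closed'))
bridgeEvents.off('eventbridge-error', onError)
bridgeEvents.emit('eventbridge-error', new Error('ignored'))
console.log(errors.length) // 1
```

Passing the same function reference to `off` that was given to `on` is what removes the listener; an inline arrow function could not be unregistered this way.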
registerCommand
▸ registerCommand(address, cb, metadata, eventBridgeConfig): Promise<string>
Registers a service function and ensures that there is a queue for all incoming command requests.
Parameters
Name | Type | Description |
---|---|---|
address | EBMessageAddress | The service function address |
cb | (message: { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: Command; otp?: string; payload: { parameter: unknown; payload: unknown }; principalId?: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string } \| { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }> | the function to call if a matching command message arrives |
metadata | CommandDefinitionMetadataBase | - |
eventBridgeConfig | DefinitionEventBridgeConfig | - |
Returns
Promise<string>
the id of the command function queue
Implementation of
Defined in
amqpbridge/src/AmqpBridge.impl.ts:472
registerSubscription
▸ registerSubscription(subscription, cb): Promise<string>
Registers a new subscription
Parameters
Name | Type | Description |
---|---|---|
subscription | Subscription | the subscription definition |
cb | (message: EBMessage) => Promise<undefined \| Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }, "id" \| "timestamp">> | the function to be called if a matching message arrives |
Returns
Promise<string>
Implementation of
EventBridge.registerSubscription
Defined in
amqpbridge/src/AmqpBridge.impl.ts:654
removeAllListeners
▸ removeAllListeners(): void
Returns
void
Inherited from
EventBridgeBaseClass.removeAllListeners
Defined in
core/dist/commonjs/core/types/GenericEventEmitter.d.ts:14
start
▸ start(): Promise<void>
Connects to the RabbitMQ broker and ensures that the exchange and the callback queue exist
Returns
Promise<void>
Implementation of
Overrides
Defined in
amqpbridge/src/AmqpBridge.impl.ts:135
startActiveSpan
▸ startActiveSpan<F>(name, opts, context, fn): Promise<F>
Starts a child span for OpenTelemetry tracing
Type parameters
Name |
---|
F |
Parameters
Name | Type | Description |
---|---|---|
name | string | name of span |
opts | SpanOptions | span options |
context | undefined \| Context | optional context |
fn | (span: Span) => Promise<F> | function to be executed within the span |
Returns
Promise<F>
return value of fn
Inherited from
EventBridgeBaseClass.startActiveSpan
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:33
unregisterCommand
▸ unregisterCommand(address): Promise<void>
Unregisters a service command
Parameters
Name | Type | Description |
---|---|---|
address | EBMessageAddress | The address (service name, version and command name) of the command to be de-registered |
Returns
Promise<void>
Implementation of
Defined in
amqpbridge/src/AmqpBridge.impl.ts:635
unregisterSubscription
▸ unregisterSubscription(address): Promise<void>
Parameters
Name | Type |
---|---|
address | EBMessageAddress |
Returns
Promise<void>
Implementation of
EventBridge.unregisterSubscription
Defined in
amqpbridge/src/AmqpBridge.impl.ts:774
wrapInSpan
▸ wrapInSpan<F>(name, opts, fn, context?): Promise<F>
Starts a span for OpenTelemetry tracing on the same level. The created span does not become the "active" span within OpenTelemetry.
As a result, log entries and similar output carry the spanId of the parent span.
Use wrapInSpan to mark points in the flow of one larger function, not to trace the program flow itself.
Type parameters
Name |
---|
F |
Parameters
Name | Type | Description |
---|---|---|
name | string | name of span |
opts | SpanOptions | span options |
fn | (span: Span) => Promise<F> | function to be executed in the span |
context? | Context | span context |
Returns
Promise<F>
return value of fn
Inherited from
EventBridgeBaseClass.wrapInSpan
Defined in
core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:49