Class: NatsBridge
PURISTA API / Modules / @purista/natsbridge / NatsBridge
Class: NatsBridge
@purista/natsbridge.NatsBridge
The event bridge supports brokers with and without JetStream enabled.
If JetStream is enabled, subscriptions which are marked as durable are persisted by using JetStream.
If JetStream is not available, subscription fall back to live-subscriptions without any persistence.
Example usage:
Example
import { NatsBridge } from '@purista/natsbridge'
// create and init our eventbridge
const eventBridge = new NatsBridge()
await eventBridge.start()
Hierarchy
EventBridgeBaseClass
<NatsBridgeConfig
>↳
NatsBridge
Implements
EventBridge
Table of contents
Constructors
Properties
- commands
- config
- connection
- defaultCommandTimeout
- instanceId
- isJetStreamEnabled
- jsm
- logger
- name
- sc
- subscriptions
- traceProvider
Methods
- destroy
- emit
- emitMessage
- getTracer
- invoke
- isHealthy
- isReady
- off
- on
- registerCommand
- registerSubscription
- removeAllListeners
- start
- startActiveSpan
- unregisterCommand
- unregisterSubscription
- wrapInSpan
Constructors
constructor
• new NatsBridge(config?
): NatsBridge
Parameters
Name | Type | Description |
---|---|---|
config? | Object | - |
config.commandResponsePublishTwice? | "always" | "eventOnly" | "eventAndError" | "never" | Indicates if a command response should be published a second time. If the command response gets published, it will be published to the regular topic pattern. If set to never , subscription might not get messages they are expecting because of the timing. If set to always , every command response is published. Because there might not be a consumer for every message, the broker will store the messages until the defaultMessageExpiryInterval is reached. This might result in a high ressource consumption of the broker. If set to eventOnly , only success responses which have a event name set, are published twice. There, we expect, that an event has at least one consumer subscription and the broker does not unnecessarily stores messages for a long time. Default ts eventOnly |
config.defaultCommandTimeout? | number | Overwrite the hardcoded default timeout of command invocations |
config.defaultMessageExpiryInterval? | number | the message expiry interval in seconds Default ts 30 days in seconds |
config.emptyTopicPartString? | string | The string which should be used in topics for parts, which are undefined Default ts __none__ |
config.instanceId? | string | The instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id. |
config.logLevel? | LogLevelName | If no logger instance is given, use this log level |
config.logger? | Logger | A logger instance |
config.maxMessages? | number | maximum messages to run in parallel per subscription 10 means, each subscription can handle 10 calls at the same time Default ts 10 |
config.spanProcessor? | SpanProcessor | A OpenTelemetry span processor |
config.topicPrefix? | string | the prefix for topic to prevent name collisions Default ts purista |
Returns
Overrides
EventBridgeBaseClass<NatsBridgeConfig>.constructor
Defined in
natsbridge/src/NatsBridge.ts:81
Properties
commands
• commands: Map
<string
, Subscription
>
Defined in
natsbridge/src/NatsBridge.ts:76
config
• config: Complete
<{ commandResponsePublishTwice
: "always"
| "eventOnly"
| "eventAndError"
| "never"
; defaultCommandTimeout?
: number
; defaultMessageExpiryInterval
: number
; emptyTopicPartString
: string
; instanceId?
: string
; logLevel?
: LogLevelName
; logger?
: Logger
; maxMessages
: number
; spanProcessor?
: SpanProcessor
; topicPrefix
: string
}>
Inherited from
EventBridgeBaseClass.config
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:13
connection
• connection: undefined
| NatsConnection
Defined in
natsbridge/src/NatsBridge.ts:70
defaultCommandTimeout
• defaultCommandTimeout: number
Implementation of
EventBridge.defaultCommandTimeout
Inherited from
EventBridgeBaseClass.defaultCommandTimeout
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16
instanceId
• instanceId: string
Implementation of
EventBridge.instanceId
Inherited from
EventBridgeBaseClass.instanceId
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15
isJetStreamEnabled
• isJetStreamEnabled: boolean
= false
Defined in
natsbridge/src/NatsBridge.ts:72
jsm
• jsm: undefined
| JetStreamManager
Defined in
natsbridge/src/NatsBridge.ts:74
logger
• logger: Logger
Inherited from
EventBridgeBaseClass.logger
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:11
name
• name: string
Implementation of
EventBridge.name
Inherited from
EventBridgeBaseClass.name
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:14
sc
• sc: Codec
<unknown
>
Defined in
natsbridge/src/NatsBridge.ts:79
subscriptions
• subscriptions: Map
<string
, Subscription
>
Defined in
natsbridge/src/NatsBridge.ts:77
traceProvider
• traceProvider: NodeTracerProvider
Inherited from
EventBridgeBaseClass.traceProvider
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:12
Methods
destroy
▸ destroy(): Promise
<void
>
Returns
Promise
<void
>
Implementation of
EventBridge.destroy
Overrides
EventBridgeBaseClass.destroy
Defined in
natsbridge/src/NatsBridge.ts:396
emit
▸ emit<K
>(eventName
, parameter?
): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
parameter? | { eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ] |
Returns
void
Inherited from
EventBridgeBaseClass.emit
Defined in
core/lib/types/core/types/GenericEventEmitter.d.ts:13
emitMessage
▸ emitMessage<T
>(message
, contentType?
, contentEncoding?
): Promise
<Readonly
<EBMessage
>>
Type parameters
Name | Type |
---|---|
T | extends EBMessage |
Parameters
Name | Type | Default value |
---|---|---|
message | Omit <EBMessage , "id" | "timestamp" | "correlationId" > | undefined |
contentType | string | 'application/json' |
contentEncoding | string | 'utf-8' |
Returns
Promise
<Readonly
<EBMessage
>>
Implementation of
EventBridge.emitMessage
Defined in
natsbridge/src/NatsBridge.ts:111
getTracer
▸ getTracer(): Tracer
Returns open telemetry tracer of this service
Returns
Tracer
Tracer
Inherited from
EventBridgeBaseClass.getTracer
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:23
invoke
▸ invoke<T
>(input
, commandTimeout?
): Promise
<T
>
Type parameters
Name |
---|
T |
Parameters
Name | Type |
---|---|
input | Omit <{ contentEncoding : string ; contentType : string ; correlationId : string ; eventName? : string ; id : string ; messageType : Command ; otp? : string ; payload : { parameter : unknown ; payload : unknown } ; principalId? : string ; receiver : EBMessageAddress ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string }, "id" | "timestamp" | "correlationId" | "messageType" > |
commandTimeout | number |
Returns
Promise
<T
>
Implementation of
EventBridge.invoke
Defined in
natsbridge/src/NatsBridge.ts:182
isHealthy
▸ isHealthy(): Promise
<boolean
>
Returns
Promise
<boolean
>
Implementation of
EventBridge.isHealthy
Defined in
natsbridge/src/NatsBridge.ts:107
isReady
▸ isReady(): Promise
<boolean
>
Returns
Promise
<boolean
>
Implementation of
EventBridge.isReady
Defined in
natsbridge/src/NatsBridge.ts:103
off
▸ off<K
>(eventName
, fn
): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
fn | EventReceiver <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ]> |
Returns
void
Inherited from
EventBridgeBaseClass.off
Defined in
core/lib/types/core/types/GenericEventEmitter.d.ts:12
on
▸ on<K
>(eventName
, fn
): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
fn | EventReceiver <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ]> |
Returns
void
Inherited from
EventBridgeBaseClass.on
Defined in
core/lib/types/core/types/GenericEventEmitter.d.ts:11
registerCommand
▸ registerCommand(address
, cb
, metadata
, eventBridgeConfig
): Promise
<string
>
Parameters
Name | Type |
---|---|
address | EBMessageAddress |
cb | (message : { contentEncoding : string ; contentType : string ; correlationId : string ; eventName? : string ; id : string ; messageType : Command ; otp? : string ; payload : { parameter : unknown ; payload : unknown } ; principalId? : string ; receiver : EBMessageAddress ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string }) => Promise <{ contentEncoding : "utf-8" ; contentType : "application/json" ; correlationId : string ; eventName? : string ; id : string ; isHandledError : boolean ; messageType : CommandErrorResponse ; otp? : string ; payload : { data? : unknown ; message : string ; status : StatusCode } ; principalId? : string ; receiver : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string } | { contentEncoding : string ; contentType : string ; correlationId : string ; eventName? : string ; id : string ; messageType : CommandSuccessResponse ; otp? : string ; payload : unknown ; principalId? : string ; receiver : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string }> |
metadata | CommandDefinitionMetadataBase |
eventBridgeConfig | DefinitionEventBridgeConfig |
Returns
Promise
<string
>
Implementation of
EventBridge.registerCommand
Defined in
natsbridge/src/NatsBridge.ts:326
registerSubscription
▸ registerSubscription(subscription
, cb
): Promise
<string
>
Parameters
Name | Type |
---|---|
subscription | Subscription |
cb | (message : EBMessage ) => Promise <undefined | Omit <{ contentEncoding : string ; contentType : string ; correlationId? : string ; eventName : string ; id : string ; messageType : CustomMessage ; otp? : string ; payload? : unknown ; principalId? : string ; receiver? : EBMessageAddress ; sender : { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId? : string ; timestamp : number ; traceId? : string }, "id" | "timestamp" >> |
Returns
Promise
<string
>
Implementation of
EventBridge.registerSubscription
Defined in
natsbridge/src/NatsBridge.ts:367
removeAllListeners
▸ removeAllListeners(): void
Returns
void
Inherited from
EventBridgeBaseClass.removeAllListeners
Defined in
core/lib/types/core/types/GenericEventEmitter.d.ts:14
start
▸ start(): Promise
<void
>
Returns
Promise
<void
>
Implementation of
EventBridge.start
Overrides
EventBridgeBaseClass.start
Defined in
natsbridge/src/NatsBridge.ts:90
startActiveSpan
▸ startActiveSpan<F
>(name
, opts
, context
, fn
): Promise
<F
>
Start a child span for opentelemetry tracking
Type parameters
Name |
---|
F |
Parameters
Name | Type | Description |
---|---|---|
name | string | name of span |
opts | SpanOptions | span options |
context | undefined | Context | optional context |
fn | (span : Span ) => Promise <F > | function to be executed within the span |
Returns
Promise
<F
>
return value of fn
Inherited from
EventBridgeBaseClass.startActiveSpan
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:32
unregisterCommand
▸ unregisterCommand(address
): Promise
<void
>
Parameters
Name | Type |
---|---|
address | EBMessageAddress |
Returns
Promise
<void
>
Implementation of
EventBridge.unregisterCommand
Defined in
natsbridge/src/NatsBridge.ts:356
unregisterSubscription
▸ unregisterSubscription(address
): Promise
<void
>
Parameters
Name | Type |
---|---|
address | EBMessageAddress |
Returns
Promise
<void
>
Implementation of
EventBridge.unregisterSubscription
Defined in
natsbridge/src/NatsBridge.ts:383
wrapInSpan
▸ wrapInSpan<F
>(name
, opts
, fn
, context?
): Promise
<F
>
Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!
This means during logging and similar the spanId of parent span is logged.
Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself
Type parameters
Name |
---|
F |
Parameters
Name | Type | Description |
---|---|---|
name | string | name of span |
opts | SpanOptions | span options |
fn | (span : Span ) => Promise <F > | function te be executed in the span |
context? | Context | span context |
Returns
Promise
<F
>
return value of fn
Inherited from
EventBridgeBaseClass.wrapInSpan
Defined in
core/lib/types/core/EventBridge/EventBridgeBaseClass.impl.d.ts:48