# Class: NatsBridge
Defined in: natsbridge/src/NatsBridge.ts:64
The event bridge supports brokers with and without JetStream enabled.
If JetStream is enabled, subscriptions which are marked as durable are persisted by using JetStream.
If JetStream is not available, subscriptions fall back to live subscriptions without any persistence.
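The fallback rule described above can be sketched as a small decision function. This is an illustration only: `resolveSubscriptionMode` and the mode names are hypothetical and are not part of the `@purista/natsbridge` API.

```typescript
// Hypothetical helper, not part of @purista/natsbridge.
type SubscriptionMode = 'jetstream-durable' | 'live'

function resolveSubscriptionMode(isJetStreamEnabled: boolean, durable: boolean): SubscriptionMode {
  // Only durable subscriptions are persisted, and only if JetStream is available.
  // Everything else is handled as a live subscription without persistence.
  return isJetStreamEnabled && durable ? 'jetstream-durable' : 'live'
}
```

A durable subscription on a broker without JetStream therefore resolves to `'live'` in this sketch.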
## Example

```typescript
import { NatsBridge } from '@purista/natsbridge'

// create and init our eventbridge
const eventBridge = new NatsBridge()
await eventBridge.start()
```
## Extends
- [`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md)\<[`NatsBridgeConfig`](../type-aliases/NatsBridgeConfig.md)\>
## Implements
- [`EventBridge`](../../core/interfaces/EventBridge.md)
## Constructors
### new NatsBridge()
> **new NatsBridge**(`config`?): [`NatsBridge`](NatsBridge.md)
Defined in: [natsbridge/src/NatsBridge.ts:76](https://github.com/puristajs/purista/blob/master/packages/natsbridge/src/NatsBridge.ts#L76)
#### Parameters
##### config?
###### commandResponsePublishTwice?
`"always"` \| `"eventOnly"` \| `"eventAndError"` \| `"never"`
Indicates whether a command response should be published a second time.
If the command response is published again, it is published to the regular topic pattern.
If set to `never`, subscriptions might not receive messages they expect because of timing.
If set to `always`, every command response is published.
Because there might not be a consumer for every message, the broker stores the messages until the `defaultMessageExpiryInterval` is reached.
This might result in high resource consumption on the broker.
If set to `eventOnly`, only success responses that have an event name set are published twice.
In this case, the event is expected to have at least one consumer subscription, so the broker does not store messages unnecessarily for a long time.
**Default**
```ts
eventOnly
```
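The modes described above can be read as a simple predicate. The following is a minimal sketch, not the bridge's implementation: `shouldPublishTwice` and `CommandResponseSketch` are hypothetical names, and the behaviour of `eventAndError` is an assumption because this page does not describe that mode.

```typescript
// Hypothetical sketch; these names are illustrative and not library API.
type PublishTwiceMode = 'always' | 'eventOnly' | 'eventAndError' | 'never'

interface CommandResponseSketch {
  isError: boolean
  eventName?: string
}

function shouldPublishTwice(mode: PublishTwiceMode, response: CommandResponseSketch): boolean {
  const isSuccessWithEvent = !response.isError && response.eventName !== undefined

  switch (mode) {
    case 'always':
      return true
    case 'never':
      return false
    case 'eventOnly':
      // Documented: only success responses with an event name are published twice.
      return isSuccessWithEvent
    case 'eventAndError':
      // Assumption: the mode name suggests that error responses are published as well.
      return isSuccessWithEvent || response.isError
  }
}
```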
###### defaultCommandTimeout?

`number`

Overwrites the hardcoded default timeout of command invocations.

###### defaultMessageExpiryInterval?

`number`

The message expiry interval in seconds.

**Default**

30 days in seconds

###### emptyTopicPartString?

`string`

The string used in topics for parts that are undefined.

**Default**

`__none__`

###### instanceId?

`string`

The instance id of the event bridge. If not set, an id is generated each time an instance is created. Set this if the instance id must always be the same.

###### logger?

`Logger`

A logger instance.

###### logLevel?

`LogLevelName`

The log level to use if no logger instance is given.

###### maxMessages?

`number`

The maximum number of messages processed in parallel per subscription. A value of 10 means each subscription can handle 10 calls at the same time.

**Default**

`10`

###### spanProcessor?

`SpanProcessor`

An OpenTelemetry span processor.

###### topicPrefix?

`string`

The prefix for topics, used to prevent name collisions.

**Default**

`purista`
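To make the documented defaults concrete, the sketch below mirrors a few of the options in a local type and resolves them against user overrides. It does not import the library: `BridgeOptionsSketch`, `resolveOptions`, and `buildTopic` are hypothetical, and the dot-separated topic layout in `buildTopic` is an assumption, since the real topic pattern is not described on this page.

```typescript
// Local mirror of a few documented options; this is not the library's type.
interface BridgeOptionsSketch {
  defaultMessageExpiryInterval: number
  emptyTopicPartString: string
  maxMessages: number
  topicPrefix: string
}

// "30 days in seconds": 30 days * 24 h * 60 min * 60 s
const THIRTY_DAYS_IN_SECONDS = 30 * 24 * 60 * 60

const documentedDefaults: BridgeOptionsSketch = {
  defaultMessageExpiryInterval: THIRTY_DAYS_IN_SECONDS,
  emptyTopicPartString: '__none__',
  maxMessages: 10,
  topicPrefix: 'purista',
}

// Merge user-supplied overrides on top of the documented defaults.
function resolveOptions(overrides: Partial<BridgeOptionsSketch> = {}): BridgeOptionsSketch {
  return { ...documentedDefaults, ...overrides }
}

// Hypothetical topic builder: undefined parts are replaced by emptyTopicPartString.
// The '.' separator and the part order are assumptions for illustration.
function buildTopic(options: BridgeOptionsSketch, parts: Array<string | undefined>): string {
  const filled = parts.map((part) => part ?? options.emptyTopicPartString)
  return [options.topicPrefix, ...filled].join('.')
}
```

With the defaults, `buildTopic(resolveOptions(), ['user', undefined, 'created'])` yields `purista.user.__none__.created` in this sketch.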
#### Returns

[`NatsBridge`](NatsBridge.md)

#### Overrides

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`constructor`
## Properties

### commands

> **commands**: `Map`\<`string`, `Subscription`\>

Defined in: natsbridge/src/NatsBridge.ts:71

### config

> **config**: `Complete`\<\{ `commandResponsePublishTwice`: `"always"` \| `"eventOnly"` \| `"eventAndError"` \| `"never"`; `defaultCommandTimeout`: `number`; `defaultMessageExpiryInterval`: `number`; `emptyTopicPartString`: `string`; `instanceId`: `string`; `logger`: `Logger`; `logLevel`: `LogLevelName`; `maxMessages`: `number`; `spanProcessor`: `SpanProcessor`; `topicPrefix`: `string`; \}\>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:14

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`config`
### connection

> **connection**: `undefined` \| `NatsConnection`

Defined in: natsbridge/src/NatsBridge.ts:65

### defaultCommandTimeout

> **defaultCommandTimeout**: `number`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:17

The default time after which a command invocation automatically returns a timeout error.

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`defaultCommandTimeout`

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`defaultCommandTimeout`

### instanceId

> **instanceId**: `string`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`instanceId`

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`instanceId`

### isJetStreamEnabled

> **isJetStreamEnabled**: `boolean` = `false`

Defined in: natsbridge/src/NatsBridge.ts:67

### jsm

> **jsm**: `undefined` \| `JetStreamManager`

Defined in: natsbridge/src/NatsBridge.ts:69
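### logger

> **logger**: `Logger`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:12

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`logger`

### name

> **name**: `string`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`name`

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`name`

### sc

> **sc**: `Codec`\<`unknown`\>

Defined in: natsbridge/src/NatsBridge.ts:74

### subscriptions

> **subscriptions**: `Map`\<`string`, `Subscription`\>

Defined in: natsbridge/src/NatsBridge.ts:72

### traceProvider

> **traceProvider**: `NodeTracerProvider`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:13

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`traceProvider`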
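## Methods

### destroy()

> **destroy**(): `Promise`\<`void`\>

Defined in: natsbridge/src/NatsBridge.ts:392

Shuts down the event bridge as gracefully as possible.

#### Returns

`Promise`\<`void`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`destroy`

#### Overrides

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`destroy`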
### emit()

> **emit**\<`K`\>(`eventName`, `parameter`?): `void`

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:13

#### Type Parameters

• **K** *extends* `EventKey`\<\{ `[key: custom-${string}]`: `unknown`; `[key: adapter-${string}]`: `unknown`; `eventbridge-connected`: `never`; `eventbridge-connection-error`: `unknown`; `eventbridge-disconnected`: `never`; `eventbridge-error`: `unknown`; `eventbridge-reconnecting`: `never`; \}\>

#### Parameters

##### eventName

`K`

##### parameter?

`object`\[`K`\]

#### Returns

`void`

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`emit`
### emitMessage()

> **emitMessage**\<`T`\>(`message`, `contentType`, `contentEncoding`): `Promise`\<`Readonly`\<`EBMessage`\>\>

Defined in: natsbridge/src/NatsBridge.ts:106

Emits a message to the event bridge without awaiting a result.

#### Type Parameters

• **T** *extends* `EBMessage`

#### Parameters

##### message

`Omit`\<`EBMessage`, `"id"` \| `"timestamp"` \| `"correlationId"`\>

the message

##### contentType

`string` = `'application/json'`

##### contentEncoding

`string` = `'utf-8'`

#### Returns

`Promise`\<`Readonly`\<`EBMessage`\>\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`emitMessage`
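The `message` parameter omits `id`, `timestamp`, and `correlationId`, while the returned message is a complete, read-only `EBMessage`. The sketch below illustrates that input/output relationship with simplified local types. `MessageSketch` and `completeMessage` are hypothetical, and how the bridge actually generates the missing fields is an assumption.

```typescript
// Simplified stand-in for EBMessage; the real type has more fields.
interface MessageSketch {
  id: string
  timestamp: number
  correlationId: string
  eventName?: string
  payload: unknown
}

// The caller provides everything except the three omitted fields.
type MessageInput = Omit<MessageSketch, 'id' | 'timestamp' | 'correlationId'>

// Hypothetical helper: fills in the omitted fields and freezes the result.
// The id generator is injected so that the sketch has no dependencies.
function completeMessage(
  input: MessageInput,
  createId: () => string,
  now: () => number = Date.now,
): Readonly<MessageSketch> {
  return Object.freeze({
    ...input,
    id: createId(),
    correlationId: createId(),
    timestamp: now(),
  })
}
```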
### getTracer()

> **getTracer**(): `Tracer`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24

Returns the OpenTelemetry tracer of this service.

#### Returns

`Tracer`

Tracer

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`getTracer`
### invoke()

> **invoke**\<`T`\>(`input`, `commandTimeout`): `Promise`\<`T`\>

Defined in: natsbridge/src/NatsBridge.ts:179

Calls a command of a service and returns the result of this command.

#### Type Parameters

• **T**

#### Parameters

##### input

`Omit`\<\{ `contentEncoding`: `string`; `contentType`: `string`; `correlationId`: `string`; `eventName`: `string`; `id`: `string`; `messageType`: `Command`; `otp`: `string`; `payload`: \{ `parameter`: `unknown`; `payload`: `unknown`; \}; `principalId`: `string`; `receiver`: `EBMessageAddress`; `sender`: \{ `instanceId`: `string`; `serviceName`: `string`; `serviceTarget`: `string`; `serviceVersion`: `string`; \}; `tenantId`: `string`; `timestamp`: `number`; `traceId`: `string`; \}, `"id"` \| `"timestamp"` \| `"correlationId"` \| `"messageType"`\>

a partial command message

##### commandTimeout

`number` = `...`

the time to live (timeout) of the invocation

#### Returns

`Promise`\<`T`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`invoke`
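The `commandTimeout` parameter bounds how long an invocation waits for a response. One generic way to express such a bound is to race the pending work against a timer, as sketched below. `withTimeout` is a hypothetical helper for illustration, and whether `NatsBridge` implements its timeout this way is not stated on this page.

```typescript
// Hypothetical helper: rejects if `work` does not settle within `timeoutMs`.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new Error(`timed out after ${timeoutMs} ms`))
    }, timeoutMs)

    work.then(
      (value) => {
        // The work finished first: cancel the timer and pass the value on.
        clearTimeout(timer)
        resolve(value)
      },
      (error) => {
        // The work failed first: cancel the timer and pass the error on.
        clearTimeout(timer)
        reject(error)
      },
    )
  })
}
```

A caller would wrap the pending response in `withTimeout(pendingResponse, commandTimeout)` and handle the rejection as a timeout error.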
### isHealthy()

> **isHealthy**(): `Promise`\<`boolean`\>

Defined in: natsbridge/src/NatsBridge.ts:102

Indicates whether the event bridge is running and works correctly.

#### Returns

`Promise`\<`boolean`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`isHealthy`

### isReady()

> **isReady**(): `Promise`\<`boolean`\>

Defined in: natsbridge/src/NatsBridge.ts:98

Indicates whether the event bridge has been started and is connected to the underlying message broker.

#### Returns

`Promise`\<`boolean`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`isReady`
### off()

> **off**\<`K`\>(`eventName`, `fn`): `void`

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:12

#### Type Parameters

• **K** *extends* `EventKey`\<\{ `[key: custom-${string}]`: `unknown`; `[key: adapter-${string}]`: `unknown`; `eventbridge-connected`: `never`; `eventbridge-connection-error`: `unknown`; `eventbridge-disconnected`: `never`; `eventbridge-error`: `unknown`; `eventbridge-reconnecting`: `never`; \}\>

#### Parameters

##### eventName

`K`

##### fn

`EventReceiver`\<`object`\[`K`\]\>

#### Returns

`void`

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`off`
### on()

> **on**\<`K`\>(`eventName`, `fn`): `void`

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:11

#### Type Parameters

• **K** *extends* `EventKey`\<\{ `[key: custom-${string}]`: `unknown`; `[key: adapter-${string}]`: `unknown`; `eventbridge-connected`: `never`; `eventbridge-connection-error`: `unknown`; `eventbridge-disconnected`: `never`; `eventbridge-error`: `unknown`; `eventbridge-reconnecting`: `never`; \}\>

#### Parameters

##### eventName

`K`

##### fn

`EventReceiver`\<`object`\[`K`\]\>

#### Returns

`void`

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`on`
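The `emit()`, `off()`, and `on()` methods share one typed event map, so listeners are checked against the event name. The sketch below shows the idea with a small stand-in emitter and a subset of the documented event names. `TinyEmitter` and `BridgeEvents` are hypothetical and are not the library's implementation.

```typescript
// Subset of the documented event names with their payload types.
type BridgeEvents = {
  'eventbridge-connected': never
  'eventbridge-disconnected': never
  'eventbridge-error': unknown
}

type Listener<T> = (parameter: T) => void

// Minimal stand-in for the inherited emitter; not the library's implementation.
class TinyEmitter<Events extends Record<string, unknown>> {
  private readonly listeners = new Map<keyof Events, Set<Listener<any>>>()

  on<K extends keyof Events>(eventName: K, fn: Listener<Events[K]>): void {
    const set = this.listeners.get(eventName) ?? new Set<Listener<any>>()
    set.add(fn)
    this.listeners.set(eventName, set)
  }

  off<K extends keyof Events>(eventName: K, fn: Listener<Events[K]>): void {
    this.listeners.get(eventName)?.delete(fn)
  }

  emit<K extends keyof Events>(eventName: K, parameter?: Events[K]): void {
    this.listeners.get(eventName)?.forEach((fn) => fn(parameter))
  }

  removeAllListeners(): void {
    this.listeners.clear()
  }
}

// Usage: register a listener, emit once, then remove the listener again.
const emitter = new TinyEmitter<BridgeEvents>()
const receivedErrors: unknown[] = []
const onError = (error: unknown): void => {
  receivedErrors.push(error)
}

emitter.on('eventbridge-error', onError)
emitter.emit('eventbridge-error', 'connection lost')
emitter.off('eventbridge-error', onError)
emitter.emit('eventbridge-error', 'not received')
```

Keeping a reference to the listener function, as with `onError` here, is what makes a later `off()` call possible.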
### registerCommand()

> **registerCommand**(`address`, `cb`, `metadata`, `eventBridgeConfig`): `Promise`\<`string`\>

Defined in: natsbridge/src/NatsBridge.ts:322

#### Parameters

##### address

the address of the service command (service name, version and command name)

##### cb

(`message`) => `Promise`\<\{ `contentEncoding`: `"utf-8"`; `contentType`: `"application/json"`; `correlationId`: `string`; `eventName`: `string`; `id`: `string`; `isHandledError`: `boolean`; `messageType`: `CommandErrorResponse`; `otp`: `string`; `payload`: \{ `data`: `unknown`; `message`: `string`; `status`: `StatusCode`; \}; `principalId`: `string`; `receiver`: \{ `instanceId`: `string`; `serviceName`: `string`; `serviceTarget`: `string`; `serviceVersion`: `string`; \}; `sender`: \{ `instanceId`: `string`; `serviceName`: `string`; `serviceTarget`: `string`; `serviceVersion`: `string`; \}; `tenantId`: `string`; `timestamp`: `number`; `traceId`: `string`; \} \| \{ `contentEncoding`: `string`; `contentType`: `string`; `correlationId`: `string`; `eventName`: `string`; `id`: `string`; `messageType`: `CommandSuccessResponse`; `otp`: `string`; `payload`: `unknown`; `principalId`: `string`; `receiver`: \{ `instanceId`: `string`; `serviceName`: `string`; `serviceTarget`: `string`; `serviceVersion`: `string`; \}; `sender`: \{ `instanceId`: `string`; `serviceName`: `string`; `serviceTarget`: `string`; `serviceVersion`: `string`; \}; `tenantId`: `string`; `timestamp`: `number`; `traceId`: `string`; \}\>

the function to be called if a matching command arrives

##### metadata

##### eventBridgeConfig

#### Returns

`Promise`\<`string`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`registerCommand`
### registerSubscription()

> **registerSubscription**(`subscription`, `cb`): `Promise`\<`string`\>

Defined in: natsbridge/src/NatsBridge.ts:363

Register a new subscription

#### Parameters

##### subscription

the subscription definition

##### cb

(`message`) => `Promise`\<`undefined` \| `Omit`\<\{ `contentEncoding`: `string`; `contentType`: `string`; `correlationId`: `string`; `eventName`: `string`; `id`: `string`; `messageType`: `CustomMessage`; `otp`: `string`; `payload`: `unknown`; `principalId`: `string`; `receiver`: `EBMessageAddress`; `sender`: \{ `instanceId`: `string`; `serviceName`: `string`; `serviceTarget`: `string`; `serviceVersion`: `string`; \}; `tenantId`: `string`; `timestamp`: `number`; `traceId`: `string`; \}, `"id"` \| `"timestamp"`\>\>

the function to be called if a matching message arrives

#### Returns

`Promise`\<`string`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`registerSubscription`
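According to the signature above, a subscription callback resolves either to `undefined` or to a custom message without `id` and `timestamp`. The sketch below shows a callback of that shape with heavily simplified local types. `IncomingSketch`, `CustomMessageSketch`, `onUserCreated`, and the event name `welcomeMailRequested` are hypothetical, and what the bridge does with the returned message is not described on this page.

```typescript
// Simplified stand-ins for the documented message shapes.
interface IncomingSketch {
  id: string
  eventName?: string
  payload: unknown
}

interface CustomMessageSketch {
  id: string
  timestamp: number
  eventName: string
  payload: unknown
}

// Mirrors the documented return type: undefined, or a message without id and timestamp.
type SubscriptionResult = undefined | Omit<CustomMessageSketch, 'id' | 'timestamp'>
type SubscriptionCallback = (message: IncomingSketch) => Promise<SubscriptionResult>

// Example callback: ignores messages without an event name and otherwise
// answers with a follow-up message carrying the same payload.
const onUserCreated: SubscriptionCallback = async (message) => {
  if (message.eventName === undefined) {
    return undefined
  }
  return { eventName: 'welcomeMailRequested', payload: message.payload }
}
```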
### removeAllListeners()

> **removeAllListeners**(): `void`

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:14

#### Returns

`void`

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`removeAllListeners`
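### start()

> **start**(): `Promise`\<`void`\>

Defined in: natsbridge/src/NatsBridge.ts:85

Starts the event bridge and connects to the underlying message broker.

#### Returns

`Promise`\<`void`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`start`

#### Overrides

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`start`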
### startActiveSpan()

> **startActiveSpan**\<`F`\>(`name`, `opts`, `context`, `fn`): `Promise`\<`F`\>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:33

Starts a child span for OpenTelemetry tracing.

#### Type Parameters

• **F**

#### Parameters

##### name

`string`

name of span

##### opts

`SpanOptions`

span options

##### context

`undefined` \| `Context`

optional context

##### fn

(`span`) => `Promise`\<`F`\>

function to be executed within the span

#### Returns

`Promise`\<`F`\>

return value of fn

#### Inherited from

[`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md).`startActiveSpan`
### unregisterCommand()

> **unregisterCommand**(`address`): `Promise`\<`void`\>

Defined in: natsbridge/src/NatsBridge.ts:352

Unregister a service command

#### Parameters

##### address

The address (service name, version and command name) of the command to be de-registered

#### Returns

`Promise`\<`void`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`unregisterCommand`

### unregisterSubscription()

> **unregisterSubscription**(`address`): `Promise`\<`void`\>

Defined in: natsbridge/src/NatsBridge.ts:379

#### Parameters

##### address

#### Returns

`Promise`\<`void`\>

#### Implementation of

[`EventBridge`](../../core/interfaces/EventBridge.md).`unregisterSubscription`
### wrapInSpan()

> **wrapInSpan**\<`F`\>(`name`, `opts`, `fn`, `context`?): `Promise`\<`F`\>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:49

Starts a span for OpenTelemetry tracing on the same level. The created span will not become the "active" span within OpenTelemetry!
This means that during logging and similar operations, the spanId of the parent span is logged.
Use wrapInSpan to mark points in the flow of one bigger function, not to trace the program flow itself.

#### Type Parameters

• **F**

#### Parameters

##### name

`string`

name of span

##### opts

`SpanOptions`

span options

##### fn

(`span`) => `Promise`\<`F`\>

function to be executed in the span

##### context?

`Context`

span context

#### Returns

`Promise`\<`F`\>

return value of fn