Class: NatsBridge
Defined in: natsbridge/src/NatsBridge.ts:114
The event bridge supports low-latency core NATS messaging.
When JetStream is available, durable command and subscription registrations use JetStream consumers. Without JetStream, durable requests fail fast by default (durableSubscriptionMode: 'strict') instead of silently degrading to non-durable core NATS semantics.
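The fallback rule above can be sketched as a small decision function. This is a minimal illustration, not the bridge's actual implementation: `resolveDurableRegistration` and its return values are hypothetical names. Two branches are assumptions the text implies but does not spell out: non-durable registrations stay on core NATS, and `best-effort` falls back to core NATS.

```typescript
// Hypothetical sketch of the durability decision described above.
type DurableSubscriptionMode = 'strict' | 'best-effort'
type Transport = 'jetstream' | 'core-nats'

function resolveDurableRegistration(
  jetStreamAvailable: boolean,
  durableRequested: boolean,
  mode: DurableSubscriptionMode = 'strict',
): Transport {
  // Assumption: registrations that do not ask for durability use core NATS.
  if (!durableRequested) return 'core-nats'
  // Durable registrations use JetStream consumers when JetStream is available.
  if (jetStreamAvailable) return 'jetstream'
  // Without JetStream, 'strict' fails fast instead of silently degrading.
  if (mode === 'strict') {
    throw new Error('Durable registration requested, but JetStream is not available')
  }
  // Assumption: 'best-effort' accepts non-durable core NATS semantics.
  return 'core-nats'
}
```

With the default mode, a durable request without JetStream throws, which makes a missing JetStream setup visible at registration time rather than at the first lost message.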
Example usage:
```typescript
import { NatsBridge } from '@purista/natsbridge'

// create and init our event bridge
const eventBridge = new NatsBridge()
await eventBridge.start()
```
## Extends
- [`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md)\<[`NatsBridgeConfig`](../type-aliases/NatsBridgeConfig.md)\>
## Implements
- [`EventBridge`](../../core/interfaces/EventBridge.md)
## Constructors
### Constructor
> **new NatsBridge**(`config?`): `NatsBridge`
Defined in: [natsbridge/src/NatsBridge.ts:128](https://github.com/puristajs/purista/blob/master/packages/natsbridge/src/NatsBridge.ts#L128)
#### Parameters
##### config?
###### commandResponsePublishTwice?
`"always"` \| `"eventOnly"` \| `"eventAndError"` \| `"never"`
Indicates if a command response should be published a second time.
If the command response gets published, it will be published to the regular topic pattern.
If set to `never`, subscriptions might not receive the messages they expect because of timing.
If set to `always`, every command response is published.
Because there might not be a consumer for every message, the broker will store the messages until the `defaultMessageExpiryInterval` is reached.
This might result in a high resource consumption of the broker.
If set to `eventOnly`, only success responses that have an event name set are published twice.
In that case, we expect that the event has at least one consumer subscription, so the broker does not store messages unnecessarily for a long time.
**Default**
```ts
eventOnly
```
defaultCommandTimeout?
number
Overrides the hardcoded default timeout of command invocations.
defaultConsumerFailureHandling?
NatsConsumerFailureHandlingDefaults
Default failure handling for JetStream-backed subscription consumers. Per-subscription consumer failure handling hints override these values.
defaultMessageExpiryInterval?
number
the message expiry interval in seconds
Default
30 days in seconds
durableSubscriptionMode?
"strict" | "best-effort"
Controls how durable registrations behave when JetStream durability is not implemented.
Default
strict
emptyTopicPartString?
string
The string used in topics for parts that are undefined.
Default
`__none__`
instanceId?
string
The instance id of the event bridge. If not set, an id is generated each time an instance is created. Set this if the instance id must always be the same.
jetStreamAckWaitMs?
number
JetStream consumer ack wait in milliseconds for command and subscription consumers. This is a broker-level processing timeout used for redelivery when no ack/nak/term is sent.
Default
30000
logger?
logLevel?
maxMessages?
number
Maximum number of messages processed in parallel per subscription. A value of 10 means each subscription can handle 10 calls at the same time.
Default
10
spanProcessor?
SpanProcessor
topicPrefix?
string
The prefix for topics, used to prevent name collisions.
Default
purista
Returns
NatsBridge
Overrides
EventBridgeBaseClass.constructor
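The constructor options above can be summarized in a short sketch. It uses local stand-in types rather than importing `@purista/natsbridge`, so `SketchConfig`, `resolveConfig`, `SketchResponse` and `shouldPublishTwice` are all hypothetical names. The default values are the ones listed above. The handling of `eventAndError` is an assumption, since that mode is not described in the text.

```typescript
// Local stand-ins for the documented options; not imported from @purista/natsbridge.
type PublishTwiceMode = 'always' | 'eventOnly' | 'eventAndError' | 'never'

interface SketchConfig {
  commandResponsePublishTwice: PublishTwiceMode
  defaultMessageExpiryInterval: number // seconds
  durableSubscriptionMode: 'strict' | 'best-effort'
  emptyTopicPartString: string
  jetStreamAckWaitMs: number
  maxMessages: number
  topicPrefix: string
}

// Defaults as listed in the parameter descriptions above.
const documentedDefaults: SketchConfig = {
  commandResponsePublishTwice: 'eventOnly',
  defaultMessageExpiryInterval: 30 * 24 * 60 * 60, // 30 days in seconds
  durableSubscriptionMode: 'strict',
  emptyTopicPartString: '__none__',
  jetStreamAckWaitMs: 30000,
  maxMessages: 10,
  topicPrefix: 'purista',
}

// Caller-supplied values win over the defaults.
function resolveConfig(config: Partial<SketchConfig> = {}): SketchConfig {
  return { ...documentedDefaults, ...config }
}

// Minimal view of a command response, reduced to what the decision needs.
interface SketchResponse {
  isError: boolean
  eventName?: string
}

function shouldPublishTwice(mode: PublishTwiceMode, response: SketchResponse): boolean {
  const isSuccessWithEvent = !response.isError && response.eventName !== undefined
  if (mode === 'always') return true
  if (mode === 'never') return false
  if (mode === 'eventOnly') return isSuccessWithEvent
  // Remaining mode is 'eventAndError'.
  // Assumption: read here as "success responses with an event name, plus error responses".
  return isSuccessWithEvent || response.isError
}
```

Under these assumptions, `eventOnly` keeps broker storage low because only responses that are expected to have a subscriber are published a second time.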
Properties
capabilities
capabilities:
EventBridgeCapabilities
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:26
Implementation of
Inherited from
EventBridgeBaseClass.capabilities
commands
commands:
Map<string,JetStreamSubscription|Subscription>
Defined in: natsbridge/src/NatsBridge.ts:122
config
config:
Complete<EventBridgeConfig<ConfigType>>
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24
Inherited from
connection
connection:
NatsConnection|undefined
Defined in: natsbridge/src/NatsBridge.ts:115
defaultCommandTimeout
defaultCommandTimeout:
number
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:28
The default time after which a command invocation automatically returns a timeout error
Implementation of
EventBridge.defaultCommandTimeout
Inherited from
EventBridgeBaseClass.defaultCommandTimeout
inFlightExecutions
protected readonly inFlightExecutions: InFlightExecutionTracker
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:29
Inherited from
EventBridgeBaseClass.inFlightExecutions
instanceId
instanceId:
string
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:27
Implementation of
Inherited from
EventBridgeBaseClass.instanceId
isJetStreamEnabled
isJetStreamEnabled:
boolean = false
Defined in: natsbridge/src/NatsBridge.ts:117
js
js:
JetStreamClient|undefined
Defined in: natsbridge/src/NatsBridge.ts:120
jsm
jsm:
JetStreamManager|undefined
Defined in: natsbridge/src/NatsBridge.ts:119
logger
logger:
Logger
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:22
Inherited from
name
name:
string
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:25
Implementation of
Inherited from
sc
sc:
Codec<unknown>
Defined in: natsbridge/src/NatsBridge.ts:126
subscriptions
subscriptions:
Map<string,RegisteredSubscription>
Defined in: natsbridge/src/NatsBridge.ts:123
traceProvider
traceProvider:
NodeTracerProvider
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:23
Inherited from
EventBridgeBaseClass.traceProvider
Methods
destroy()
destroy():
Promise<void>
Defined in: natsbridge/src/NatsBridge.ts:859
Shut down event bridge as gracefully as possible
Returns
Promise<void>
Implementation of
Overrides
emitMessage()
emitMessage<T>(message, contentType?, contentEncoding?): Promise<Readonly<EBMessage>>
Defined in: natsbridge/src/NatsBridge.ts:526
Emit a message to the eventbridge without awaiting a result
Type Parameters
T
T extends EBMessage
Parameters
message
Omit<EBMessage, "id" | "timestamp" | "correlationId">
the message
contentType?
string = 'application/json'
contentEncoding?
string = 'utf-8'
Returns
Promise<Readonly<EBMessage>>
Implementation of
getInFlightExecutionCount()
getInFlightExecutionCount():
number
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:66
Number of currently running handlers across all work kinds.
Returns
number
Implementation of
EventBridge.getInFlightExecutionCount
Inherited from
EventBridgeBaseClass.getInFlightExecutionCount
getInFlightExecutionCounts()
getInFlightExecutionCounts():
InFlightExecutionCounts
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:67
Number of currently running handlers grouped by work kind.
Returns
Implementation of
EventBridge.getInFlightExecutionCounts
Inherited from
EventBridgeBaseClass.getInFlightExecutionCounts
getPausedSubscriptionConsumers()
getPausedSubscriptionConsumers():
object
Defined in: natsbridge/src/NatsBridge.ts:847
Returns paused subscription consumer states keyed by adapter registration key.
Returns
object
Implementation of
EventBridge.getPausedSubscriptionConsumers
Overrides
EventBridgeBaseClass.getPausedSubscriptionConsumers
getTracer()
getTracer():
Tracer
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:36
Returns the OpenTelemetry tracer of this service
Returns
Tracer
Tracer
Inherited from
EventBridgeBaseClass.getTracer
invoke()
invoke<T>(input, commandTimeout?): Promise<T>
Defined in: natsbridge/src/NatsBridge.ts:599
Call a command of a service and return the result of this command
Type Parameters
T
T
Parameters
input
Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">
a partial command message
commandTimeout?
number = ...
the time to live (timeout) of the invocation
Returns
Promise<T>
Implementation of
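The relationship between `commandTimeout` and a pending invocation can be sketched with a generic promise timeout. This is an illustration only: `withCommandTimeout` is a hypothetical helper, and nothing in this reference states that `invoke` is implemented this way.

```typescript
// Hypothetical helper: rejects when `pending` has not settled within `timeoutMs`.
async function withCommandTimeout<T>(pending: Promise<T>, timeoutMs: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => {
      reject(new Error(`Command timed out after ${timeoutMs} ms`))
    }, timeoutMs)
  })
  try {
    // Whichever promise settles first decides the outcome.
    return await Promise.race([pending, timeout])
  } finally {
    // Always clear the timer so it cannot fire after the result is known.
    clearTimeout(timer)
  }
}
```

A caller would pass the promise of the pending response together with either the per-call `commandTimeout` or the bridge-wide `defaultCommandTimeout`.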
isHealthy()
isHealthy():
Promise<boolean>
Defined in: natsbridge/src/NatsBridge.ts:522
Indicates if the eventbridge is running and works correctly
Returns
Promise<boolean>
Implementation of
isReady()
isReady():
Promise<boolean>
Defined in: natsbridge/src/NatsBridge.ts:518
Indicates if the eventbridge has been started and is connected to the underlying message broker
Returns
Promise<boolean>
Implementation of
openStream()
openStream<Chunk, Final>(_input, _ttl?): Promise<StreamHandle<Chunk, Final>>
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:70
Open a stream invocation. The returned handle can be consumed via async iteration and can be cancelled by the caller.
Type Parameters
Chunk
Chunk = unknown
Final
Final = unknown
Parameters
_input
Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">
_ttl?
number
Returns
Promise<StreamHandle<Chunk, Final>>
Implementation of
Inherited from
EventBridgeBaseClass.openStream
registerCommand()
registerCommand(address, cb, metadata, eventBridgeConfig): Promise<string>
Defined in: natsbridge/src/NatsBridge.ts:741
Parameters
address
the address of the service command (service name, version and command name)
cb
(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>
the function to be called if a matching command arrives
metadata
eventBridgeConfig
Returns
Promise<string>
Implementation of
registerStream()
registerStream(_address, _cb, _metadata, _eventBridgeConfig): Promise<string>
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:71
Register a service stream.
Parameters
_address
_cb
(message) => Promise<void>
_metadata
_eventBridgeConfig
Returns
Promise<string>
Implementation of
Inherited from
EventBridgeBaseClass.registerStream
registerSubscription()
registerSubscription(subscription, cb): Promise<string>
Defined in: natsbridge/src/NatsBridge.ts:797
Register a new subscription
Parameters
subscription
the subscription definition
cb
(message) => Promise<Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }, "id" | "timestamp"> | undefined>
the function to be called if a matching message arrives
Returns
Promise<string>
Implementation of
EventBridge.registerSubscription
resumeSubscriptionConsumer()
resumeSubscriptionConsumer(registrationKey): Promise<void>
Defined in: natsbridge/src/NatsBridge.ts:851
Resumes a paused subscription consumer by registration key.
Parameters
registrationKey
string
Returns
Promise<void>
Implementation of
EventBridge.resumeSubscriptionConsumer
Overrides
EventBridgeBaseClass.resumeSubscriptionConsumer
runInFlight()
runInFlight<T>(fn, kind?): Promise<T>
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:64
Type Parameters
T
T
Parameters
fn
() => Promise<T>
kind?
"command" | "subscription" | "stream" | "generic"
Returns
Promise<T>
Inherited from
EventBridgeBaseClass.runInFlight
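How `runInFlight`, `getInFlightExecutionCount` and `getInFlightExecutionCounts` relate can be sketched with a small counter class. `SketchInFlightTracker` is a hypothetical stand-in, not the `InFlightExecutionTracker` from the core package, and the `'generic'` default for `kind` is an assumption.

```typescript
type WorkKind = 'command' | 'subscription' | 'stream' | 'generic'

// Hypothetical stand-in for illustration only.
class SketchInFlightTracker {
  private readonly counts: Record<WorkKind, number> = {
    command: 0,
    subscription: 0,
    stream: 0,
    generic: 0,
  }

  // Counts `fn` as running until its promise settles, whether it resolves or rejects.
  async run<T>(fn: () => Promise<T>, kind: WorkKind = 'generic'): Promise<T> {
    this.counts[kind] += 1
    try {
      return await fn()
    } finally {
      this.counts[kind] -= 1
    }
  }

  // Total across all work kinds.
  getCount(): number {
    return this.counts.command + this.counts.subscription + this.counts.stream + this.counts.generic
  }

  // Snapshot grouped by work kind; a copy, so callers cannot mutate the tracker.
  getCounts(): Record<WorkKind, number> {
    return { ...this.counts }
  }
}
```

Decrementing in `finally` keeps the counters accurate when a handler throws, which matters for any shutdown logic that waits for the count to reach zero.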
start()
start():
Promise<void>
Defined in: natsbridge/src/NatsBridge.ts:500
Start the eventbridge and connect to the underlying message broker
Returns
Promise<void>
Implementation of
Overrides
startActiveSpan()
startActiveSpan<F>(name, opts, context, fn): Promise<F>
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:45
Start a child span for OpenTelemetry tracing
Type Parameters
F
F
Parameters
name
string
name of span
opts
SpanOptions
span options
context
Context | undefined
optional context
fn
(span) => Promise<F>
function to be executed within the span
Returns
Promise<F>
return value of fn
Inherited from
EventBridgeBaseClass.startActiveSpan
unregisterCommand()
unregisterCommand(address): Promise<void>
Defined in: natsbridge/src/NatsBridge.ts:779
Unregister a service command
Parameters
address
The address (service name, version and command name) of the command to be de-registered
Returns
Promise<void>
Implementation of
unregisterStream()
unregisterStream(_address): Promise<void>
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:72
Unregister a service stream
Parameters
_address
Returns
Promise<void>
Implementation of
Inherited from
EventBridgeBaseClass.unregisterStream
unregisterSubscription()
unregisterSubscription(address): Promise<void>
Defined in: natsbridge/src/NatsBridge.ts:828
Parameters
address
Returns
Promise<void>
Implementation of
EventBridge.unregisterSubscription
waitForInFlightDrain()
waitForInFlightDrain(timeoutMs?): Promise<boolean>
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:65
Parameters
timeoutMs?
number
Returns
Promise<boolean>
Inherited from
EventBridgeBaseClass.waitForInFlightDrain
wrapInSpan()
wrapInSpan<F>(name, opts, fn, context?): Promise<F>
Defined in: core/dist/esm/core/EventBridge/EventBridgeBaseClass.impl.d.ts:61
Start a span for OpenTelemetry tracing on the same level. The created span will not become the "active" span within OpenTelemetry!
This means that during logging and similar operations, the spanId of the parent span is logged.
Use wrapInSpan to mark points in the flow of one larger function, not to trace the program flow itself.
Type Parameters
F
F
Parameters
name
string
name of span
opts
SpanOptions
span options
fn
(span) => Promise<F>
function to be executed in the span
context?
Context
span context
Returns
Promise<F>
return value of fn
