Classes · @purista/natsbridge

NatsBridge

EventBridge implementation for NATS core messaging with optional JetStream.

Signature

NatsBridge.ts typescript
class NatsBridge

Examples

example-1.ts typescript
import { NatsBridge } from '@purista/natsbridge'

const eventBridge = new NatsBridge({
  servers: 'nats:___PH0___
  durableSubscriptionMode: 'strict',
})

await eventBridge.start()

Constructors

1 entry

constructor

Constructor

Source
constructor.ts typescript
new constructor(config?: { commandResponsePublishTwice: "always" | "eventOnly" | "eventAndError" | "never"; defaultCommandTimeout: number; defaultConsumerFailureHandling: NatsConsumerFailureHandlingDefaults; defaultMessageExpiryInterval: number; durableSubscriptionMode: "strict" | "best-effort"; emptyTopicPartString: string; ... })

Creates a NATS bridge with defaults merged into NATS connection options.

Properties

16 entries

capabilities

Property

capabilities.ts typescript
capabilities: EventBridgeCapabilities

Runtime capability matrix used for strict startup validation.

commands

Property

Source
commands.ts typescript
commands: Map<string, JetStreamSubscription | Subscription>

Registered command subscriptions keyed by PURISTA service address.

config

Property

config.ts typescript
config: Complete<EventBridgeConfig<ConfigType>>

connection

Property

Source
connection.ts typescript
connection: NatsConnection | undefined

Active NATS connection, available after start.

defaultCommandTimeout

Property

defaultCommandTimeout.ts typescript
defaultCommandTimeout: number

The default time until when a command invocation automatically returns a time out error

inFlightExecutions

Property

inFlightExecutions.ts typescript
inFlightExecutions: InFlightExecutionTracker

instanceId

Property

instanceId.ts typescript
instanceId: string

Stable runtime instance id used to distinguish bridge processes.

isJetStreamEnabled

Property

Source
isJetStreamEnabled.ts typescript
isJetStreamEnabled: boolean

Indicates whether the connected broker reports JetStream support.

js

Property

Source
js.ts typescript
js: JetStreamClient | undefined

JetStream client used for durable publications and subscriptions.

jsm

Property

Source
jsm.ts typescript
jsm: JetStreamManager | undefined

JetStream manager used for stream and consumer provisioning.

logger

Property

logger.ts typescript
logger: Logger

metricsRecorder

Property

metricsRecorder.ts typescript
metricsRecorder: PuristaMetricsRecorder

name

Property

name.ts typescript
name: string

Human-readable bridge name used in logs, traces, and metrics.

sc

Property

Source
sc.ts typescript
sc: Codec<unknown>

JSON codec used for NATS message payload serialization.

subscriptions

Property

Source
subscriptions.ts typescript
subscriptions: Map<string, RegisteredSubscription>

Registered event subscriptions keyed by PURISTA subscriber address.

traceProvider

Property

traceProvider.ts typescript
traceProvider: NodeTracerProvider

Methods

22 entries

destroy

Method

Source
destroy.ts typescript
destroy(): Promise<void>

Waits for in-flight handlers, drains subscriptions, and closes the NATS connection.

emitMessage

Method

Source
emitMessage.ts typescript
emitMessage<T>(message: Omit<EBMessage, "id" | "timestamp" | "correlationId">, contentType: string, contentEncoding: string): Promise<Readonly<EBMessage>>

Publishes a PURISTA message as a NATS JSON payload.

getInFlightExecutionCount

Method

getInFlightExecutionCount.ts typescript
getInFlightExecutionCount(): number

Number of currently running handlers across all work kinds.

getInFlightExecutionCounts

Method

getInFlightExecutionCounts.ts typescript
getInFlightExecutionCounts(): InFlightExecutionCounts

Number of currently running handlers grouped by work kind.

getPausedSubscriptionConsumers

Method

Source
getPausedSubscriptionConsumers.ts typescript
getPausedSubscriptionConsumers(): object

Returns currently paused subscription consumers keyed by registration key.

getTracer

Method

getTracer.ts typescript
getTracer(): Tracer

Returns open telemetry tracer of this service

invoke

Method

Source
invoke.ts typescript
invoke<T>(input: Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<T>

Invokes a command with NATS request/reply and waits for the response.

isHealthy

Method

Source
isHealthy.ts typescript
isHealthy(): Promise<boolean>

Indicates whether the NATS connection is open and not draining.

isReady

Method

Source
isReady.ts typescript
isReady(): Promise<boolean>

Indicates whether the NATS connection is open and not draining.

openStream

Method

openStream.ts typescript
openStream<Chunk, Final>(_input: Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">, _ttl?: number): Promise<StreamHandle<Chunk, Final>>

Open a stream invocation.

registerCommand

Method

Source
registerCommand.ts typescript
registerCommand(address: EBMessageAddress, cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; ... } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; ... }>, metadata: CommandDefinitionMetadataBase, eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string>

Registers a command handler.

registerStream

Method

registerStream.ts typescript
registerStream(_address: EBMessageAddress, _cb: (message: StreamMessage) => Promise<void>, _metadata: StreamDefinitionMetadataBase, _eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string>

Register a service stream handler for a service target.

registerSubscription

Method

Source
registerSubscription.ts typescript
registerSubscription(subscription: Subscription, cb: (message: EBMessage) => Promise<Omit<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }, unknown | unknown> | undefined>): Promise<string>

Registers a subscription handler.

resumeSubscriptionConsumer

Method

Source
resumeSubscriptionConsumer.ts typescript
resumeSubscriptionConsumer(registrationKey: string): Promise<void>

Resumes a subscription consumer paused by a `stop-consumer` control signal.

runInFlight

Method

runInFlight.ts typescript
runInFlight<T>(fn: () => Promise<T>, kind?: "command" | "subscription" | "stream" | "generic"): Promise<T>

start

Method

Source
start.ts typescript
start(): Promise<void>

Connects to NATS and initializes JetStream clients when available.

startActiveSpan

Method

startActiveSpan.ts typescript
startActiveSpan<F>(name: string, opts: SpanOptions, context: Context | undefined, fn: (span: Span) => Promise<F>): Promise<F>

Start a child span for opentelemetry tracking

unregisterCommand

Method

Source
unregisterCommand.ts typescript
unregisterCommand(address: EBMessageAddress): Promise<void>

Unregisters and drains/destroys a command handler subscription.

unregisterStream

Method

unregisterStream.ts typescript
unregisterStream(_address: EBMessageAddress): Promise<void>

Unregister a service stream

unregisterSubscription

Method

Source
unregisterSubscription.ts typescript
unregisterSubscription(address: EBMessageAddress): Promise<void>

Unregisters and drains/destroys a subscription handler.

waitForInFlightDrain

Method

waitForInFlightDrain.ts typescript
waitForInFlightDrain(timeoutMs?: number): Promise<boolean>

wrapInSpan

Method

wrapInSpan.ts typescript
wrapInSpan<F>(name: string, opts: SpanOptions, fn: (span: Span) => Promise<F>, context?: Context): Promise<F>

Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!