Classes · @purista/natsbridge
NatsBridge
EventBridge implementation for NATS core messaging with optional JetStream.
Signature
class NatsBridge Examples
import { NatsBridge } from '@purista/natsbridge'
const eventBridge = new NatsBridge({
servers: 'nats:___PH0___
durableSubscriptionMode: 'strict39;,
})
await eventBridge.start() Constructors
1 entry
constructor
Constructor
new constructor(config?: { commandResponsePublishTwice: "always" | "eventOnly" | "eventAndError" | "never"; defaultCommandTimeout: number; defaultConsumerFailureHandling: NatsConsumerFailureHandlingDefaults; defaultMessageExpiryInterval: number; durableSubscriptionMode: "strict" | "best-effort"; emptyTopicPartString: string; ... }) Creates a NATS bridge with defaults merged into NATS connection options.
Properties
16 entries
capabilities
Property
capabilities: EventBridgeCapabilities Runtime capability matrix used for strict startup validation.
commands
Property
commands: Map<string, JetStreamSubscription | Subscription> Registered command subscriptions keyed by PURISTA service address.
config
Property
config: Complete<EventBridgeConfig<ConfigType>> connection
Property
connection: NatsConnection | undefined Active NATS connection, available after start.
defaultCommandTimeout
Property
defaultCommandTimeout: number The default time until when a command invocation automatically returns a time out error
inFlightExecutions
Property
inFlightExecutions: InFlightExecutionTracker instanceId
Property
instanceId: string Stable runtime instance id used to distinguish bridge processes.
isJetStreamEnabled
Property
isJetStreamEnabled: boolean Indicates whether the connected broker reports JetStream support.
js
Property
js: JetStreamClient | undefined JetStream client used for durable publications and subscriptions.
jsm
Property
jsm: JetStreamManager | undefined JetStream manager used for stream and consumer provisioning.
logger
Property
logger: Logger metricsRecorder
Property
metricsRecorder: PuristaMetricsRecorder name
Property
name: string Human-readable bridge name used in logs, traces, and metrics.
sc
Property
sc: Codec<unknown> JSON codec used for NATS message payload serialization.
subscriptions
Property
subscriptions: Map<string, RegisteredSubscription> Registered event subscriptions keyed by PURISTA subscriber address.
traceProvider
Property
traceProvider: NodeTracerProvider Methods
22 entries
destroy
Method
destroy(): Promise<void> Waits for in-flight handlers, drains subscriptions, and closes the NATS connection.
emitMessage
Method
emitMessage<T>(message: Omit<EBMessage, "id" | "timestamp" | "correlationId">, contentType: string, contentEncoding: string): Promise<Readonly<EBMessage>> Publishes a PURISTA message as a NATS JSON payload.
getInFlightExecutionCount
Method
getInFlightExecutionCount(): number Number of currently running handlers across all work kinds.
getInFlightExecutionCounts
Method
getInFlightExecutionCounts(): InFlightExecutionCounts Number of currently running handlers grouped by work kind.
getPausedSubscriptionConsumers
Method
getPausedSubscriptionConsumers(): object Returns currently paused subscription consumers keyed by registration key.
getTracer
Method
getTracer(): Tracer Returns open telemetry tracer of this service
invoke
Method
invoke<T>(input: Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<T> Invokes a command with NATS request/reply and waits for the response.
isHealthy
Method
isHealthy(): Promise<boolean> Indicates whether the NATS connection is open and not draining.
isReady
Method
isReady(): Promise<boolean> Indicates whether the NATS connection is open and not draining.
openStream
Method
openStream<Chunk, Final>(_input: Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">, _ttl?: number): Promise<StreamHandle<Chunk, Final>> Open a stream invocation.
registerCommand
Method
registerCommand(address: EBMessageAddress, cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; ... } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; ... }>, metadata: CommandDefinitionMetadataBase, eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string> Registers a command handler.
registerStream
Method
registerStream(_address: EBMessageAddress, _cb: (message: StreamMessage) => Promise<void>, _metadata: StreamDefinitionMetadataBase, _eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string> Register a service stream handler for a service target.
registerSubscription
Method
registerSubscription(subscription: Subscription, cb: (message: EBMessage) => Promise<Omit<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }, unknown | unknown> | undefined>): Promise<string> Registers a subscription handler.
resumeSubscriptionConsumer
Method
resumeSubscriptionConsumer(registrationKey: string): Promise<void> Resumes a subscription consumer paused by a `stop-consumer` control signal.
runInFlight
Method
runInFlight<T>(fn: () => Promise<T>, kind?: "command" | "subscription" | "stream" | "generic"): Promise<T> start
Method
start(): Promise<void> Connects to NATS and initializes JetStream clients when available.
startActiveSpan
Method
startActiveSpan<F>(name: string, opts: SpanOptions, context: Context | undefined, fn: (span: Span) => Promise<F>): Promise<F> Start a child span for opentelemetry tracking
unregisterCommand
Method
unregisterCommand(address: EBMessageAddress): Promise<void> Unregisters and drains/destroys a command handler subscription.
unregisterStream
Method
unregisterStream(_address: EBMessageAddress): Promise<void> Unregister a service stream
unregisterSubscription
Method
unregisterSubscription(address: EBMessageAddress): Promise<void> Unregisters and drains/destroys a subscription handler.
waitForInFlightDrain
Method
waitForInFlightDrain(timeoutMs?: number): Promise<boolean> wrapInSpan
Method
wrapInSpan<F>(name: string, opts: SpanOptions, fn: (span: Span) => Promise<F>, context?: Context): Promise<F> Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!