Classes · @purista/mqttbridge

MqttBridge

EventBridge implementation for MQTT 5 brokers.

Signature

MqttBridge.ts typescript
class MqttBridge

Examples

example-1.ts typescript
import { MqttBridge } from '@purista/mqttbridge'

// create and start the event bridge
const eventBridge = new MqttBridge()
await eventBridge.start()

Constructors

1 entry

constructor

Constructor

constructor.ts typescript
new MqttBridge(config?: { allowRetries: boolean; defaultCommandTimeout: number; defaultMessageExpiryInterval: number; defaultSessionExpiryInterval: number; emptyTopicPartString: string; instanceId: string; ... })

Creates an MQTT bridge with defaults merged into MQTT client options.
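
The merge of defaults into the supplied configuration can be pictured with the minimal sketch below. It does not reproduce the bridge's implementation: `BridgeConfigSketch`, `sketchDefaults`, and `mergeConfig` are hypothetical names, the default values are placeholders chosen for illustration, and only the field names are taken from the constructor signature above.

```typescript
// Hypothetical subset of the config; only the field names come from the constructor signature.
interface BridgeConfigSketch {
  allowRetries: boolean
  defaultCommandTimeout: number
  emptyTopicPartString: string
  instanceId: string
}

// Placeholder defaults for illustration only; the real defaults are not documented here.
const sketchDefaults: BridgeConfigSketch = {
  allowRetries: true,
  defaultCommandTimeout: 30000,
  emptyTopicPartString: 'none',
  instanceId: 'instance-1',
}

// Caller-supplied values win; every other field falls back to the defaults.
function mergeConfig(overrides: Partial<BridgeConfigSketch> = {}): BridgeConfigSketch {
  return { ...sketchDefaults, ...overrides }
}

// Only the timeout is overridden here; the remaining fields keep their defaults.
const merged = mergeConfig({ defaultCommandTimeout: 5000 })
```

A shallow merge like this is enough for flat options; nested MQTT client options would need a deeper merge, which this sketch deliberately leaves out.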

Properties

11 entries

capabilities

Property

capabilities.ts typescript
capabilities: EventBridgeCapabilities

Runtime capability matrix used for strict startup validation.

client

Property

client.ts typescript
client: MqttClient | undefined

MQTT client instance, available after start.

config

Property

config.ts typescript
config: Complete<EventBridgeConfig<ConfigType>>

defaultCommandTimeout

Property

defaultCommandTimeout.ts typescript
defaultCommandTimeout: number

Default time after which a command invocation automatically fails with a timeout error.

inFlightExecutions

Property

inFlightExecutions.ts typescript
inFlightExecutions: InFlightExecutionTracker

instanceId

Property

instanceId.ts typescript
instanceId: string

Stable runtime instance id used to distinguish bridge processes.

logger

Property

logger.ts typescript
logger: Logger

metricsRecorder

Property

metricsRecorder.ts typescript
metricsRecorder: PuristaMetricsRecorder

name

Property

name.ts typescript
name: string

Human-readable bridge name used in logs, traces, and metrics.

pendingInvocations

Property

pendingInvocations.ts typescript
pendingInvocations: PendingInvocationRegistry<unknown>

Pending command invocations waiting for MQTT correlation responses.

traceProvider

Property

traceProvider.ts typescript
traceProvider: NodeTracerProvider

Methods

22 entries

destroy

Method

destroy.ts typescript
destroy(): Promise<void>

Rejects pending invocations, waits for in-flight handlers, and closes the MQTT client.

emitMessage

Method

emitMessage.ts typescript
emitMessage<T>(message: Omit<EBMessage, "id" | "timestamp" | "correlationId">, contentType: string, contentEncoding: string): Promise<Readonly<EBMessage>>

Publishes a PURISTA event/message as a JSON MQTT payload.

getInFlightExecutionCount

Method

getInFlightExecutionCount.ts typescript
getInFlightExecutionCount(): number

Number of currently running handlers across all work kinds.

getInFlightExecutionCounts

Method

getInFlightExecutionCounts.ts typescript
getInFlightExecutionCounts(): InFlightExecutionCounts

Number of currently running handlers grouped by work kind.

getPausedSubscriptionConsumers

Method

getPausedSubscriptionConsumers.ts typescript
getPausedSubscriptionConsumers(): PausedSubscriptionConsumersByRegistrationKey

Returns paused subscription consumer states keyed by adapter registration key.

getTracer

Method

getTracer.ts typescript
getTracer(): Tracer

Returns the OpenTelemetry tracer of this service.

invoke

Method

invoke.ts typescript
invoke<T>(input: Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<T>

Invokes a command over MQTT and waits for a correlated response message.
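
Waiting for a correlated response usually means parking the caller's promise under its correlation id until a matching response arrives or a timeout fires. The sketch below illustrates that pattern under stated assumptions: `PendingRegistrySketch` and its methods are hypothetical and are not the API of `PendingInvocationRegistry`, and the error messages are invented.

```typescript
// Callbacks and timer kept for one waiting caller.
type PendingEntry<T> = {
  resolve: (value: T) => void
  reject: (reason: Error) => void
  timer: ReturnType<typeof setTimeout>
}

// Hypothetical registry keyed by correlation id (not the bridge's real class).
class PendingRegistrySketch<T> {
  private readonly pending = new Map<string, PendingEntry<T>>()

  // Parks a caller and rejects it if no response arrives within timeoutMs.
  wait(correlationId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId)
        reject(new Error(`invocation ${correlationId} timed out`))
      }, timeoutMs)
      this.pending.set(correlationId, { resolve, reject, timer })
    })
  }

  // Called when a response arrives; returns false if nobody is waiting for that id.
  settle(correlationId: string, value: T): boolean {
    const entry = this.pending.get(correlationId)
    if (!entry) return false
    clearTimeout(entry.timer)
    this.pending.delete(correlationId)
    entry.resolve(value)
    return true
  }

  // Rejects every waiting caller, for example during shutdown.
  rejectAll(reason: Error): void {
    this.pending.forEach((entry) => {
      clearTimeout(entry.timer)
      entry.reject(reason)
    })
    this.pending.clear()
  }

  // Number of callers still waiting.
  get size(): number {
    return this.pending.size
  }
}
```

In this sketch a late or duplicate response is simply ignored, because its correlation id is no longer in the map.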

isHealthy

Method

isHealthy.ts typescript
isHealthy(): Promise<boolean>

Indicates whether the MQTT client is connected.

isReady

Method

isReady.ts typescript
isReady(): Promise<boolean>

Indicates whether the MQTT client is connected.

openStream

Method

openStream.ts typescript
openStream<Chunk, Final>(_input: Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">, _ttl?: number): Promise<StreamHandle<Chunk, Final>>

Opens a stream invocation.

registerCommand

Method

registerCommand.ts typescript
registerCommand(address: EBMessageAddress, cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; ... } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; ... }>, metadata: CommandDefinitionMetadataBase, eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string>

Registers a command handler on the shared MQTT command topic.

registerStream

Method

registerStream.ts typescript
registerStream(_address: EBMessageAddress, _cb: (message: StreamMessage) => Promise<void>, _metadata: StreamDefinitionMetadataBase, _eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string>

Registers a service stream handler for a service target.

registerSubscription

Method

registerSubscription.ts typescript
registerSubscription(subscription: Subscription, cb: (message: EBMessage) => Promise<Omit<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }, unknown | unknown> | undefined>): Promise<string>

Registers a subscription handler on a topic derived from the subscription filter.
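
Because subscriptions are mapped to MQTT topics, incoming messages have to be matched against topic filters at some point. How this bridge lays out its topics is not documented here, so the sketch below only shows the standard MQTT wildcard rules: `+` matches exactly one topic level and `#` matches all remaining levels. `topicMatches` is a hypothetical helper, not part of the bridge.

```typescript
// Checks a concrete topic against an MQTT topic filter.
// Simplification: the special handling of topics starting with '$' is not covered.
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/')
  const topicLevels = topic.split('/')
  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i]
    // '#' matches the parent level and everything below it.
    if (level === '#') return true
    // The topic ran out of levels before the filter did.
    if (i >= topicLevels.length) return false
    // '+' matches any single level; otherwise the levels must be identical.
    if (level !== '+' && level !== topicLevels[i]) return false
  }
  // Every filter level matched; the topic must not have extra levels.
  return filterLevels.length === topicLevels.length
}
```

A filter such as `svc/+/command` therefore accepts `svc/user/command` but not `svc/user/v1/command`; both topic names are made up for illustration.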

resumeSubscriptionConsumer

Method

resumeSubscriptionConsumer.ts typescript
resumeSubscriptionConsumer(_registrationKey: string): Promise<void>

Resumes a paused subscription consumer by registration key.

runInFlight

Method

runInFlight.ts typescript
runInFlight<T>(fn: () => Promise<T>, kind?: "command" | "subscription" | "stream" | "generic"): Promise<T>

start

Method

start.ts typescript
start(): Promise<void>

Connects to the MQTT broker and subscribes to this instance's command response topic.

startActiveSpan

Method

startActiveSpan.ts typescript
startActiveSpan<F>(name: string, opts: SpanOptions, context: Context | undefined, fn: (span: Span) => Promise<F>): Promise<F>

Starts a child span for OpenTelemetry tracing.

unregisterCommand

Method

unregisterCommand.ts typescript
unregisterCommand(address: EBMessageAddress): Promise<void>

Unsubscribes a command handler topic and removes it from the local router.

unregisterStream

Method

unregisterStream.ts typescript
unregisterStream(_address: EBMessageAddress): Promise<void>

Unregisters a service stream.

unregisterSubscription

Method

unregisterSubscription.ts typescript
unregisterSubscription(address: EBMessageAddress): Promise<void>

Unsubscribes and removes a registered subscription handler.

waitForInFlightDrain

Method

waitForInFlightDrain.ts typescript
waitForInFlightDrain(timeoutMs?: number): Promise<boolean>
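
`runInFlight` and `waitForInFlightDrain` carry no description in this reference. One plausible reading, based on their names and on `destroy` waiting for in-flight handlers, is a counter that wraps each handler and lets shutdown code wait until the counter reaches zero. The sketch below illustrates that reading only: `InFlightSketch` is hypothetical, the polling approach is a simplification, and the meaning of the boolean result (`true` when drained before the timeout) is an assumption.

```typescript
type WorkKind = 'command' | 'subscription' | 'stream' | 'generic'

// Hypothetical tracker; not the bridge's InFlightExecutionTracker.
class InFlightSketch {
  private counts: Record<WorkKind, number> = { command: 0, subscription: 0, stream: 0, generic: 0 }

  // Runs fn and keeps it counted until its promise settles, even when it rejects.
  async run<T>(fn: () => Promise<T>, kind: WorkKind = 'generic'): Promise<T> {
    this.counts[kind]++
    try {
      return await fn()
    } finally {
      this.counts[kind]--
    }
  }

  // Number of handlers currently running across all kinds.
  total(): number {
    return this.counts.command + this.counts.subscription + this.counts.stream + this.counts.generic
  }

  // Polls until nothing is running; resolves false if the timeout elapses first (assumed semantics).
  async waitForDrain(timeoutMs = 1000, pollMs = 5): Promise<boolean> {
    const deadline = Date.now() + timeoutMs
    while (this.total() > 0) {
      if (Date.now() >= deadline) return false
      await new Promise<void>((resolve) => setTimeout(resolve, pollMs))
    }
    return true
  }
}
```

Decrementing in a `finally` block keeps the count accurate when a handler throws, so a failing handler cannot block shutdown forever.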

wrapInSpan

Method

wrapInSpan.ts typescript
wrapInSpan<F>(name: string, opts: SpanOptions, fn: (span: Span) => Promise<F>, context?: Context): Promise<F>

Starts a span for OpenTelemetry tracing on the same level. The created span does not become the active span within OpenTelemetry.