Classes · @purista/mqttbridge
MqttBridge
EventBridge implementation for MQTT 5 brokers.
Signature
class MqttBridge Examples
import { MqttBridge } from '@purista/mqttbridge'
// create and init our eventbridge
const eventBridge = new MqttBridge()
await eventBridge.start() Constructors
1 entry
constructor
Constructor
new constructor(config?: { allowRetries: boolean; defaultCommandTimeout: number; defaultMessageExpiryInterval: number; defaultSessionExpiryInterval: number; emptyTopicPartString: string; instanceId: string; ... }) Creates an MQTT bridge with defaults merged into MQTT client options.
Properties
11 entries
capabilities
Property
capabilities: EventBridgeCapabilities Runtime capability matrix used for strict startup validation.
client
Property
client: MqttClient | undefined MQTT client instance, available after start.
config
Property
config: Complete<EventBridgeConfig<ConfigType>> defaultCommandTimeout
Property
defaultCommandTimeout: number The default time until when a command invocation automatically returns a time out error
inFlightExecutions
Property
inFlightExecutions: InFlightExecutionTracker instanceId
Property
instanceId: string Stable runtime instance id used to distinguish bridge processes.
logger
Property
logger: Logger metricsRecorder
Property
metricsRecorder: PuristaMetricsRecorder name
Property
name: string Human-readable bridge name used in logs, traces, and metrics.
pendingInvocations
Property
pendingInvocations: PendingInvocationRegistry<unknown> Pending command invocations waiting for MQTT correlation responses.
traceProvider
Property
traceProvider: NodeTracerProvider Methods
22 entries
destroy
Method
destroy(): Promise<void> Rejects pending invocations, waits for in-flight handlers, and closes the MQTT client.
emitMessage
Method
emitMessage<T>(message: Omit<EBMessage, "id" | "timestamp" | "correlationId">, contentType: string, contentEncoding: string): Promise<Readonly<EBMessage>> Publishes a PURISTA event/message as a JSON MQTT payload.
getInFlightExecutionCount
Method
getInFlightExecutionCount(): number Number of currently running handlers across all work kinds.
getInFlightExecutionCounts
Method
getInFlightExecutionCounts(): InFlightExecutionCounts Number of currently running handlers grouped by work kind.
getPausedSubscriptionConsumers
Method
getPausedSubscriptionConsumers(): PausedSubscriptionConsumersByRegistrationKey Returns paused subscription consumer states keyed by adapter registration key.
getTracer
Method
getTracer(): Tracer Returns open telemetry tracer of this service
invoke
Method
invoke<T>(input: Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<T> Invokes a command over MQTT and waits for a correlated response message.
isHealthy
Method
isHealthy(): Promise<boolean> Indicates whether the MQTT client is connected.
isReady
Method
isReady(): Promise<boolean> Indicates whether the MQTT client is connected.
openStream
Method
openStream<Chunk, Final>(_input: Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">, _ttl?: number): Promise<StreamHandle<Chunk, Final>> Open a stream invocation.
registerCommand
Method
registerCommand(address: EBMessageAddress, cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; ... } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; ... }>, metadata: CommandDefinitionMetadataBase, eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string> Registers a command handler on the shared MQTT command topic.
registerStream
Method
registerStream(_address: EBMessageAddress, _cb: (message: StreamMessage) => Promise<void>, _metadata: StreamDefinitionMetadataBase, _eventBridgeConfig: DefinitionEventBridgeConfig): Promise<string> Register a service stream handler for a service target.
registerSubscription
Method
registerSubscription(subscription: Subscription, cb: (message: EBMessage) => Promise<Omit<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }, unknown | unknown> | undefined>): Promise<string> Registers a subscription handler on a topic derived from the subscription filter.
resumeSubscriptionConsumer
Method
resumeSubscriptionConsumer(_registrationKey: string): Promise<void> Resumes a paused subscription consumer by registration key.
runInFlight
Method
runInFlight<T>(fn: () => Promise<T>, kind?: "command" | "subscription" | "stream" | "generic"): Promise<T> start
Method
start(): Promise<void> Connects to the MQTT broker and subscribes to this instance's command response topic.
startActiveSpan
Method
startActiveSpan<F>(name: string, opts: SpanOptions, context: Context | undefined, fn: (span: Span) => Promise<F>): Promise<F> Start a child span for opentelemetry tracking
unregisterCommand
Method
unregisterCommand(address: EBMessageAddress): Promise<void> Unsubscribes a command handler topic and removes it from the local router.
unregisterStream
Method
unregisterStream(_address: EBMessageAddress): Promise<void> Unregister a service stream
unregisterSubscription
Method
unregisterSubscription(address: EBMessageAddress): Promise<void> Unsubscribes and removes a registered subscription handler.
waitForInFlightDrain
Method
waitForInFlightDrain(timeoutMs?: number): Promise<boolean> wrapInSpan
Method
wrapInSpan<F>(name: string, opts: SpanOptions, fn: (span: Span) => Promise<F>, context?: Context): Promise<F> Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!