Class: MqttBridge

Defined in: mqttbridge/src/MqttEventBridge.ts:68

The MQTT event bridge connects to an MQTT broker. The broker must support MQTT protocol version 5.

Example

```typescript
import { MqttBridge } from '@purista/mqttbridge'

// create and initialize the event bridge
const eventBridge = new MqttBridge()
await eventBridge.start()
```

@group Event bridge

## Extends

- [`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md)\<[`MqttBridgeConfig`](../type-aliases/MqttBridgeConfig.md)\>

## Implements

- [`EventBridge`](../../core/interfaces/EventBridge.md)

## Constructors

### Constructor

> **new MqttBridge**(`config?`): `MqttBridge`

Defined in: [mqttbridge/src/MqttEventBridge.ts:85](https://github.com/puristajs/purista/blob/master/packages/mqttbridge/src/MqttEventBridge.ts#L85)

#### Parameters

##### config?

###### allowRetries?

`boolean`

Allow retries of the initial connection attempt

###### defaultCommandTimeout?

`number`

Overrides the hard-coded default timeout for command invocations

###### defaultMessageExpiryInterval?

`number`

The message expiry interval in seconds

###### defaultSessionExpiryInterval?

`number`

**Default**

```ts
0
```

###### emptyTopicPartString?

`string`

The string to use in topics for parts that are undefined

**Default**

```ts
__none__
```

###### instanceId?

`string`

The instance ID of the event bridge. If not set, an ID is generated each time an instance is created. Set this if the instance ID must always be the same.

###### logger?

`Logger`

###### logLevel?

`LogLevelName`

###### metrics?

`PuristaMetricsRuntimeOptions`

###### metricsRecorder?

`PuristaMetricsRecorderInterface`

###### qosCommand?

`QoS`

QoS for commands, command responses, and command response subscription messages

**Default**

```ts
1
```

###### qoSSubscription?

`QoS`

QoS for all subscriptions

**Default**

```ts
1
```

###### shareTopicName?

`string`

The name of the shared topic (similar to a pubsub name)

**Default**

```ts
sharedpurista
```

###### shareTopicPrefix?

`string`

The prefix used to dynamically create topic names for shared subscriptions

**Default**

```ts
$share
```

###### spanProcessor?

`SpanProcessor`

###### topicPrefix?

`string`

The prefix for topics, used to prevent name collisions

**Default**

```ts
purista
```

#### Returns

`MqttBridge`

#### Overrides

`EventBridgeBaseClass.constructor`
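
The documented defaults can be summarized in code. The following is a minimal sketch, not the package's implementation: `BridgeOptionsSketch`, `documentedDefaults`, and `resolveOptions` are hypothetical names, and only the default values themselves are taken from the parameter list above.

```typescript
// Hypothetical local mirror of a few documented options; not the real MqttBridgeConfig type.
type BridgeOptionsSketch = {
  defaultSessionExpiryInterval: number
  emptyTopicPartString: string
  qosCommand: 0 | 1 | 2
  qoSSubscription: 0 | 1 | 2
  shareTopicName: string
  shareTopicPrefix: string
  topicPrefix: string
}

// Default values as listed in the constructor parameter documentation.
const documentedDefaults: BridgeOptionsSketch = {
  defaultSessionExpiryInterval: 0,
  emptyTopicPartString: '__none__',
  qosCommand: 1,
  qoSSubscription: 1,
  shareTopicName: 'sharedpurista',
  shareTopicPrefix: '$share',
  topicPrefix: 'purista',
}

// Merge caller overrides over the documented defaults (illustrative helper).
function resolveOptions(overrides: Partial<BridgeOptionsSketch> = {}): BridgeOptionsSketch {
  return { ...documentedDefaults, ...overrides }
}

// Example: override two options and keep the rest at their defaults.
const resolved = resolveOptions({ topicPrefix: 'orders', qosCommand: 2 })
```

In real code the same overrides would presumably be passed as the optional `config` argument of `new MqttBridge(config)`.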

## Properties

capabilities

capabilities: EventBridgeCapabilities

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:28

Implementation of

EventBridge.capabilities

Inherited from

EventBridgeBaseClass.capabilities


client

client: MqttClient | undefined

Defined in: mqttbridge/src/MqttEventBridge.ts:69


config

config: Complete<EventBridgeConfig<ConfigType>>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:26

Inherited from

EventBridgeBaseClass.config


defaultCommandTimeout

defaultCommandTimeout: number

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:30

The default time after which a command invocation automatically returns a timeout error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout


inFlightExecutions

protected readonly inFlightExecutions: InFlightExecutionTracker

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:31

Inherited from

EventBridgeBaseClass.inFlightExecutions


instanceId

instanceId: string

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:29

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId


logger

logger: Logger

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:23

Inherited from

EventBridgeBaseClass.logger


metricsRecorder

protected metricsRecorder: PuristaMetricsRecorderInterface

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:25

Inherited from

EventBridgeBaseClass.metricsRecorder


name

name: string

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:27

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name


pendingInvocations

pendingInvocations: PendingInvocationRegistry<unknown>

Defined in: mqttbridge/src/MqttEventBridge.ts:70


traceProvider

traceProvider: NodeTracerProvider

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24

Inherited from

EventBridgeBaseClass.traceProvider

## Methods

destroy()

destroy(): Promise<void>

Defined in: mqttbridge/src/MqttEventBridge.ts:418

Shuts down the event bridge as gracefully as possible

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy


emitMessage()

emitMessage<T>(message, contentType?, contentEncoding?): Promise<Readonly<EBMessage>>

Defined in: mqttbridge/src/MqttEventBridge.ts:175

Emits a message to the event bridge without awaiting a result

Type Parameters

T

T extends EBMessage

Parameters

message

Omit<EBMessage, "id" | "timestamp" | "correlationId">

the message

contentType?

string = 'application/json'

contentEncoding?

string = 'utf-8'

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage
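
The signature accepts a message without `id`, `timestamp`, and `correlationId` and resolves to a `Readonly<EBMessage>`, which suggests that the bridge fills in the omitted fields before sending. The sketch below illustrates that idea only: `MessageSketch` and `completeMessage` are hypothetical, the message shape is heavily reduced, and the ID generator is injected by the caller rather than taken from the library.

```typescript
// Hypothetical, trimmed-down message shape; the real EBMessage has more fields.
type MessageSketch = {
  id: string
  timestamp: number
  eventName: string
  payload: unknown
}

// Fill in the fields the caller omits and freeze the result so it is read-only.
function completeMessage(
  input: Omit<MessageSketch, 'id' | 'timestamp'>,
  createId: () => string,
  now: () => number = Date.now,
): Readonly<MessageSketch> {
  return Object.freeze({ ...input, id: createId(), timestamp: now() })
}

// Example with a deterministic ID generator and clock (both invented for illustration).
let counter = 0
const sent = completeMessage(
  { eventName: 'userCreated', payload: { userId: '42' } },
  () => `msg-${++counter}`,
  () => 1700000000000,
)
```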


getInFlightExecutionCount()

getInFlightExecutionCount(): number

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:69

Number of currently running handlers across all work kinds.

Returns

number

Implementation of

EventBridge.getInFlightExecutionCount

Inherited from

EventBridgeBaseClass.getInFlightExecutionCount


getInFlightExecutionCounts()

getInFlightExecutionCounts(): InFlightExecutionCounts

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:70

Number of currently running handlers grouped by work kind.

Returns

InFlightExecutionCounts

Implementation of

EventBridge.getInFlightExecutionCounts

Inherited from

EventBridgeBaseClass.getInFlightExecutionCounts


getPausedSubscriptionConsumers()

getPausedSubscriptionConsumers(): PausedSubscriptionConsumersByRegistrationKey

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:71

Returns paused subscription consumer states keyed by adapter registration key.

Returns

PausedSubscriptionConsumersByRegistrationKey

Implementation of

EventBridge.getPausedSubscriptionConsumers

Inherited from

EventBridgeBaseClass.getPausedSubscriptionConsumers


getTracer()

getTracer(): Tracer

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:38

Returns the OpenTelemetry tracer of this service

Returns

Tracer

Inherited from

EventBridgeBaseClass.getTracer


invoke()

invoke<T>(input, commandTimeout?): Promise<T>

Defined in: mqttbridge/src/MqttEventBridge.ts:252

Call a command of a service and return the result of this command

Type Parameters

T

T

Parameters

input

Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">

a partial command message

commandTimeout?

number = ...

the time to live (timeout) of the invocation

Returns

Promise<T>

Implementation of

EventBridge.invoke
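
The `commandTimeout` parameter bounds how long an invocation may wait for its response. One common way to implement such a bound is to reject when a timer fires before the work settles. The sketch below shows that general technique under stated assumptions: `withTimeout` and `answerAfter` are hypothetical helpers, milliseconds are assumed as the unit, and nothing here is taken from the bridge's source.

```typescript
// Reject if `work` does not settle within `timeoutMs` milliseconds.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`timed out after ${timeoutMs} ms`)), timeoutMs)
    work.then(
      (value) => {
        clearTimeout(timer) // the work finished first, so cancel the timer
        resolve(value)
      },
      (error) => {
        clearTimeout(timer)
        reject(error)
      },
    )
  })
}

// Stand-in for a command that answers after `delayMs` milliseconds.
const answerAfter = (delayMs: number): Promise<string> =>
  new Promise((resolve) => setTimeout(() => resolve('done'), delayMs))
```

A caller of `invoke` would typically catch the resulting rejection and treat it like any other failed command.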


isHealthy()

isHealthy(): Promise<boolean>

Defined in: mqttbridge/src/MqttEventBridge.ts:248

Indicates whether the event bridge is running and working correctly

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy


isReady()

isReady(): Promise<boolean>

Defined in: mqttbridge/src/MqttEventBridge.ts:244

Indicates whether the event bridge has been started and is connected to the underlying message broker

Returns

Promise<boolean>

Implementation of

EventBridge.isReady
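
Because `isReady()` resolves to a boolean, a caller can poll it before registering services or reporting readiness to an orchestrator. The following is a minimal sketch: `waitUntilReady`, `ReadinessSource`, and the fake bridge are hypothetical, and only the `isReady(): Promise<boolean>` signature comes from the documentation above.

```typescript
// Only the documented method is assumed here.
type ReadinessSource = { isReady(): Promise<boolean> }

// Poll `isReady()` until it reports true or the attempts run out.
async function waitUntilReady(source: ReadinessSource, attempts = 10, delayMs = 100): Promise<boolean> {
  for (let attempt = 0; attempt < attempts; attempt++) {
    if (await source.isReady()) return true
    await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
  }
  return false
}

// Fake source that becomes ready on the third check, for illustration only.
let checks = 0
const fakeBridge: ReadinessSource = { isReady: async () => ++checks >= 3 }
```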


openStream()

openStream<Chunk, Final>(_input, _ttl?): Promise<StreamHandle<Chunk, Final>>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:73

Opens a stream invocation. The returned handle can be consumed via async iteration and can be cancelled by the caller.

Type Parameters

Chunk

Chunk = unknown

Final

Final = unknown

Parameters

_input

Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">

_ttl?

number

Returns

Promise<StreamHandle<Chunk, Final>>

Implementation of

EventBridge.openStream

Inherited from

EventBridgeBaseClass.openStream
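
Consuming a handle via async iteration and cancelling it early might look like the sketch below. The exact shape of `StreamHandle` is not shown in this reference, so `StreamHandleSketch`, its `cancel()` method, `fakeStream`, and `takeChunks` are all assumptions made for illustration.

```typescript
// Hypothetical handle shape: async-iterable chunks plus a cancel method.
type StreamHandleSketch<Chunk> = AsyncIterable<Chunk> & { cancel(): void }

// Build a fake handle that yields the given chunks until it is cancelled.
function fakeStream<Chunk>(chunks: Chunk[]): StreamHandleSketch<Chunk> {
  let cancelled = false
  return {
    cancel: () => {
      cancelled = true
    },
    async *[Symbol.asyncIterator]() {
      for (const chunk of chunks) {
        if (cancelled) return
        yield chunk
      }
    },
  }
}

// Collect chunks and cancel the stream once `limit` chunks have arrived.
async function takeChunks<Chunk>(handle: StreamHandleSketch<Chunk>, limit: number): Promise<Chunk[]> {
  const received: Chunk[] = []
  for await (const chunk of handle) {
    received.push(chunk)
    if (received.length >= limit) handle.cancel()
  }
  return received
}
```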


registerCommand()

registerCommand(address, cb, metadata, eventBridgeConfig): Promise<string>

Defined in: mqttbridge/src/MqttEventBridge.ts:343

Parameters

address

EBMessageAddress

the address of the service command (service name, version and command name)

cb

(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>

the function to be called if a matching command arrives

metadata

CommandDefinitionMetadataBase

eventBridgeConfig

DefinitionEventBridgeConfig

Returns

Promise<string>

Implementation of

EventBridge.registerCommand
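
The callback must resolve to either a success or an error response message. As a rough illustration of the success variant, the sketch below builds a reply from an incoming command. It is heavily simplified and rests on assumptions: the incoming message shape, the `'successResponse'` placeholder for the real `CommandSuccessResponse` value, and the swapping of sender and receiver are not confirmed by this reference.

```typescript
// Address fields as listed in the callback's return type above.
type Address = { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }

// Hypothetical, reduced view of an incoming command message.
type IncomingCommandSketch = { correlationId: string; sender: Address; receiver: Address; payload: unknown }

// Success variant reduced to its required fields; 'successResponse' is a placeholder
// for the real CommandSuccessResponse message type value.
type SuccessResponseSketch = {
  contentEncoding: string
  contentType: string
  correlationId: string
  id: string
  messageType: 'successResponse'
  payload: unknown
  receiver: Address
  sender: Address
  timestamp: number
}

// Answer a command: keep the correlation ID and address the reply to the original sender.
function toSuccessResponse(command: IncomingCommandSketch, payload: unknown, id: string): SuccessResponseSketch {
  return {
    contentEncoding: 'utf-8',
    contentType: 'application/json',
    correlationId: command.correlationId,
    id,
    messageType: 'successResponse',
    payload,
    receiver: command.sender,
    sender: command.receiver,
    timestamp: Date.now(),
  }
}
```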


registerStream()

registerStream(_address, _cb, _metadata, _eventBridgeConfig): Promise<string>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:74

Register a service stream.

Parameters

_address

EBMessageAddress

_cb

(message) => Promise<void>

_metadata

StreamDefinitionMetadataBase

_eventBridgeConfig

DefinitionEventBridgeConfig

Returns

Promise<string>

Implementation of

EventBridge.registerStream

Inherited from

EventBridgeBaseClass.registerStream


registerSubscription()

registerSubscription(subscription, cb): Promise<string>

Defined in: mqttbridge/src/MqttEventBridge.ts:374

Register a new subscription

Parameters

subscription

Subscription

the subscription definition

cb

(message) => Promise<Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }, "id" | "timestamp"> | undefined>

the function to be called if a matching message arrives

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription
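
The constructor options `shareTopicPrefix`, `shareTopicName`, and `emptyTopicPartString` hint at how subscription topics are assembled. MQTT 5 shared subscriptions use filters of the form `$share/<ShareName>/<TopicFilter>`. The sketch below combines that format with the documented default values. `joinTopic` and `sharedFilter` are hypothetical helpers, and the order of the topic parts is invented for illustration, not PURISTA's actual topic layout.

```typescript
// Join topic parts, substituting a placeholder for undefined parts.
function joinTopic(parts: Array<string | undefined>, emptyPart = '__none__'): string {
  return parts.map((part) => part ?? emptyPart).join('/')
}

// MQTT 5 shared subscription filter: $share/<ShareName>/<TopicFilter>.
function sharedFilter(topicFilter: string, shareName = 'sharedpurista', sharePrefix = '$share'): string {
  return `${sharePrefix}/${shareName}/${topicFilter}`
}

// The part order below is an assumption made for this example.
const topic = joinTopic(['purista', 'userService', '1', undefined, 'userCreated'])
const filter = sharedFilter(topic)
```

With a shared filter, the broker delivers each matching message to only one of the subscribers in the share group, which is what allows several instances of a service to split the load.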


resumeSubscriptionConsumer()

resumeSubscriptionConsumer(_registrationKey): Promise<void>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:72

Resumes a paused subscription consumer by registration key.

Parameters

_registrationKey

string

Returns

Promise<void>

Implementation of

EventBridge.resumeSubscriptionConsumer

Inherited from

EventBridgeBaseClass.resumeSubscriptionConsumer


runInFlight()

runInFlight<T>(fn, kind?): Promise<T>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:67

Type Parameters

T

T

Parameters

fn

() => Promise<T>

kind?

"command" | "subscription" | "stream" | "generic"

Returns

Promise<T>

Inherited from

EventBridgeBaseClass.runInFlight


start()

start(): Promise<void>

Defined in: mqttbridge/src/MqttEventBridge.ts:128

Starts the event bridge and connects to the underlying message broker

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start


startActiveSpan()

startActiveSpan<F>(name, opts, context, fn): Promise<F>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:47

Starts a child span for OpenTelemetry tracing

Type Parameters

F

F

Parameters

name

string

name of span

opts

SpanOptions

span options

context

Context | undefined

optional context

fn

(span) => Promise<F>

function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan


unregisterCommand()

unregisterCommand(address): Promise<void>

Defined in: mqttbridge/src/MqttEventBridge.ts:367

Unregister a service command

Parameters

address

EBMessageAddress

The address (service name, version and command name) of the command to be de-registered

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand


unregisterStream()

unregisterStream(_address): Promise<void>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:75

Unregister a service stream

Parameters

_address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterStream

Inherited from

EventBridgeBaseClass.unregisterStream


unregisterSubscription()

unregisterSubscription(address): Promise<void>

Defined in: mqttbridge/src/MqttEventBridge.ts:405

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription


waitForInFlightDrain()

waitForInFlightDrain(timeoutMs?): Promise<boolean>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:68

Parameters

timeoutMs?

number

Returns

Promise<boolean>

Inherited from

EventBridgeBaseClass.waitForInFlightDrain
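
Taken together, `runInFlight`, `getInFlightExecutionCount`, and `waitForInFlightDrain` describe a tracker for running handlers. The sketch below models how such a tracker could behave. `InFlightSketch` is a hypothetical class, not the `InFlightExecutionTracker` from the core package, and the meaning of the boolean result (true when drained, false on timeout) is an assumption.

```typescript
type WorkKind = 'command' | 'subscription' | 'stream' | 'generic'

// Illustrative tracker; the polling approach is chosen for brevity.
class InFlightSketch {
  private counts: Record<WorkKind, number> = { command: 0, subscription: 0, stream: 0, generic: 0 }

  // Count `fn` as running until its promise settles, whether it resolves or rejects.
  async runInFlight<T>(fn: () => Promise<T>, kind: WorkKind = 'generic'): Promise<T> {
    this.counts[kind]++
    try {
      return await fn()
    } finally {
      this.counts[kind]--
    }
  }

  // Total number of running handlers across all work kinds.
  getInFlightExecutionCount(): number {
    const c = this.counts
    return c.command + c.subscription + c.stream + c.generic
  }

  // Resolve true once nothing is running, or false if the timeout elapses first.
  async waitForInFlightDrain(timeoutMs = 1000, pollMs = 5): Promise<boolean> {
    const deadline = Date.now() + timeoutMs
    while (this.getInFlightExecutionCount() > 0) {
      if (Date.now() >= deadline) return false
      await new Promise<void>((resolve) => setTimeout(resolve, pollMs))
    }
    return true
  }
}
```

A graceful shutdown could wait for the drain first and call `destroy()` afterwards, so that running handlers are not cut off.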


wrapInSpan()

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Defined in: core/dist/core/EventBridge/EventBridgeBaseClass.impl.d.ts:64

Starts a span for OpenTelemetry tracing on the same level. The created span does not become the "active" span within OpenTelemetry.

This means that during logging and similar operations, the spanId of the parent span is logged.

Use wrapInSpan to mark points in the flow of one larger function, not to trace the program flow itself.

Type Parameters

F

F

Parameters

name

string

name of span

opts

SpanOptions

span options

fn

(span) => Promise<F>

function to be executed in the span

context?

Context

span context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan