@purista/mqttbridge v2.0.5


# Class: MqttBridge

Defined in: mqttbridge/src/MqttEventBridge.ts:66

The MQTT event bridge connects to an MQTT broker. The broker must support MQTT protocol version 5.

## Example

```typescript
import { MqttBridge } from '@purista/mqttbridge'

// create and init our eventbridge
const eventBridge = new MqttBridge()
await eventBridge.start()
```


## Extends

- [`EventBridgeBaseClass`](../../core/classes/EventBridgeBaseClass.md)\<[`MqttBridgeConfig`](../type-aliases/MqttBridgeConfig.md)\>

## Implements

- [`EventBridge`](../../core/interfaces/EventBridge.md)

## Constructors

### new MqttBridge()

> **new MqttBridge**(`config`?): [`MqttBridge`](MqttBridge.md)

Defined in: [mqttbridge/src/MqttEventBridge.ts:73](https://github.com/puristajs/purista/blob/master/packages/mqttbridge/src/MqttEventBridge.ts#L73)

#### Parameters

##### config?

###### allowRetries?

`boolean`

Allow retrying the initial connection attempt

###### defaultCommandTimeout?

`number`

Overrides the hard-coded default timeout for command invocations

###### defaultMessageExpiryInterval?

`number`

The message expiry interval, in seconds

**Default**

###### defaultSessionExpiryInterval?

`number`

**Default**

```ts
0
```

###### emptyTopicPartString?

`string`

The string used in topics for parts that are undefined

**Default**

```ts
__none__
```

###### instanceId?

`string`

The instance ID of the event bridge. If not set, an ID is generated each time an instance is created. Set this if the instance ID must always be the same.

###### logger?

`Logger`

A logger instance

###### logLevel?

`LogLevelName`

If no logger instance is given, use this log level

###### qosCommand?

`QoS`

QoS for command messages, command responses, and command response subscriptions

**Default**

```ts
1
```

###### qoSSubscription?

`QoS`

QoS for all subscriptions

**Default**

```ts
1
```

###### shareTopicName?

`string`

The name of the shared topic (similar to a pubsub name)

**Default**

```ts
sharedpurista
```

###### shareTopicPrefix?

`string`

The prefix used to dynamically create topic names for shared subscriptions

**Default**

```ts
$share
```

###### spanProcessor?

`SpanProcessor`

An OpenTelemetry span processor

###### topicPrefix?

`string`

The prefix for topics, used to prevent name collisions

**Default**

```ts
purista
```
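The documented defaults can be gathered in one place to see what an empty configuration resolves to. The following is a minimal sketch, not the library's implementation: `BridgeDefaultsSketch`, `documentedDefaults`, and `withDefaults` are hypothetical names, and only options whose default value is listed in the parameters are included.

```typescript
// Hypothetical local type: only the options with a documented default value.
type BridgeDefaultsSketch = {
  defaultSessionExpiryInterval: number
  emptyTopicPartString: string
  qosCommand: 0 | 1 | 2
  qoSSubscription: 0 | 1 | 2
  shareTopicName: string
  shareTopicPrefix: string
  topicPrefix: string
}

// Default values copied from the parameter documentation.
const documentedDefaults: BridgeDefaultsSketch = {
  defaultSessionExpiryInterval: 0,
  emptyTopicPartString: '__none__',
  qosCommand: 1,
  qoSSubscription: 1,
  shareTopicName: 'sharedpurista',
  shareTopicPrefix: '$share',
  topicPrefix: 'purista',
}

// Merge user-supplied overrides over the documented defaults (assumed merge strategy).
function withDefaults(overrides: Partial<BridgeDefaultsSketch> = {}): BridgeDefaultsSketch {
  return { ...documentedDefaults, ...overrides }
}

const mergedConfig = withDefaults({ topicPrefix: 'myapp', qosCommand: 2 })
```

Options not passed in `overrides` keep their documented default, so `mergedConfig.shareTopicPrefix` stays `$share` here.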

#### Returns

[`MqttBridge`](MqttBridge.md)

#### Overrides

`EventBridgeBaseClass.constructor`
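To illustrate how `topicPrefix`, `emptyTopicPartString`, `shareTopicPrefix`, and `shareTopicName` could interact, here is a hedged sketch. The helper names and the order of the topic parts are assumptions made for illustration; the actual topic layout used by the bridge is not described on this page. The only fixed rule relied on is the MQTT 5 shared subscription form `$share/<ShareName>/<TopicFilter>`.

```typescript
// Hypothetical option subset used for building topics.
type TopicOptionsSketch = {
  topicPrefix: string
  emptyTopicPartString: string
  shareTopicPrefix: string
  shareTopicName: string
}

// Documented default values for these four options.
const topicDefaults: TopicOptionsSketch = {
  topicPrefix: 'purista',
  emptyTopicPartString: '__none__',
  shareTopicPrefix: '$share',
  shareTopicName: 'sharedpurista',
}

// Join the parts with "/", substituting the placeholder for undefined parts.
// The choice and order of parts is an assumption, not the library's layout.
function buildTopic(parts: Array<string | undefined>, opts: TopicOptionsSketch = topicDefaults): string {
  const filled = parts.map((part) => part ?? opts.emptyTopicPartString)
  return [opts.topicPrefix, ...filled].join('/')
}

// MQTT 5 shared subscriptions use the form $share/<ShareName>/<TopicFilter>.
function buildSharedTopic(topic: string, opts: TopicOptionsSketch = topicDefaults): string {
  return `${opts.shareTopicPrefix}/${opts.shareTopicName}/${topic}`
}

const exampleTopic = buildTopic(['userService', '1', undefined, 'signUp'])
const exampleSharedTopic = buildSharedTopic(exampleTopic)
```

With the defaults, `exampleTopic` is `purista/userService/1/__none__/signUp`, and the shared variant prepends `$share/sharedpurista/`.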

## Properties

### client

> **client**: `undefined` \| `MqttClient`

Defined in: mqttbridge/src/MqttEventBridge.ts:69


### config

> **config**: `Complete<{ allowRetries: boolean; defaultCommandTimeout: number; defaultMessageExpiryInterval: number; defaultSessionExpiryInterval: number; emptyTopicPartString: string; instanceId: string; logger: Logger; logLevel: LogLevelName; qosCommand: QoS; qoSSubscription: QoS; shareTopicName: string; shareTopicPrefix: string; spanProcessor: SpanProcessor; topicPrefix: string; }>`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:14

Inherited from

EventBridgeBaseClass.config


### defaultCommandTimeout

> **defaultCommandTimeout**: `number`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:17

The default time after which a command invocation automatically returns a timeout error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout


### instanceId

> **instanceId**: `string`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:16

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId


### logger

> **logger**: `Logger`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:12

Inherited from

EventBridgeBaseClass.logger


### name

> **name**: `string`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:15

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name


### pendingInvocations

> **pendingInvocations**: `Map`\<`string`, `PendigInvocation`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:70


### traceProvider

> **traceProvider**: `NodeTracerProvider`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:13

Inherited from

EventBridgeBaseClass.traceProvider

## Methods

### destroy()

> **destroy**(): `Promise`\<`void`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:385

Shut down the event bridge as gracefully as possible

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy


### emit()

> **emit**\<`K`\>(`eventName`, `parameter`?): `void`

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:13

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

parameter?

object[K]

Returns

void

Inherited from

EventBridgeBaseClass.emit


### emitMessage()

> **emitMessage**\<`T`\>(`message`, `contentType`, `contentEncoding`): `Promise`\<`Readonly`\<`EBMessage`\>\>

Defined in: mqttbridge/src/MqttEventBridge.ts:132

Emit a message to the eventbridge without awaiting a result

Type Parameters

T extends EBMessage

Parameters

message

Omit<EBMessage, "id" | "timestamp" | "correlationId">

the message

contentType

string = 'application/json'

contentEncoding

string = 'utf-8'

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage
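The signature suggests that callers pass a message without `id`, `timestamp`, and `correlationId`, and receive a completed, read-only message back. The sketch below illustrates that relationship with a reduced, hypothetical message shape. `MessageSketch`, `completeMessage`, and the counter-based ID scheme are assumptions for illustration and do not reflect how the bridge generates these fields.

```typescript
// Hypothetical, reduced message shape; the real EBMessage type has more fields.
type MessageSketch = {
  id: string
  timestamp: number
  eventName: string
  payload: unknown
  contentType: string
  contentEncoding: string
}

// Callers supply everything except the fields assumed to be filled in on emit.
type MessageInputSketch = Omit<MessageSketch, 'id' | 'timestamp' | 'contentType' | 'contentEncoding'>

let messageCounter = 0

function completeMessage(
  input: MessageInputSketch,
  contentType = 'application/json', // same default as documented for emitMessage
  contentEncoding = 'utf-8', // same default as documented for emitMessage
): Readonly<MessageSketch> {
  messageCounter += 1
  return Object.freeze({
    ...input,
    id: `msg-${messageCounter}`, // placeholder ID scheme, not the library's
    timestamp: Date.now(),
    contentType,
    contentEncoding,
  })
}

const completedMessage = completeMessage({ eventName: 'userCreated', payload: { userId: '42' } })
```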


### getTracer()

> **getTracer**(): `Tracer`

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:24

Returns the OpenTelemetry tracer of this service

Returns

`Tracer`

Inherited from

EventBridgeBaseClass.getTracer


### invoke()

> **invoke**\<`T`\>(`input`, `commandTimeout`): `Promise`\<`T`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:208

Call a command of a service and return the result of this command

Type Parameters

T

Parameters

input

Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; otp: string; payload: { parameter: unknown; payload: unknown; }; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "id" | "timestamp" | "correlationId" | "messageType">

a partial command message

commandTimeout

number = ...

the time to live (timeout) of the invocation

Returns

Promise<T>

Implementation of

EventBridge.invoke
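An invocation that waits for a response and fails after a timeout is commonly tracked in a map keyed by correlation ID, which matches the shape of the `pendingInvocations` property. The following is a minimal sketch of that general pattern, not the bridge's actual code. `PendingInvocationsSketch` and its methods are hypothetical, and the timeout is taken in milliseconds only because `setTimeout` expects that unit.

```typescript
// Hypothetical bookkeeping entry for one outstanding invocation.
type PendingSketch = {
  resolve: (value: unknown) => void
  reject: (reason: Error) => void
  timer: ReturnType<typeof setTimeout>
}

class PendingInvocationsSketch {
  readonly pending = new Map<string, PendingSketch>()

  // Register an invocation; the promise settles on a response or on timeout.
  wait<T>(correlationId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId)
        reject(new Error(`invocation ${correlationId} timed out after ${timeoutMs} ms`))
      }, timeoutMs)
      this.pending.set(correlationId, { resolve: (value) => resolve(value as T), reject, timer })
    })
  }

  // Called when a response with a matching correlation ID arrives.
  settle(correlationId: string, value: unknown): boolean {
    const entry = this.pending.get(correlationId)
    if (!entry) return false
    clearTimeout(entry.timer)
    this.pending.delete(correlationId)
    entry.resolve(value)
    return true
  }
}

const invocations = new PendingInvocationsSketch()
const pendingResult = invocations.wait<string>('corr-1', 1000)
const wasSettled = invocations.settle('corr-1', 'pong')
```

Clearing the timer in `settle` keeps a late timeout from rejecting a promise that has already resolved.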


### isHealthy()

> **isHealthy**(): `Promise`\<`boolean`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:204

Indicates whether the event bridge is running and working correctly

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy


### isReady()

> **isReady**(): `Promise`\<`boolean`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:200

Indicates whether the event bridge has been started and is connected to the underlying message broker

Returns

Promise<boolean>

Implementation of

EventBridge.isReady
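Because `isReady()` returns a promise of a boolean, a caller can poll it before sending traffic. The helper below is a hedged sketch of such a poll. `waitUntilReady`, `ReadinessSketch`, and the default attempt count and delay are assumptions; the function accepts any object with a matching `isReady()` method.

```typescript
// Minimal structural type: anything exposing the documented isReady() signature.
type ReadinessSketch = { isReady(): Promise<boolean> }

// Poll isReady() up to `attempts` times, waiting `delayMs` between polls.
async function waitUntilReady(bridge: ReadinessSketch, attempts = 5, delayMs = 100): Promise<boolean> {
  for (let attempt = 0; attempt < attempts; attempt++) {
    if (await bridge.isReady()) return true
    // Wait before the next poll, except after the last attempt.
    if (attempt < attempts - 1) {
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
    }
  }
  return false
}
```

The function resolves to `false` instead of throwing, so the caller decides whether an unready bridge is fatal.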


### off()

> **off**\<`K`\>(`eventName`, `fn`): `void`

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:12

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

fn

EventReceiver<object[K]>

Returns

void

Inherited from

EventBridgeBaseClass.off


### on()

> **on**\<`K`\>(`eventName`, `fn`): `void`

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:11

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

fn

EventReceiver<object[K]>

Returns

void

Inherited from

EventBridgeBaseClass.on
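The `on`, `off`, and `emit` methods share one typed event map, so the listener's parameter type follows from the event name. The sketch below shows that typing pattern with a small stand-alone emitter. `EmitterSketch` and `EventsSketch` are hypothetical; only the event names are taken from the type parameter listed for these methods.

```typescript
// Subset of the documented event names with their payload types.
type EventsSketch = {
  'eventbridge-connected': never
  'eventbridge-disconnected': never
  'eventbridge-error': unknown
}

type Receiver<T> = (parameter: T) => void

// Hypothetical emitter mirroring the on/off/emit/removeAllListeners signatures.
class EmitterSketch<T extends Record<string, unknown>> {
  private listeners = new Map<keyof T, Set<Receiver<unknown>>>()

  on<K extends keyof T>(eventName: K, fn: Receiver<T[K]>): void {
    const set = this.listeners.get(eventName) ?? new Set<Receiver<unknown>>()
    set.add(fn as Receiver<unknown>)
    this.listeners.set(eventName, set)
  }

  off<K extends keyof T>(eventName: K, fn: Receiver<T[K]>): void {
    this.listeners.get(eventName)?.delete(fn as Receiver<unknown>)
  }

  emit<K extends keyof T>(eventName: K, parameter?: T[K]): void {
    this.listeners.get(eventName)?.forEach((fn) => fn(parameter))
  }

  removeAllListeners(): void {
    this.listeners.clear()
  }
}

const emitter = new EmitterSketch<EventsSketch>()
const receivedErrors: unknown[] = []
const onError = (err: unknown) => {
  receivedErrors.push(err)
}

emitter.on('eventbridge-error', onError)
emitter.emit('eventbridge-error', new Error('broker unreachable'))
emitter.off('eventbridge-error', onError)
emitter.emit('eventbridge-error', new Error('no longer delivered'))
```

After `off`, the second `emit` reaches no listener, so `receivedErrors` holds exactly one entry.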


### registerCommand()

> **registerCommand**(`address`, `cb`, `metadata`, `eventBridgeConfig`): `Promise`\<`string`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:328

Parameters

address

EBMessageAddress

the address of the service command (service name, version and command name)

cb

(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp: string; payload: { data: unknown; message: string; status: StatusCode; }; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; otp: string; payload: unknown; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }>

the function to be called if a matching command arrives

metadata

CommandDefinitionMetadataBase

eventBridgeConfig

DefinitionEventBridgeConfig

Returns

Promise<string>

Implementation of

EventBridge.registerCommand


### registerSubscription()

> **registerSubscription**(`subscription`, `cb`): `Promise`\<`string`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:357

Register a new subscription

Parameters

subscription

Subscription

the subscription definition

cb

(message) => Promise<undefined | Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CustomMessage; otp: string; payload: unknown; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "id" | "timestamp">>

the function to be called if a matching message arrives

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription


### removeAllListeners()

> **removeAllListeners**(): `void`

Defined in: core/dist/commonjs/core/types/GenericEventEmitter.d.ts:14

Returns

void

Inherited from

EventBridgeBaseClass.removeAllListeners


### start()

> **start**(): `Promise`\<`void`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:82

Start the event bridge and connect to the underlying message broker

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start


### startActiveSpan()

> **startActiveSpan**\<`F`\>(`name`, `opts`, `context`, `fn`): `Promise`\<`F`\>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:33

Start a child span for OpenTelemetry tracing

Type Parameters

F

Parameters

name

string

name of span

opts

SpanOptions

span options

context

undefined | Context

optional context

fn

(span) => Promise<F>

function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan


### unregisterCommand()

> **unregisterCommand**(`address`): `Promise`\<`void`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:351

Unregister a service command

Parameters

address

EBMessageAddress

The address (service name, version and command name) of the command to be de-registered

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand


### unregisterSubscription()

> **unregisterSubscription**(`_address`): `Promise`\<`void`\>

Defined in: mqttbridge/src/MqttEventBridge.ts:383

Parameters

_address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription


### wrapInSpan()

> **wrapInSpan**\<`F`\>(`name`, `opts`, `fn`, `context`?): `Promise`\<`F`\>

Defined in: core/dist/commonjs/core/EventBridge/EventBridgeBaseClass.impl.d.ts:49

Start a span for OpenTelemetry tracing on the same level. The created span will not become the "active" span within OpenTelemetry!

This means that during logging and similar operations, the spanId of the parent span is logged.

Use wrapInSpan to mark points in the flow of one larger function, not to trace the program flow itself

Type Parameters

F

Parameters

name

string

name of span

opts

SpanOptions

span options

fn

(span) => Promise<F>

function to be executed in the span

context?

Context

span context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan
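Both span helpers run an async function inside a span and return its result. A common way to structure such a wrapper is to record a failure on the span and end the span in a `finally` block. The sketch below shows that pattern with a stand-in span type. `SpanSketch` and `runInSpan` are hypothetical and make no claim about how `wrapInSpan` or `startActiveSpan` are implemented. The two methods on the stand-in are named after the `recordException` and `end` methods of the OpenTelemetry `Span` interface.

```typescript
// Hypothetical stand-in for an OpenTelemetry span; only the two calls used here.
type SpanSketch = {
  recordException(error: Error): void
  end(): void
}

// Run fn with the span, record a thrown error on the span, and always end the span.
async function runInSpan<F>(span: SpanSketch, fn: (span: SpanSketch) => Promise<F>): Promise<F> {
  try {
    return await fn(span)
  } catch (error) {
    span.recordException(error instanceof Error ? error : new Error(String(error)))
    throw error
  } finally {
    // End the span on success and on failure alike.
    span.end()
  }
}
```

The error is rethrown after being recorded, so the caller still sees the original failure.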