Skip to content

PURISTA API / Modules / @purista/core / DefaultEventBridge

Class: DefaultEventBridge

@purista/core.DefaultEventBridge

Simple implementation of some simple in-memory event bridge. Does not support threads and does not need any external databases.

Example

typescript
import { DefaultEventBridge } from '@purista/core'

const eventBridge = new DefaultEventBridge()
await eventBridge.start()

// add your services

Hierarchy

Implements

Table of contents

Constructors

Properties

Methods

Constructors

constructor

new DefaultEventBridge(config?): DefaultEventBridge

Parameters

NameTypeDescription
config?Object-
config.defaultCommandTimeout?numberOverwrite the hardcoded default timeout of command invocations
config.emitMessagesAsEventBridgeEvents?booleanEmit messages which have an event name set as javascript events on the event bridge instance
config.instanceId?stringThe instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id.
config.logLevel?LogLevelNameIf no logger instance is given, use this log level
config.logWarnOnMessagesWithoutReceiver?booleanLog warnings on messages which are emitted, but could not delivered to at least one receiver
config.logger?LoggerA logger instance
config.spanProcessor?SpanProcessorA OpenTelemetry span processor

Returns

DefaultEventBridge

Overrides

EventBridgeBaseClass.constructor

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:85

Properties

config

config: Complete<{ defaultCommandTimeout?: number ; emitMessagesAsEventBridgeEvents?: boolean ; instanceId?: string ; logLevel?: LogLevelName ; logWarnOnMessagesWithoutReceiver?: boolean ; logger?: Logger ; spanProcessor?: SpanProcessor }>

Inherited from

EventBridgeBaseClass.config

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:23


defaultCommandTimeout

defaultCommandTimeout: number

The default time until when a command invocation automatically returns a time out error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:29


hasStarted

Protected hasStarted: boolean = false

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:82


healthy

Protected healthy: boolean = false

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:83


instanceId

instanceId: string

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:27


logger

logger: Logger

Inherited from

EventBridgeBaseClass.logger

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:20


name

name: string

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:25


pendingInvocations

Protected pendingInvocations: Map<string, PendigInvocation>

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:77


readStream

Protected readStream: Readable

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:65


runningSubscriptionCount

Protected runningSubscriptionCount: number = 0

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:78


serviceFunctions

Protected serviceFunctions: Map<string, (message: { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<{ contentEncoding: "utf-8" ; contentType: "application/json" ; correlationId: string ; eventName?: string ; id: string ; isHandledError: boolean ; messageType: CommandErrorResponse ; otp?: string ; payload: { data?: unknown ; message: string ; status: StatusCode } ; principalId?: string ; receiver: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string } | { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: CommandSuccessResponse ; otp?: string ; payload: unknown ; principalId?: string ; receiver: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }>>

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:72


subscriptions

Protected subscriptions: Map<string, SubscriptionStorageEntry>

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:80


traceProvider

traceProvider: NodeTracerProvider

Inherited from

EventBridgeBaseClass.traceProvider

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:21


writeStream

Protected writeStream: Writable

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:64

Methods

destroy

destroy(): Promise<void>

Shut down event bridge as gracefully as possible

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:385


emit

emit<K>(eventName, parameter?): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
parameter?{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]

Returns

void

Inherited from

EventBridgeBaseClass.emit

Defined in

core/types/GenericEventEmitter.ts:24


emitMessage

emitMessage(message): Promise<Readonly<EBMessage>>

Emit a new message to event bridge to be delivered to receiver

Parameters

NameTypeDescription
messageOmit<EBMessage, "correlationId" | "id" | "timestamp">EBMessage

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:287


getTracer

getTracer(): Tracer

Returns open telemetry tracer of this service

Returns

Tracer

Tracer

Inherited from

EventBridgeBaseClass.getTracer

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:70


invoke

invoke<T>(input, commandTimeout?): Promise<T>

Call a command of a service and return the result of this command

Type parameters

Name
T

Parameters

NameTypeDescription
inputOmit<{ contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }, "messageType" | "correlationId" | "id" | "timestamp">a partial command message
commandTimeoutnumberthe time to live (timeout) of the invocation

Returns

Promise<T>

Implementation of

EventBridge.invoke

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:328


isHealthy

isHealthy(): Promise<boolean>

Indicates if the eventbridge is running and works correctly

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:98


isReady

isReady(): Promise<boolean>

Indicates if the eventbridge has been started and is connected to underlaying message broker

Returns

Promise<boolean>

Implementation of

EventBridge.isReady

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:94


off

off<K>(eventName, fn): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
fnEventReceiver<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]>

Returns

void

Inherited from

EventBridgeBaseClass.off

Defined in

core/types/GenericEventEmitter.ts:20


on

on<K>(eventName, fn): void

Type parameters

NameType
Kextends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

NameType
eventNameK
fnEventReceiver<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]>

Returns

void

Inherited from

EventBridgeBaseClass.on

Defined in

core/types/GenericEventEmitter.ts:16


registerCommand

registerCommand(address, cb, metadata): Promise<string>

Register a service command and ensure that there is a queue for all incoming command requests.

Parameters

NameTypeDescription
addressEBMessageAddressThe service function address
cb(message: { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<{ contentEncoding: "utf-8" ; contentType: "application/json" ; correlationId: string ; eventName?: string ; id: string ; isHandledError: boolean ; messageType: CommandErrorResponse ; otp?: string ; payload: { data?: unknown ; message: string ; status: StatusCode } ; principalId?: string ; receiver: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string } | { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: CommandSuccessResponse ; otp?: string ; payload: unknown ; principalId?: string ; receiver: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }>the function to call if a matching command message arrives
metadataCommandDefinitionMetadataBase-

Returns

Promise<string>

the id of command function queue

Implementation of

EventBridge.registerCommand

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:242


registerSubscription

registerSubscription(subscription, cb): Promise<string>

Register a new subscription

Parameters

NameTypeDescription
subscriptionSubscriptionthe subscription definition
cb(message: EBMessage) => Promise<undefined | Omit<{ contentEncoding: string ; contentType: string ; correlationId?: string ; eventName: string ; id: string ; messageType: CustomMessage ; otp?: string ; payload?: unknown ; principalId?: string ; receiver?: EBMessageAddress ; sender: { instanceId: string ; serviceName: string ; serviceTarget: string ; serviceVersion: string } ; tenantId?: string ; timestamp: number ; traceId?: string }, "id" | "timestamp">>the function to be called if a matching message arrives

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:267


removeAllListeners

removeAllListeners(): void

Returns

void

Inherited from

EventBridgeBaseClass.removeAllListeners

Defined in

core/types/GenericEventEmitter.ts:28


start

start(): Promise<void>

Start the eventbridge and connect to the underlaying message broker

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:102


startActiveSpan

startActiveSpan<F>(name, opts, context?, fn): Promise<F>

Start a child span for opentelemetry tracking

Type parameters

Name
F

Parameters

NameTypeDefault valueDescription
namestringundefinedname of span
optsSpanOptionsundefinedspan options
contextundefined | Contextundefinedoptional context
fn(span: Span) => Promise<F>undefinedfunction to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:82


unregisterCommand

unregisterCommand(address): Promise<void>

Unregister a service command

Parameters

NameTypeDescription
addressEBMessageAddressThe address (service name, version and command name) of the command to be de-registered

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:262


unregisterSubscription

unregisterSubscription(address): Promise<void>

Parameters

NameType
addressEBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:277


wrapInSpan

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!

This means during logging and similar the spanId of parent span is logged.

Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself

Type parameters

Name
F

Parameters

NameTypeDescription
namestringname of span
optsSpanOptionsspan options
fn(span: Span) => Promise<F>function te be executed in the span
context?Contextspan context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:132