Class: DefaultEventBridge


@purista/core.DefaultEventBridge

A simple in-memory event bridge implementation. It does not support threads and does not need any external database.

Example

```typescript
import { DefaultEventBridge } from '@purista/core'

const eventBridge = new DefaultEventBridge()
await eventBridge.start()

// add your services
```

Hierarchy

EventBridgeBaseClass
↳ DefaultEventBridge

Implements

EventBridge

Constructors

constructor

new DefaultEventBridge(config?): DefaultEventBridge

Parameters

Name | Type | Description
config? | Object | -
config.defaultCommandTimeout? | number | Overrides the hardcoded default timeout for command invocations
config.emitMessagesAsEventBridgeEvents? | boolean | Emit messages that have an event name set as JavaScript events on the event bridge instance
config.instanceId? | string | The instance id of the event bridge. If not set, an id will be generated each time an instance is created. Use this if the instance id must always be the same.
config.logLevel? | LogLevelName | If no logger instance is given, use this log level
config.logWarnOnMessagesWithoutReceiver? | boolean | Log warnings for messages that are emitted but could not be delivered to at least one receiver
config.logger? | Logger | A logger instance
config.spanProcessor? | SpanProcessor | An OpenTelemetry span processor

Returns

DefaultEventBridge

Overrides

EventBridgeBaseClass.constructor

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:83
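
To illustrate how the optional constructor settings could combine with defaults, here is a minimal sketch. The `BridgeConfigSketch` type only mirrors some of the documented option names; `resolveConfig`, `generateInstanceId`, and all default values are assumptions made for this example and are not taken from the library.

```typescript
// Hypothetical mirror of some documented options (logger, logLevel and spanProcessor omitted).
type BridgeConfigSketch = {
  defaultCommandTimeout?: number
  emitMessagesAsEventBridgeEvents?: boolean
  instanceId?: string
  logWarnOnMessagesWithoutReceiver?: boolean
}

// Assumed defaults, for illustration only; the real defaults are not stated on this page.
const assumedDefaults = {
  defaultCommandTimeout: 30_000,
  emitMessagesAsEventBridgeEvents: false,
  logWarnOnMessagesWithoutReceiver: true,
}

// Placeholder id generator; not the library's id scheme.
const generateInstanceId = (): string => `bridge-${Math.random().toString(36).slice(2, 10)}`

// Merge user options over the defaults and generate an instance id only if none is given.
function resolveConfig(config: BridgeConfigSketch = {}): Required<BridgeConfigSketch> {
  return {
    ...assumedDefaults,
    ...config,
    instanceId: config.instanceId ?? generateInstanceId(),
  }
}

// A fixed instance id is kept; without one, a new id is generated on every call.
const fixed = resolveConfig({ instanceId: 'bridge-a', defaultCommandTimeout: 5_000 })
const generated = resolveConfig()
```

Passing `instanceId` explicitly corresponds to the documented use case of always having the same instance id.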

Properties

config

config: Complete<{ defaultCommandTimeout?: number ; emitMessagesAsEventBridgeEvents?: boolean ; instanceId?: string ; logLevel?: LogLevelName ; logWarnOnMessagesWithoutReceiver?: boolean ; logger?: Logger ; spanProcessor?: SpanProcessor }>

Inherited from

EventBridgeBaseClass.config

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:21


defaultCommandTimeout

defaultCommandTimeout: number

The default time after which a command invocation automatically returns a timeout error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:27
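
The following sketch shows one way a default timeout could be applied to a pending invocation. `withTimeout` and `effectiveTimeout` are hypothetical helpers, and both the millisecond unit and the fallback from a per-call timeout to the default are assumptions, not confirmed behavior of the library.

```typescript
// Hypothetical helper: reject if `work` does not settle within `timeoutMs` milliseconds.
function withTimeout<T>(work: Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`timed out after ${timeoutMs} ms`)),
      timeoutMs,
    )
    work.then(
      (value) => {
        clearTimeout(timer) // settled in time: cancel the pending timeout
        resolve(value)
      },
      (error) => {
        clearTimeout(timer)
        reject(error)
      },
    )
  })
}

// Assumption: a per-call timeout, when given, takes precedence over the bridge-wide default.
function effectiveTimeout(defaultCommandTimeout: number, commandTimeout?: number): number {
  return commandTimeout ?? defaultCommandTimeout
}
```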


hasStarted

Protected hasStarted: boolean = false

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:80


healthy

Protected healthy: boolean = false

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:81


instanceId

instanceId: string

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:25


logger

logger: Logger

Inherited from

EventBridgeBaseClass.logger

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:18


name

name: string

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:23


pendingInvocations

Protected pendingInvocations: Map<string, PendigInvocation>

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:75


readStream

Protected readStream: Readable

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:63


runningSubscriptionCount

Protected runningSubscriptionCount: number = 0

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:76


serviceFunctions

Protected serviceFunctions: Map<string, (message: { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<{ contentEncoding: "utf-8" ; contentType: "application/json" ; correlationId: string ; eventName?: string ; id: string ; isHandledError: boolean ; messageType: CommandErrorResponse ; otp?: string ; payload: { data?: unknown ; message: string ; status: StatusCode } ; principalId?: string ; receiver: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string } | { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: CommandSuccessResponse ; otp?: string ; payload: unknown ; principalId?: string ; receiver: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }>>

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:70


subscriptions

Protected subscriptions: Map<string, SubscriptionStorageEntry>

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:78


traceProvider

traceProvider: NodeTracerProvider

Inherited from

EventBridgeBaseClass.traceProvider

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:19


writeStream

Protected writeStream: Writable

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:62

Methods

destroy

destroy(): Promise<void>

Shuts down the event bridge as gracefully as possible

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:382


emit

emit<K>(eventName, parameter?): void

Type parameters

Name | Type
K | extends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

Name | Type
eventName | K
parameter? | { eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]

Returns

void

Inherited from

EventBridgeBaseClass.emit

Defined in

core/types/GenericEventEmitter.ts:24


emitMessage

emitMessage(message): Promise<Readonly<EBMessage>>

Emits a new message to the event bridge for delivery to the receiver

Parameters

Name | Type | Description
message | Omit<EBMessage, "correlationId" | "id" | "timestamp"> | EBMessage

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:285
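
According to its signature, emitMessage accepts a message without `correlationId`, `id`, and `timestamp` and resolves to a complete, read-only message. The sketch below shows how such fields might be filled in. `MessageSketch` is a reduced, hypothetical shape, and `completeMessage` and `nextId` are placeholders for this example; whether the bridge generates a correlation id in exactly this way is an assumption.

```typescript
// Reduced, hypothetical message shape; the real EBMessage has more fields.
type MessageSketch = {
  id: string
  timestamp: number
  correlationId?: string
  eventName?: string
  payload?: unknown
}

let counter = 0
// Placeholder id generator for the sketch; a real bridge would use a proper unique id.
const nextId = (): string => `msg-${++counter}`

// Fill in the fields the caller does not provide, then freeze the result.
function completeMessage(
  input: Omit<MessageSketch, 'correlationId' | 'id' | 'timestamp'>,
): Readonly<MessageSketch> {
  return Object.freeze({
    ...input,
    id: nextId(),
    timestamp: Date.now(),
    correlationId: nextId(),
  })
}

const message = completeMessage({ eventName: 'userCreated', payload: { userId: 'u1' } })
```

Freezing the returned object mirrors the `Readonly<EBMessage>` return type at runtime, so accidental mutation of an already emitted message fails instead of going unnoticed.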


getTracer

getTracer(): Tracer

Returns the OpenTelemetry tracer of this service

Returns

Tracer

Inherited from

EventBridgeBaseClass.getTracer

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:68


invoke

invoke<T>(input, commandTimeout?): Promise<T>

Call a command of a service and return the result of this command

Type parameters

Name
T

Parameters

Name | Type | Description
input | Omit<{ contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }, "messageType" | "correlationId" | "id" | "timestamp"> | a partial command message
commandTimeout | number | the time to live (timeout) of the invocation

Returns

Promise<T>

Implementation of

EventBridge.invoke

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:326
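
As a rough usage sketch, a caller of invoke might look like the following. The `Invoker` interface is a structural stand-in with a reduced input type, so the example does not depend on the package. The service name `user`, the command `getUserById`, the returned shape, and the millisecond unit of the timeout are all hypothetical.

```typescript
// Structural stand-ins for the documented types (reduced to the fields used here).
type AddressSketch = {
  serviceName: string
  serviceVersion: string
  serviceTarget: string
  instanceId?: string
}

type InvokeInputSketch = {
  contentType: string
  contentEncoding: string
  sender: AddressSketch
  receiver: AddressSketch
  payload: { parameter: unknown; payload: unknown }
}

interface Invoker {
  invoke<T>(input: InvokeInputSketch, commandTimeout?: number): Promise<T>
}

// Hypothetical caller: ask version "1" of a "user" service to run "getUserById".
async function getUserName(bridge: Invoker, userId: string): Promise<string> {
  const result = await bridge.invoke<{ name: string }>(
    {
      contentType: 'application/json',
      contentEncoding: 'utf-8',
      sender: { serviceName: 'caller', serviceVersion: '1', serviceTarget: 'example', instanceId: 'bridge-a' },
      receiver: { serviceName: 'user', serviceVersion: '1', serviceTarget: 'getUserById' },
      payload: { parameter: { userId }, payload: undefined },
    },
    2_000, // per-call timeout, assumed to be in milliseconds
  )
  return result.name
}
```

The type parameter `T` describes the expected result of the command, so the caller decides which shape it trusts the command to return.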


isHealthy

isHealthy(): Promise<boolean>

Indicates if the eventbridge is running and works correctly

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:96


isReady

isReady(): Promise<boolean>

Indicates if the eventbridge has been started and is connected to the underlying message broker

Returns

Promise<boolean>

Implementation of

EventBridge.isReady

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:92
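
The two checks isHealthy and isReady can be combined into a simple status probe, for example for a container health endpoint. This is a minimal sketch: `Probeable` is a structural stand-in for the two documented methods, and `probe` with its status strings is a hypothetical helper.

```typescript
// Structural stand-in for the two documented checks.
interface Probeable {
  isReady(): Promise<boolean>
  isHealthy(): Promise<boolean>
}

type ProbeStatus = 'starting' | 'unhealthy' | 'ok'

// Hypothetical helper: not ready means still starting; ready but not healthy means unhealthy.
async function probe(bridge: Probeable): Promise<ProbeStatus> {
  if (!(await bridge.isReady())) return 'starting'
  return (await bridge.isHealthy()) ? 'ok' : 'unhealthy'
}
```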


off

off<K>(eventName, fn): void

Type parameters

Name | Type
K | extends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

Name | Type
eventName | K
fn | EventReceiver<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]>

Returns

void

Inherited from

EventBridgeBaseClass.off

Defined in

core/types/GenericEventEmitter.ts:20


on

on<K>(eventName, fn): void

Type parameters

Name | Type
K | extends EventKey<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }>

Parameters

Name | Type
eventName | K
fn | EventReceiver<{ eventbridge-connected: never ; eventbridge-connection-error: unknown ; eventbridge-disconnected: never ; eventbridge-error: unknown ; eventbridge-reconnecting: never }[K]>

Returns

void

Inherited from

EventBridgeBaseClass.on

Defined in

core/types/GenericEventEmitter.ts:16
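
The emit, off, and on methods share one event map as their type parameter. The sketch below shows how such a typed emitter can work in principle. `TypedEmitterSketch` is a hypothetical, simplified class written for this example and is not the library's implementation; only the event names are copied from the signatures above.

```typescript
// Event map copied from the documented type parameter of emit/on/off.
type BridgeEvents = {
  'eventbridge-connected': never
  'eventbridge-connection-error': unknown
  'eventbridge-disconnected': never
  'eventbridge-error': unknown
  'eventbridge-reconnecting': never
}

type Receiver<T> = (parameter: T) => void

// Hypothetical minimal emitter using the same method names as documented.
class TypedEmitterSketch<Events extends Record<string, unknown>> {
  // `any` keeps the internal storage simple; the public methods stay fully typed.
  private listeners = new Map<keyof Events, Set<Receiver<any>>>()

  on<K extends keyof Events>(eventName: K, fn: Receiver<Events[K]>): void {
    const set = this.listeners.get(eventName) ?? new Set<Receiver<any>>()
    set.add(fn)
    this.listeners.set(eventName, set)
  }

  off<K extends keyof Events>(eventName: K, fn: Receiver<Events[K]>): void {
    this.listeners.get(eventName)?.delete(fn)
  }

  emit<K extends keyof Events>(eventName: K, parameter?: Events[K]): void {
    this.listeners.get(eventName)?.forEach((fn) => fn(parameter))
  }

  removeAllListeners(): void {
    this.listeners.clear()
  }
}

const emitter = new TypedEmitterSketch<BridgeEvents>()
const seen: unknown[] = []
const onError = (error: unknown): void => {
  seen.push(error)
}

emitter.on('eventbridge-error', onError)
emitter.emit('eventbridge-error', new Error('boom')) // delivered to onError
emitter.off('eventbridge-error', onError)
emitter.emit('eventbridge-error', new Error('ignored')) // no listener left
```

Events typed as `never` in the map, such as `eventbridge-connected`, are emitted without a parameter.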


registerCommand

registerCommand(address, cb, metadata): Promise<string>

Register a service command and ensure that there is a queue for all incoming command requests.

Parameters

Name | Type | Description
address | EBMessageAddress | The service function address
cb | (message: { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: Command ; otp?: string ; payload: { parameter: unknown ; payload: unknown } ; principalId?: string ; receiver: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }) => Promise<{ contentEncoding: "utf-8" ; contentType: "application/json" ; correlationId: string ; eventName?: string ; id: string ; isHandledError: boolean ; messageType: CommandErrorResponse ; otp?: string ; payload: { data?: unknown ; message: string ; status: StatusCode } ; principalId?: string ; receiver: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string } | { contentEncoding: string ; contentType: string ; correlationId: string ; eventName?: string ; id: string ; messageType: CommandSuccessResponse ; otp?: string ; payload: unknown ; principalId?: string ; receiver: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }> | the function to call if a matching command message arrives
metadata | CommandDefinitionMetadataBase | -

Returns

Promise<string>

the id of command function queue

Implementation of

EventBridge.registerCommand

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:240
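
Conceptually, registering a command stores a handler under a key derived from the service address, and a later command message is routed to that handler. The following sketch models this with a plain Map. `CommandRegistrySketch`, `addressToKey`, the key format, and the simplified handler signature are assumptions for illustration; the real handler receives and returns full command messages.

```typescript
// Reduced address shape: service name, version and command name.
type AddressSketch = { serviceName: string; serviceVersion: string; serviceTarget: string }
// Simplified handler; the documented callback works on full command messages.
type HandlerSketch = (payload: unknown) => Promise<unknown>

// Hypothetical key format; the real queue id format is not documented on this page.
const addressToKey = (a: AddressSketch): string =>
  `${a.serviceName}/${a.serviceVersion}/${a.serviceTarget}`

class CommandRegistrySketch {
  private handlers = new Map<string, HandlerSketch>()

  // Store the handler and return the key as the "queue id".
  async registerCommand(address: AddressSketch, cb: HandlerSketch): Promise<string> {
    const key = addressToKey(address)
    this.handlers.set(key, cb)
    return key
  }

  async unregisterCommand(address: AddressSketch): Promise<void> {
    this.handlers.delete(addressToKey(address))
  }

  // Route a payload to the registered handler, or fail if there is none.
  async invoke(address: AddressSketch, payload: unknown): Promise<unknown> {
    const handler = this.handlers.get(addressToKey(address))
    if (!handler) throw new Error(`no command registered for ${addressToKey(address)}`)
    return handler(payload)
  }
}
```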


registerSubscription

registerSubscription(subscription, cb): Promise<string>

Register a new subscription

Parameters

Name | Type | Description
subscription | Subscription | the subscription definition
cb | (message: EBMessage) => Promise<undefined | Omit<{ contentEncoding: string ; contentType: string ; correlationId?: string ; eventName: string ; id: string ; messageType: CustomMessage ; otp?: string ; payload?: unknown ; principalId?: string ; receiver?: EBMessageAddress ; sender: { serviceName: string; serviceVersion: string; serviceTarget: string; instanceId: string; } ; tenantId?: string ; timestamp: number ; traceId?: string }, "id" | "timestamp">> | the function to be called if a matching message arrives

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:265


removeAllListeners

removeAllListeners(): void

Returns

void

Inherited from

EventBridgeBaseClass.removeAllListeners

Defined in

core/types/GenericEventEmitter.ts:28


start

start(): Promise<void>

Start the eventbridge and connect to the underlying message broker

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:100


startActiveSpan

startActiveSpan<F>(name, opts, context?, fn): Promise<F>

Starts a child span for OpenTelemetry tracing

Type parameters

Name
F

Parameters

Name | Type | Default value | Description
name | string | undefined | name of span
opts | SpanOptions | undefined | span options
context | (undefined | Context) | undefined | optional context
fn | (span: Span) => Promise<F> | undefined | function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:80


unregisterCommand

unregisterCommand(address): Promise<void>

Unregister a service command

Parameters

Name | Type | Description
address | EBMessageAddress | The address (service name, version and command name) of the command to be de-registered

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:260


unregisterSubscription

unregisterSubscription(address): Promise<void>

Parameters

Name | Type
address | EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription

Defined in

DefaultEventBridge/DefaultEventBridge.impl.ts:275


wrapInSpan

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Starts a span for OpenTelemetry tracing on the same level. The created span will not become the "active" span within OpenTelemetry!

This means that during logging and similar operations, the spanId of the parent span is logged.

Use wrapInSpan to mark points in the flow of one larger function, not to trace the program flow itself.

Type parameters

Name
F

Parameters

Name | Type | Description
name | string | name of span
opts | SpanOptions | span options
fn | (span: Span) => Promise<F> | function to be executed in the span
context? | Context | span context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan

Defined in

core/EventBridge/EventBridgeBaseClass.impl.ts:130

Contributors: Sebastian Wessel