Skip to content

@purista/core v2.0.5


PURISTA API / @purista/core / DefaultEventBridge

Class: DefaultEventBridge

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:63

Simple implementation of some simple in-memory event bridge. Does not support threads and does not need any external databases.

Example

typescript
import { DefaultEventBridge } from '@purista/core'

const eventBridge = new DefaultEventBridge()
await eventBridge.start()

// add your services

Extends

Implements

Constructors

new DefaultEventBridge()

new DefaultEventBridge(config?): DefaultEventBridge

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:85

Parameters

config?
defaultCommandTimeout?

number

Overwrite the hardcoded default timeout of command invocations

emitMessagesAsEventBridgeEvents?

boolean

Emit messages which have an event name set as javascript events on the event bridge instance

instanceId?

string

The instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id.

logger?

Logger

A logger instance

logLevel?

LogLevelName

If no logger instance is given, use this log level

logWarnOnMessagesWithoutReceiver?

boolean

Log warnings on messages which are emitted, but could not delivered to at least one receiver

spanProcessor?

SpanProcessor

A OpenTelemetry span processor

Returns

DefaultEventBridge

Overrides

EventBridgeBaseClass.constructor

Properties

config

config: Complete<{ defaultCommandTimeout: number; emitMessagesAsEventBridgeEvents: boolean; instanceId: string; logger: Logger; logLevel: LogLevelName; logWarnOnMessagesWithoutReceiver: boolean; spanProcessor: SpanProcessor; }>

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:23

Inherited from

EventBridgeBaseClass.config


defaultCommandTimeout

defaultCommandTimeout: number

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:29

The default time until when a command invocation automatically returns a time out error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout


hasStarted

protected hasStarted: boolean = false

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:82


healthy

protected healthy: boolean = false

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:83


instanceId

instanceId: string

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:27

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId


logger

logger: Logger

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:20

Inherited from

EventBridgeBaseClass.logger


name

name: string

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:25

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name


pendingInvocations

protected pendingInvocations: Map<string, PendigInvocation>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:77


readStream

protected readStream: Readable

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:65


runningSubscriptionCount

protected runningSubscriptionCount: number = 0

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:78


serviceFunctions

protected serviceFunctions: Map<string, (message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp: string; payload: { data: unknown; message: string; status: StatusCode; }; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; otp: string; payload: unknown; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }>>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:72


subscriptions

protected subscriptions: Map<string, SubscriptionStorageEntry>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:80


traceProvider

traceProvider: NodeTracerProvider

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:21

Inherited from

EventBridgeBaseClass.traceProvider


writeStream

protected writeStream: Writable

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:64

Methods

destroy()

destroy(): Promise<void>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:385

Shut down event bridge as gracefully as possible

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy


emit()

emit<K>(eventName, parameter?): void

Defined in: packages/core/src/core/types/GenericEventEmitter.ts:24

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

parameter?

object[K]

Returns

void

Inherited from

EventBridgeBaseClass.emit


emitMessage()

emitMessage(message): Promise<Readonly<EBMessage>>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:287

Emit a new message to event bridge to be delivered to receiver

Parameters

message

Omit<EBMessage, "correlationId" | "id" | "timestamp">

EBMessage

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage


getTracer()

getTracer(): Tracer

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:70

Returns open telemetry tracer of this service

Returns

Tracer

Tracer

Inherited from

EventBridgeBaseClass.getTracer


invoke()

invoke<T>(input, commandTimeout): Promise<T>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:328

Call a command of a service and return the result of this command

Type Parameters

T

Parameters

input

Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; otp: string; payload: { parameter: unknown; payload: unknown; }; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "messageType" | "correlationId" | "id" | "timestamp">

a partial command message

commandTimeout

number = ...

the time to live (timeout) of the invocation

Returns

Promise<T>

Implementation of

EventBridge.invoke


isHealthy()

isHealthy(): Promise<boolean>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:98

Indicates if the eventbridge is running and works correctly

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy


isReady()

isReady(): Promise<boolean>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:94

Indicates if the eventbridge has been started and is connected to underlaying message broker

Returns

Promise<boolean>

Implementation of

EventBridge.isReady


off()

off<K>(eventName, fn): void

Defined in: packages/core/src/core/types/GenericEventEmitter.ts:20

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

fn

EventReceiver<object[K]>

Returns

void

Inherited from

EventBridgeBaseClass.off


on()

on<K>(eventName, fn): void

Defined in: packages/core/src/core/types/GenericEventEmitter.ts:16

Type Parameters

K extends EventKey<{ [key: custom-${string}]: unknown; [key: adapter-${string}]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>

Parameters

eventName

K

fn

EventReceiver<object[K]>

Returns

void

Inherited from

EventBridgeBaseClass.on


registerCommand()

registerCommand(address, cb, metadata): Promise<string>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:242

Register a service command and ensure that there is a queue for all incoming command requests.

Parameters

address

EBMessageAddress

The service function address

cb

(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp: string; payload: { data: unknown; message: string; status: StatusCode; }; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; otp: string; payload: unknown; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }>

the function to call if a matching command message arrives

metadata

CommandDefinitionMetadataBase

Returns

Promise<string>

the id of command function queue

Implementation of

EventBridge.registerCommand


registerSubscription()

registerSubscription(subscription, cb): Promise<string>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:267

Register a new subscription

Parameters

subscription

Subscription

the subscription definition

cb

(message) => Promise<undefined | Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CustomMessage; otp: string; payload: unknown; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "id" | "timestamp">>

the function to be called if a matching message arrives

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription


removeAllListeners()

removeAllListeners(): void

Defined in: packages/core/src/core/types/GenericEventEmitter.ts:28

Returns

void

Inherited from

EventBridgeBaseClass.removeAllListeners


start()

start(): Promise<void>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:102

Start the eventbridge and connect to the underlaying message broker

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start


startActiveSpan()

startActiveSpan<F>(name, opts, context, fn): Promise<F>

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:82

Start a child span for opentelemetry tracking

Type Parameters

F

Parameters

name

string

name of span

opts

SpanOptions

span options

context

optional context

undefined | Context

fn

(span) => Promise<F>

function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan


unregisterCommand()

unregisterCommand(address): Promise<void>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:262

Unregister a service command

Parameters

address

EBMessageAddress

The address (service name, version and command name) of the command to be de-registered

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand


unregisterSubscription()

unregisterSubscription(address): Promise<void>

Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:277

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription


wrapInSpan()

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:132

Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!

This means during logging and similar the spanId of parent span is logged.

Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself

Type Parameters

F

Parameters

name

string

name of span

opts

SpanOptions

span options

fn

(span) => Promise<F>

function te be executed in the span

context?

Context

span context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan