Skip to content

PURISTA API


PURISTA API / @purista/core / DefaultEventBridge

Class: DefaultEventBridge

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:73

Simple implementation of some simple in-memory event bridge. Does not support threads and does not need any external databases.

Example

typescript
import { DefaultEventBridge } from '@purista/core'

const eventBridge = new DefaultEventBridge()
await eventBridge.start()

// add your services

Extends

Implements

Constructors

Constructor

new DefaultEventBridge(config?): DefaultEventBridge

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:104

Parameters

config?
defaultCommandTimeout?

number

Overwrite the hardcoded default timeout of command invocations

instanceId?

string

The instance id of the event bridge. If not set, a id will generated each time a instance is created. Use this if there is a need to always have the same instance id.

logger?

Logger

logLevel?

LogLevelName

logWarnOnMessagesWithoutReceiver?

boolean

Log warnings on messages which are emitted, but could not delivered to at least one receiver

metrics?

PuristaMetricsRuntimeOptions

metricsRecorder?

PuristaMetricsRecorderInterface

spanProcessor?

SpanProcessor

Returns

DefaultEventBridge

Overrides

EventBridgeBaseClass.constructor

Properties

capabilities

capabilities: EventBridgeCapabilities

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:52

Implementation of

EventBridge.capabilities

Inherited from

EventBridgeBaseClass.capabilities


config

config: Complete<EventBridgeConfig<ConfigType>>

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:49

Inherited from

EventBridgeBaseClass.config


defaultCommandTimeout

defaultCommandTimeout: number

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:89

The default time until when a command invocation automatically returns a time out error

Implementation of

EventBridge.defaultCommandTimeout

Inherited from

EventBridgeBaseClass.defaultCommandTimeout


hasStarted

protected hasStarted: boolean = false

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:101


healthy

protected healthy: boolean = false

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:102


inFlightExecutions

protected readonly inFlightExecutions: InFlightExecutionTracker

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:90

Inherited from

EventBridgeBaseClass.inFlightExecutions


instanceId

instanceId: string

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:87

Implementation of

EventBridge.instanceId

Inherited from

EventBridgeBaseClass.instanceId


logger

logger: Logger

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:45

Inherited from

EventBridgeBaseClass.logger


metricsRecorder

protected metricsRecorder: PuristaMetricsRecorderInterface

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:47

Inherited from

EventBridgeBaseClass.metricsRecorder


name

name: string

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:51

Implementation of

EventBridge.name

Inherited from

EventBridgeBaseClass.name


pendingInvocations

protected pendingInvocations: PendingInvocationRegistry<unknown>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:88


pendingStreams

protected pendingStreams: PendingStreamRegistry<any, any>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:93


readStream

protected readStream: Readable

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:75


serviceFunctions

protected serviceFunctions: Map<string, (message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:82


streamFunctions

protected streamFunctions: Map<string, (message) => Promise<void>>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:87


subscriptions

protected subscriptions: Map<string, SubscriptionStorageEntry>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:99


traceProvider

traceProvider: NodeTracerProvider

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:46

Inherited from

EventBridgeBaseClass.traceProvider


writeStream

protected writeStream: Writable

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:74

Methods

destroy()

destroy(): Promise<void>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:583

Shut down event bridge as gracefully as possible

Returns

Promise<void>

Implementation of

EventBridge.destroy

Overrides

EventBridgeBaseClass.destroy


emitMessage()

emitMessage(message): Promise<Readonly<EBMessage>>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:440

Emit a new message to event bridge to be delivered to receiver

Parameters

message

Omit<EBMessage, "id" | "timestamp" | "correlationId">

EBMessage

Returns

Promise<Readonly<EBMessage>>

Implementation of

EventBridge.emitMessage


getInFlightExecutionCount()

getInFlightExecutionCount(): number

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:263

Number of currently running handlers across all work kinds.

Returns

number

Implementation of

EventBridge.getInFlightExecutionCount

Inherited from

EventBridgeBaseClass.getInFlightExecutionCount


getInFlightExecutionCounts()

getInFlightExecutionCounts(): InFlightExecutionCounts

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:267

Number of currently running handlers grouped by work kind.

Returns

InFlightExecutionCounts

Implementation of

EventBridge.getInFlightExecutionCounts

Inherited from

EventBridgeBaseClass.getInFlightExecutionCounts


getPausedSubscriptionConsumers()

getPausedSubscriptionConsumers(): PausedSubscriptionConsumersByRegistrationKey

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:271

Returns paused subscription consumer states keyed by adapter registration key.

Returns

PausedSubscriptionConsumersByRegistrationKey

Implementation of

EventBridge.getPausedSubscriptionConsumers

Inherited from

EventBridgeBaseClass.getPausedSubscriptionConsumers


getTracer()

getTracer(): Tracer

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:141

Returns open telemetry tracer of this service

Returns

Tracer

Tracer

Inherited from

EventBridgeBaseClass.getTracer


invoke()

invoke<T>(input, commandTimeout?): Promise<T>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:477

Call a command of a service and return the result of this command

Type Parameters

T

T

Parameters

input

Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">

a partial command message

commandTimeout?

number = ...

the time to live (timeout) of the invocation

Returns

Promise<T>

Implementation of

EventBridge.invoke


isHealthy()

isHealthy(): Promise<boolean>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:151

Indicates if the eventbridge is running and works correctly

Returns

Promise<boolean>

Implementation of

EventBridge.isHealthy


isReady()

isReady(): Promise<boolean>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:147

Indicates if the eventbridge has been started and is connected to underlaying message broker

Returns

Promise<boolean>

Implementation of

EventBridge.isReady


openStream()

openStream<Chunk, Final>(input, commandTimeout?): Promise<StreamHandle<Chunk, Final>>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:519

Open a stream invocation. The returned handle can be consumed via async iteration and can be cancelled by caller.

Type Parameters

Chunk

Chunk = unknown

Final

Final = unknown

Parameters

input

Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">

commandTimeout?

number = ...

Returns

Promise<StreamHandle<Chunk, Final>>

Implementation of

EventBridge.openStream

Overrides

EventBridgeBaseClass.openStream


registerCommand()

registerCommand(address, cb, metadata): Promise<string>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:364

Register a service command and ensure that there is a queue for all incoming command requests.

Parameters

address

EBMessageAddress

The service function address

cb

(message) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode; }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }>

the function to call if a matching command message arrives

metadata

CommandDefinitionMetadataBase

Returns

Promise<string>

the id of command function queue

Implementation of

EventBridge.registerCommand


registerStream()

registerStream(address, cb, metadata): Promise<string>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:384

Register a service stream.

Parameters

address

EBMessageAddress

cb

(message) => Promise<void>

metadata

StreamDefinitionMetadataBase

Returns

Promise<string>

Implementation of

EventBridge.registerStream

Overrides

EventBridgeBaseClass.registerStream


registerSubscription()

registerSubscription(subscription, cb): Promise<string>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:418

Register a new subscription

Parameters

subscription

Subscription

the subscription definition

cb

(message) => Promise<Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId?: string; timestamp: number; traceId?: string; }, "id" | "timestamp"> | undefined>

the function to be called if a matching message arrives

Returns

Promise<string>

Implementation of

EventBridge.registerSubscription


resumeSubscriptionConsumer()

resumeSubscriptionConsumer(_registrationKey): Promise<void>

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:275

Resumes a paused subscription consumer by registration key.

Parameters

_registrationKey

string

Returns

Promise<void>

Implementation of

EventBridge.resumeSubscriptionConsumer

Inherited from

EventBridgeBaseClass.resumeSubscriptionConsumer


runInFlight()

runInFlight<T>(fn, kind?): Promise<T>

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:252

Type Parameters

T

T

Parameters

fn

() => Promise<T>

kind?

"stream" | "command" | "subscription" | "generic"

Returns

Promise<T>

Inherited from

EventBridgeBaseClass.runInFlight


start()

start(): Promise<void>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:155

Start the eventbridge and connect to the underlaying message broker

Returns

Promise<void>

Implementation of

EventBridge.start

Overrides

EventBridgeBaseClass.start


startActiveSpan()

startActiveSpan<F>(name, opts, context, fn): Promise<F>

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:153

Start a child span for opentelemetry tracking

Type Parameters

F

F

Parameters

name

string

name of span

opts

SpanOptions

span options

context

Context | undefined

optional context

fn

(span) => Promise<F>

function to be executed within the span

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.startActiveSpan


unregisterCommand()

unregisterCommand(address): Promise<void>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:404

Unregister a service command

Parameters

address

EBMessageAddress

The address (service name, version and command name) of the command to be de-registered

Returns

Promise<void>

Implementation of

EventBridge.unregisterCommand


unregisterStream()

unregisterStream(address): Promise<void>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:411

Unregister a service stream

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterStream

Overrides

EventBridgeBaseClass.unregisterStream


unregisterSubscription()

unregisterSubscription(address): Promise<void>

Defined in: DefaultEventBridge/DefaultEventBridge.impl.ts:428

Parameters

address

EBMessageAddress

Returns

Promise<void>

Implementation of

EventBridge.unregisterSubscription


waitForInFlightDrain()

waitForInFlightDrain(timeoutMs?): Promise<boolean>

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:259

Parameters

timeoutMs?

number = ...

Returns

Promise<boolean>

Inherited from

EventBridgeBaseClass.waitForInFlightDrain


wrapInSpan()

wrapInSpan<F>(name, opts, fn, context?): Promise<F>

Defined in: core/EventBridge/EventBridgeBaseClass.impl.ts:226

Start span for opentelemetry tracking on same level. The created span will not become the "active" span within opentelemetry!

This means during logging and similar the spanId of parent span is logged.

Use wrapInSpan for marking points in flow of one bigger function, but not to trace the program flow itself

Type Parameters

F

F

Parameters

name

string

name of span

opts

SpanOptions

span options

fn

(span) => Promise<F>

function te be executed in the span

context?

Context

span context

Returns

Promise<F>

return value of fn

Inherited from

EventBridgeBaseClass.wrapInSpan