Classes · @purista/core
DefaultEventBridge
Process-local in-memory event bridge for development and tests.
Signature
class DefaultEventBridge
Examples
import { DefaultEventBridge } from '@purista/core'
const eventBridge = new DefaultEventBridge()
await eventBridge.start()
// add your services
Constructors
1 entry
constructor
Constructor
new constructor(config?: { defaultCommandTimeout: number; instanceId: string; logger: Logger; logLevel: LogLevelName; logWarnOnMessagesWithoutReceiver: boolean; metrics: PuristaMetricsRuntimeOptions; ... })
Properties
18 entries
capabilities
Property
capabilities: EventBridgeCapabilities Runtime capability matrix used for strict startup validation.
config
Property
config: Complete<EventBridgeConfig<ConfigType>>
defaultCommandTimeout
Property
defaultCommandTimeout: number The default time after which a command invocation automatically returns a timeout error
hasStarted
Property
hasStarted: boolean
healthy
Property
healthy: boolean
inFlightExecutions
Property
inFlightExecutions: InFlightExecutionTracker
instanceId
Property
instanceId: string Stable runtime instance id used to distinguish bridge processes.
logger
Property
logger: Logger
metricsRecorder
Property
metricsRecorder: PuristaMetricsRecorder
name
Property
name: string Human-readable bridge name used in logs, traces, and metrics.
pendingInvocations
Property
pendingInvocations: PendingInvocationRegistry<unknown>
pendingStreams
Property
pendingStreams: PendingStreamRegistry<any, any>
readStream
Property
readStream: Readable
serviceFunctions
Property
serviceFunctions: Map<string, (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; isHandledError: unknown; ... } | { contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }>>
streamFunctions
Property
streamFunctions: Map<string, (message: StreamMessage) => Promise<void>>
subscriptions
Property
subscriptions: Map<string, SubscriptionStorageEntry>
traceProvider
Property
traceProvider: NodeTracerProvider
writeStream
Property
writeStream: Writable
Methods
22 entries
destroy
Method
destroy(): Promise<void> Shut down the event bridge as gracefully as possible
emitMessage
Method
emitMessage(message: Omit<EBMessage, "id" | "timestamp" | "correlationId">): Promise<Readonly<EBMessage>> Emit a new message to the event bridge to be delivered to the receiver
getInFlightExecutionCount
Method
getInFlightExecutionCount(): number Number of currently running handlers across all work kinds.
getInFlightExecutionCounts
Method
getInFlightExecutionCounts(): InFlightExecutionCounts Number of currently running handlers grouped by work kind.
getPausedSubscriptionConsumers
Method
getPausedSubscriptionConsumers(): PausedSubscriptionConsumersByRegistrationKey Returns paused subscription consumer states keyed by adapter registration key.
getTracer
Method
getTracer(): Tracer Returns the OpenTelemetry tracer of this service
invoke
Method
invoke<T>(input: Omit<Command, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<T> Call a command of a service and return the result of this command
isHealthy
Method
isHealthy(): Promise<boolean> Indicates whether the event bridge is running and working correctly
isReady
Method
isReady(): Promise<boolean> Indicates whether the event bridge has been started and is connected to the underlying message broker
openStream
Method
openStream<Chunk, Final>(input: Omit<StreamOpenRequest, "id" | "messageType" | "timestamp" | "correlationId">, commandTimeout: number): Promise<StreamHandle<Chunk, Final>> Open a stream invocation.
registerCommand
Method
registerCommand(address: EBMessageAddress, cb: (message: { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; ... }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; ... } | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; ... }>, metadata: CommandDefinitionMetadataBase): Promise<string> Register a service command and ensure that there is a queue for all incoming command requests.
registerStream
Method
registerStream(address: EBMessageAddress, cb: (message: StreamMessage) => Promise<void>, metadata: StreamDefinitionMetadataBase): Promise<string> Register a service stream handler for a service target.
registerSubscription
Method
registerSubscription(subscription: Subscription, cb: (message: EBMessage) => Promise<Omit<{ contentEncoding: unknown; contentType: unknown; correlationId: unknown; eventName: unknown; id: unknown; messageType: unknown; ... }, unknown | unknown> | undefined>): Promise<string> Register a new subscription.
resumeSubscriptionConsumer
Method
resumeSubscriptionConsumer(_registrationKey: string): Promise<void> Resumes a paused subscription consumer by registration key.
runInFlight
Method
runInFlight<T>(fn: () => Promise<T>, kind: "stream" | "command" | "subscription" | "generic"): Promise<T>
start
Method
start(): Promise<void> Start the event bridge and connect to the underlying message broker
startActiveSpan
Method
startActiveSpan<F>(name: string, opts: SpanOptions, context: Context | undefined, fn: (span: Span) => Promise<F>): Promise<F> Start a child span for OpenTelemetry tracing
unregisterCommand
Method
unregisterCommand(address: EBMessageAddress): Promise<void> Unregister a service command
unregisterStream
Method
unregisterStream(address: EBMessageAddress): Promise<void> Unregister a service stream
unregisterSubscription
Method
unregisterSubscription(address: EBMessageAddress): Promise<void> Unregister a subscription.
waitForInFlightDrain
Method
waitForInFlightDrain(timeoutMs: number): Promise<boolean>
wrapInSpan
Method
wrapInSpan<F>(name: string, opts: SpanOptions, fn: (span: Span) => Promise<F>, context?: Context): Promise<F> Start a span for OpenTelemetry tracing at the same level. The created span will not become the "active" span within OpenTelemetry!