Class: DefaultEventBridge
@purista/core.DefaultEventBridge
A simple in-memory event bridge implementation. It does not support threads and does not require an external database.
Example
import { DefaultEventBridge } from '@purista/core'
const eventBridge = new DefaultEventBridge()
await eventBridge.start()
// add your services
Hierarchy
EventBridgeBaseClass<DefaultEventBridgeConfig>
↳ DefaultEventBridge
Implements
Table of contents
Constructors
Properties
- config
- defaultCommandTimeout
- hasStarted
- healthy
- instanceId
- logger
- name
- pendingInvocations
- readStream
- runningSubscriptionCount
- serviceFunctions
- subscriptions
- traceProvider
- writeStream
Methods
- destroy
- emit
- emitMessage
- getTracer
- invoke
- isHealthy
- isReady
- off
- on
- registerCommand
- registerSubscription
- removeAllListeners
- start
- startActiveSpan
- unregisterCommand
- unregisterSubscription
- wrapInSpan
Constructors
constructor
• new DefaultEventBridge(config?): DefaultEventBridge
Parameters
Name | Type | Description |
---|---|---|
config? | Object | - |
config.defaultCommandTimeout? | number | Overrides the hard-coded default timeout for command invocations |
config.emitMessagesAsEventBridgeEvents? | boolean | Emit messages that have an event name set as JavaScript events on the event bridge instance |
config.instanceId? | string | The instance ID of the event bridge. If not set, an ID is generated each time an instance is created. Set this if the instance ID must always be the same. |
config.logLevel? | LogLevelName | The log level to use if no logger instance is given |
config.logWarnOnMessagesWithoutReceiver? | boolean | Log a warning when a message is emitted but could not be delivered to at least one receiver |
config.logger? | Logger | A logger instance |
config.spanProcessor? | SpanProcessor | An OpenTelemetry span processor |
Returns
DefaultEventBridge
Overrides
EventBridgeBaseClass.constructor
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:85
Properties
config
• config: Complete<{ defaultCommandTimeout?: number; emitMessagesAsEventBridgeEvents?: boolean; instanceId?: string; logLevel?: LogLevelName; logWarnOnMessagesWithoutReceiver?: boolean; logger?: Logger; spanProcessor?: SpanProcessor }>
Inherited from
Defined in
core/EventBridge/EventBridgeBaseClass.impl.ts:23
defaultCommandTimeout
• defaultCommandTimeout: number
The default time after which a command invocation automatically returns a timeout error
Implementation of
EventBridge.defaultCommandTimeout
Inherited from
EventBridgeBaseClass.defaultCommandTimeout
Defined in
core/EventBridge/EventBridgeBaseClass.impl.ts:29
hasStarted
• Protected hasStarted: boolean = false
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:82
healthy
• Protected healthy: boolean = false
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:83
instanceId
• instanceId: string
Implementation of
Inherited from
EventBridgeBaseClass.instanceId
Defined in
core/EventBridge/EventBridgeBaseClass.impl.ts:27
logger
• logger: Logger
Inherited from
Defined in
core/EventBridge/EventBridgeBaseClass.impl.ts:20
name
• name: string
Implementation of
Inherited from
Defined in
core/EventBridge/EventBridgeBaseClass.impl.ts:25
pendingInvocations
• Protected pendingInvocations: Map<string, PendigInvocation>
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:77
readStream
• Protected readStream: Readable
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:65
runningSubscriptionCount
• Protected runningSubscriptionCount: number = 0
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:78
serviceFunctions
• Protected serviceFunctions: Map<string, (message: { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: Command; otp?: string; payload: { parameter: unknown; payload: unknown }; principalId?: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string } | { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }>>
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:72
subscriptions
• Protected subscriptions: Map<string, SubscriptionStorageEntry>
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:80
traceProvider
• traceProvider: NodeTracerProvider
Inherited from
EventBridgeBaseClass.traceProvider
Defined in
core/EventBridge/EventBridgeBaseClass.impl.ts:21
writeStream
• Protected writeStream: Writable
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:64
Methods
destroy
▸ destroy(): Promise<void>
Shuts down the event bridge as gracefully as possible
Returns
Promise<void>
Implementation of
Overrides
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:385
emit
▸ emit<K>(eventName, parameter?): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
parameter? | { eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ] |
Returns
void
Inherited from
Defined in
core/types/GenericEventEmitter.ts:24
emitMessage
▸ emitMessage(message): Promise<Readonly<EBMessage>>
Emits a new message to the event bridge for delivery to the receiver
Parameters
Name | Type | Description |
---|---|---|
message | Omit<EBMessage, "correlationId" \| "id" \| "timestamp"> | EBMessage |
Returns
Promise<Readonly<EBMessage>>
Implementation of
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:287
getTracer
▸ getTracer(): Tracer
Returns the OpenTelemetry tracer of this service
Returns
Tracer
Inherited from
EventBridgeBaseClass.getTracer
Defined in
core/EventBridge/EventBridgeBaseClass.impl.ts:70
invoke
▸ invoke<T>(input, commandTimeout?): Promise<T>
Calls a command of a service and returns the result of this command
Type parameters
Name |
---|
T |
Parameters
Name | Type | Description |
---|---|---|
input | Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: Command; otp?: string; payload: { parameter: unknown; payload: unknown }; principalId?: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }, "messageType" \| "correlationId" \| "id" \| "timestamp"> | a partial command message |
commandTimeout? | number | the time to live (timeout) of the invocation |
Returns
Promise<T>
Implementation of
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:328
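To make the relationship between `invoke`, `commandTimeout`, and `defaultCommandTimeout` more concrete, here is a minimal, self-contained sketch of how an in-memory bridge could resolve an invocation or reject it once a timeout elapses. It does not use `@purista/core` and is not the library's implementation: `SketchBridge`, `Address`, `CommandHandler`, the address key format, and the 30 second fallback are all assumptions made for illustration.

```typescript
// Hypothetical, simplified stand-ins; these are not the @purista/core types.
type Address = { serviceName: string; serviceVersion: string; serviceTarget: string }
type CommandHandler = (payload: unknown) => Promise<unknown>

class SketchBridge {
  // Fallback timeout in milliseconds (arbitrary value chosen for this sketch),
  // used when invoke() is called without an explicit commandTimeout.
  constructor(public defaultCommandTimeout = 30_000) {}

  private handlers = new Map<string, CommandHandler>()

  // Assumed key format; the real bridge may address commands differently.
  private key(a: Address): string {
    return `${a.serviceName}/${a.serviceVersion}/${a.serviceTarget}`
  }

  registerCommand(address: Address, cb: CommandHandler): string {
    const id = this.key(address)
    this.handlers.set(id, cb)
    return id
  }

  async invoke<T>(receiver: Address, payload: unknown, commandTimeout = this.defaultCommandTimeout): Promise<T> {
    const handler = this.handlers.get(this.key(receiver))
    if (!handler) throw new Error(`no handler registered for ${this.key(receiver)}`)

    let timer: ReturnType<typeof setTimeout> | undefined
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('invocation timed out')), commandTimeout)
    })
    try {
      // Whichever settles first wins: the handler result or the timeout error.
      return (await Promise.race([handler(payload), timeout])) as T
    } finally {
      // Always clear the timer so a fast handler does not leave it pending.
      clearTimeout(timer)
    }
  }
}
```

In this sketch a per-call `commandTimeout` takes precedence over the instance-wide default, which mirrors how the two documented values are described, but the exact precedence rules of the real event bridge should be checked against its source.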
isHealthy
▸ isHealthy(): Promise<boolean>
Indicates whether the event bridge is running and working correctly
Returns
Promise<boolean>
Implementation of
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:98
isReady
▸ isReady(): Promise<boolean>
Indicates whether the event bridge has been started and is connected to the underlying message broker
Returns
Promise<boolean>
Implementation of
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:94
off
▸ off<K>(eventName, fn): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
fn | EventReceiver <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ]> |
Returns
void
Inherited from
Defined in
core/types/GenericEventEmitter.ts:20
on
▸ on<K>(eventName, fn): void
Type parameters
Name | Type |
---|---|
K | extends EventKey <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }> |
Parameters
Name | Type |
---|---|
eventName | K |
fn | EventReceiver <{ eventbridge-connected : never ; eventbridge-connection-error : unknown ; eventbridge-disconnected : never ; eventbridge-error : unknown ; eventbridge-reconnecting : never }[K ]> |
Returns
void
Inherited from
Defined in
core/types/GenericEventEmitter.ts:16
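The `emit`, `off`, and `on` signatures above share one event map, so the event name constrains the listener's parameter type. The following self-contained sketch shows that pattern. The event names are copied from the type parameters above; `SketchEmitter` and the simplified `EventKey` and `EventReceiver` helper types are assumptions for illustration and may differ from the real definitions in `GenericEventEmitter.ts`.

```typescript
// Event map copied from the type parameters documented above.
type BridgeEvents = {
  'eventbridge-connected': never
  'eventbridge-connection-error': unknown
  'eventbridge-disconnected': never
  'eventbridge-error': unknown
  'eventbridge-reconnecting': never
}

// Hypothetical simplified helper types; the real EventKey / EventReceiver may differ.
type EventKey<T> = string & keyof T
type EventReceiver<P> = (parameter: P) => void

class SketchEmitter<T extends Record<string, unknown>> {
  private listeners = new Map<string, Set<EventReceiver<any>>>()

  on<K extends EventKey<T>>(eventName: K, fn: EventReceiver<T[K]>): void {
    const set = this.listeners.get(eventName) ?? new Set<EventReceiver<any>>()
    set.add(fn)
    this.listeners.set(eventName, set)
  }

  off<K extends EventKey<T>>(eventName: K, fn: EventReceiver<T[K]>): void {
    this.listeners.get(eventName)?.delete(fn)
  }

  emit<K extends EventKey<T>>(eventName: K, parameter?: T[K]): void {
    this.listeners.get(eventName)?.forEach((fn) => fn(parameter))
  }

  removeAllListeners(): void {
    this.listeners.clear()
  }
}

// Usage: the listener receives only events emitted while it is registered.
const emitter = new SketchEmitter<BridgeEvents>()
const seen: unknown[] = []
const onError = (err: unknown) => {
  seen.push(err)
}
emitter.on('eventbridge-error', onError)
emitter.emit('eventbridge-error', 'boom') // listener runs
emitter.off('eventbridge-error', onError)
emitter.emit('eventbridge-error', 'ignored') // no listener left
```

Events typed as `never` in the map, such as `eventbridge-connected`, carry no payload, so in this sketch they are emitted with the event name alone.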
registerCommand
▸ registerCommand(address, cb, metadata): Promise<string>
Register a service command and ensure that there is a queue for all incoming command requests.
Parameters
Name | Type | Description |
---|---|---|
address | EBMessageAddress | The service function address |
cb | (message: { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: Command; otp?: string; payload: { parameter: unknown; payload: unknown }; principalId?: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }) => Promise<{ contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName?: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp?: string; payload: { data?: unknown; message: string; status: StatusCode }; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string } \| { contentEncoding: string; contentType: string; correlationId: string; eventName?: string; id: string; messageType: CommandSuccessResponse; otp?: string; payload: unknown; principalId?: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }> | the function to call if a matching command message arrives |
metadata | CommandDefinitionMetadataBase | - |
Returns
Promise<string>
the ID of the command function queue
Implementation of
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:242
registerSubscription
▸ registerSubscription(subscription, cb): Promise<string>
Register a new subscription
Parameters
Name | Type | Description |
---|---|---|
subscription | Subscription | the subscription definition |
cb | (message: EBMessage) => Promise<undefined \| Omit<{ contentEncoding: string; contentType: string; correlationId?: string; eventName: string; id: string; messageType: CustomMessage; otp?: string; payload?: unknown; principalId?: string; receiver?: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string }; tenantId?: string; timestamp: number; traceId?: string }, "id" \| "timestamp">> | the function to be called if a matching message arrives |
Returns
Promise<string>
Implementation of
EventBridge.registerSubscription
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:267
removeAllListeners
▸ removeAllListeners(): void
Returns
void
Inherited from
EventBridgeBaseClass.removeAllListeners
Defined in
core/types/GenericEventEmitter.ts:28
start
▸ start(): Promise<void>
Starts the event bridge and connects to the underlying message broker
Returns
Promise<void>
Implementation of
Overrides
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:102
startActiveSpan
▸ startActiveSpan<F>(name, opts, context?, fn): Promise<F>
Starts a child span for OpenTelemetry tracing
Type parameters
Name |
---|
F |
Parameters
Name | Type | Default value | Description |
---|---|---|---|
name | string | undefined | name of span |
opts | SpanOptions | undefined | span options |
context | undefined \| Context | undefined | optional context |
fn | (span: Span) => Promise<F> | undefined | function to be executed within the span |
Returns
Promise<F>
return value of fn
Inherited from
EventBridgeBaseClass.startActiveSpan
Defined in
core/EventBridge/EventBridgeBaseClass.impl.ts:82
unregisterCommand
▸ unregisterCommand(address): Promise<void>
Unregister a service command
Parameters
Name | Type | Description |
---|---|---|
address | EBMessageAddress | The address (service name, version and command name) of the command to be de-registered |
Returns
Promise<void>
Implementation of
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:262
unregisterSubscription
▸ unregisterSubscription(address): Promise<void>
Parameters
Name | Type |
---|---|
address | EBMessageAddress |
Returns
Promise<void>
Implementation of
EventBridge.unregisterSubscription
Defined in
DefaultEventBridge/DefaultEventBridge.impl.ts:277
wrapInSpan
▸ wrapInSpan<F>(name, opts, fn, context?): Promise<F>
Starts a span for OpenTelemetry tracing on the same level. The created span does not become the "active" span within OpenTelemetry!
As a result, logging and similar operations record the span ID of the parent span.
Use wrapInSpan to mark points within the flow of one larger function, not to trace the program flow itself.
Type parameters
Name |
---|
F |
Parameters
Name | Type | Description |
---|---|---|
name | string | name of span |
opts | SpanOptions | span options |
fn | (span: Span) => Promise<F> | function to be executed in the span |
context? | Context | span context |
Returns
Promise<F>
return value of fn
Inherited from
EventBridgeBaseClass.wrapInSpan
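The difference between `startActiveSpan` and `wrapInSpan` described above can be illustrated with a toy model. The sketch below does not use OpenTelemetry or `@purista/core`, and it omits the `opts` and `context` parameters. `ToySpan`, `activeSpans`, `currentSpanId`, `toyStartActiveSpan`, and `toyWrapInSpan` are hypothetical names, and the simple stack used to track the active span is an assumption that only holds for sequential code like this example.

```typescript
// Toy model only: no OpenTelemetry involved, all names are hypothetical.
type ToySpan = { id: string; parentId: string | undefined }

// A plain stack stands in for the "active span" context. It is only
// adequate for the strictly sequential calls shown in this sketch.
const activeSpans: ToySpan[] = []
let counter = 0

// What a logger would report as the current span ID.
const currentSpanId = (): string | undefined => activeSpans[activeSpans.length - 1]?.id

function newSpan(name: string): ToySpan {
  counter += 1
  return { id: `${name}#${counter}`, parentId: currentSpanId() }
}

// Like startActiveSpan: the new span becomes the active one while fn runs.
async function toyStartActiveSpan<F>(name: string, fn: (span: ToySpan) => Promise<F>): Promise<F> {
  const span = newSpan(name)
  activeSpans.push(span)
  try {
    return await fn(span)
  } finally {
    activeSpans.pop()
  }
}

// Like wrapInSpan: a span is created, but the active span stays unchanged.
async function toyWrapInSpan<F>(name: string, fn: (span: ToySpan) => Promise<F>): Promise<F> {
  return fn(newSpan(name))
}

// Usage: record which span ID a logger would see inside each callback.
const logged: Array<string | undefined> = []
const done = toyStartActiveSpan('parent', async () => {
  await toyWrapInSpan('step', async () => {
    logged.push(currentSpanId()) // still the parent span: 'parent#1'
  })
  await toyStartActiveSpan('child', async () => {
    logged.push(currentSpanId()) // the child span itself: 'child#3'
  })
})
```

In this model the callback passed to the `wrapInSpan`-style helper still sees the parent as the current span, which corresponds to the note that the parent's span ID is what gets logged.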