Class: DefaultEventBridge
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:63
A simple in-memory implementation of an event bridge. It does not support threads and does not need any external database.
Example
import { DefaultEventBridge } from '@purista/core'
const eventBridge = new DefaultEventBridge()
await eventBridge.start()
// add your services
Extends
EventBridgeBaseClass
Implements
EventBridge
Constructors
new DefaultEventBridge()
new DefaultEventBridge(config?): DefaultEventBridge
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:85
Parameters
config?
defaultCommandTimeout?
number
Overrides the hardcoded default timeout of command invocations
emitMessagesAsEventBridgeEvents?
boolean
Emit messages that have an event name set as JavaScript events on the event bridge instance
instanceId?
string
The instance id of the event bridge. If not set, an id is generated each time an instance is created. Set this if the instance id must always be the same.
logger?
A logger instance
logLevel?
If no logger instance is given, use this log level
logWarnOnMessagesWithoutReceiver?
boolean
Log a warning when an emitted message could not be delivered to at least one receiver
spanProcessor?
SpanProcessor
An OpenTelemetry span processor
Returns
DefaultEventBridge
Overrides
EventBridgeBaseClass.constructor
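Every field of config is optional, so a bridge has to fall back to defaults for whatever is left out. The following is a minimal sketch of that idea, not the library's implementation: `BridgeOptions`, `resolveOptions`, the counter-based instance id and all default values are hypothetical and chosen only for illustration. The logger, logLevel and spanProcessor options are omitted for brevity.

```typescript
// Hypothetical mirror of some of the documented optional constructor options.
type BridgeOptions = {
  defaultCommandTimeout?: number
  emitMessagesAsEventBridgeEvents?: boolean
  instanceId?: string
  logWarnOnMessagesWithoutReceiver?: boolean
}

let instanceCounter = 0

// Fill every missing option so the result has no optional fields left.
// All fallback values below are assumptions made for this sketch.
function resolveOptions(options: BridgeOptions = {}): Required<BridgeOptions> {
  return {
    defaultCommandTimeout: options.defaultCommandTimeout ?? 30000,
    emitMessagesAsEventBridgeEvents: options.emitMessagesAsEventBridgeEvents ?? false,
    // A fresh id per call stands in for "an id is generated for each instance".
    instanceId: options.instanceId ?? `bridge-${++instanceCounter}`,
    logWarnOnMessagesWithoutReceiver: options.logWarnOnMessagesWithoutReceiver ?? true,
  }
}
```

Passing `{ instanceId: 'fixed' }` keeps the id stable across restarts, while every other field still falls back to its assumed default.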
Properties
config
config: Complete<{ defaultCommandTimeout: number; emitMessagesAsEventBridgeEvents: boolean; instanceId: string; logger: Logger; logLevel: LogLevelName; logWarnOnMessagesWithoutReceiver: boolean; spanProcessor: SpanProcessor; }>
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:23
Inherited from
EventBridgeBaseClass.config
defaultCommandTimeout
defaultCommandTimeout: number
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:29
The default time after which a command invocation automatically fails with a timeout error
Implementation of
EventBridge.defaultCommandTimeout
Inherited from
EventBridgeBaseClass.defaultCommandTimeout
hasStarted
protected hasStarted: boolean = false
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:82
healthy
protected healthy: boolean = false
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:83
instanceId
instanceId: string
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:27
Implementation of
EventBridge.instanceId
Inherited from
EventBridgeBaseClass.instanceId
logger
logger: Logger
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:20
Inherited from
EventBridgeBaseClass.logger
name
name: string
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:25
Implementation of
EventBridge.name
Inherited from
EventBridgeBaseClass.name
pendingInvocations
protected pendingInvocations: Map<string, PendigInvocation>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:77
readStream
protected readStream: Readable
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:65
runningSubscriptionCount
protected runningSubscriptionCount: number = 0
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:78
serviceFunctions
protected serviceFunctions: Map<string, (message) => Promise<
  | { contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp: string; payload: { data: unknown; message: string; status: StatusCode; }; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }
  | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; otp: string; payload: unknown; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }
>>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:72
subscriptions
protected subscriptions: Map<string, SubscriptionStorageEntry>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:80
traceProvider
traceProvider: NodeTracerProvider
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:21
Inherited from
EventBridgeBaseClass.traceProvider
writeStream
protected writeStream: Writable
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:64
Methods
destroy()
destroy(): Promise<void>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:385
Shuts down the event bridge as gracefully as possible
Returns
Promise<void>
Implementation of
EventBridge.destroy
Overrides
EventBridgeBaseClass.destroy
emit()
emit<K>(eventName, parameter?): void
Defined in: packages/core/src/core/types/GenericEventEmitter.ts:24
Type Parameters
• K extends EventKey<{ [key: `custom-${string}`]: unknown; [key: `adapter-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>
Parameters
eventName
K
parameter?
object[K]
Returns
void
Inherited from
EventBridgeBaseClass.emit
emitMessage()
emitMessage(message): Promise<Readonly<EBMessage>>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:287
Emits a new message to the event bridge for delivery to its receiver
Parameters
message
Omit<EBMessage, "correlationId" | "id" | "timestamp">
EBMessage
Returns
Promise<Readonly<EBMessage>>
Implementation of
EventBridge.emitMessage
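The signature accepts a message without id and timestamp and resolves to a read-only, complete message. A minimal sketch of that completion step, assuming a heavily reduced message shape: `SketchMessage`, `completeMessage` and the counter-based id are hypothetical and say nothing about how the library generates ids or handles correlationId.

```typescript
// Hypothetical, heavily reduced stand-in for EBMessage.
type SketchMessage = {
  id: string
  timestamp: number
  eventName?: string
  payload: unknown
}

let nextMessageNumber = 0

// Add the fields the caller is allowed to omit, then freeze the result
// so receivers cannot modify the message.
function completeMessage(
  partial: Omit<SketchMessage, 'id' | 'timestamp'>,
): Readonly<SketchMessage> {
  return Object.freeze({
    ...partial,
    id: `msg-${++nextMessageNumber}`, // assumption: any unique id scheme would do
    timestamp: Date.now(),
  })
}
```

Freezing only protects the top level of the object; a nested payload would need its own protection if that matters.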
getTracer()
getTracer(): Tracer
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:70
Returns the OpenTelemetry tracer of this service
Returns
Tracer
Tracer
Inherited from
EventBridgeBaseClass.getTracer
invoke()
invoke<T>(input, commandTimeout): Promise<T>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:328
Calls a command of a service and returns the result of this command
Type Parameters
• T
Parameters
input
Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: Command; otp: string; payload: { parameter: unknown; payload: unknown; }; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "messageType" | "correlationId" | "id" | "timestamp">
a partial command message
commandTimeout
number = ...
the time to live (timeout) of the invocation
Returns
Promise<T>
Implementation of
EventBridge.invoke
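invoke resolves with the command result or fails once commandTimeout has elapsed, and the class keeps a pendingInvocations map. One common way to combine the two is to park each call under its correlation id together with a timer. The sketch below works under that assumption: `PendingCalls`, `wait`, `settle` and the error text are hypothetical names, not the library's internals.

```typescript
type PendingEntry = {
  resolve: (value: unknown) => void
  reject: (error: Error) => void
  timer: ReturnType<typeof setTimeout>
}

class PendingCalls {
  readonly pending = new Map<string, PendingEntry>()

  // Park a call until a response with the same correlation id arrives
  // or the timeout (in milliseconds) elapses.
  wait<T>(correlationId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId)
        reject(new Error(`invocation ${correlationId} timed out`))
      }, timeoutMs)
      this.pending.set(correlationId, {
        resolve: (value) => resolve(value as T),
        reject,
        timer,
      })
    })
  }

  // Hand a response to the waiting caller; returns false if nobody is waiting.
  settle(correlationId: string, result: unknown): boolean {
    const entry = this.pending.get(correlationId)
    if (!entry) return false
    clearTimeout(entry.timer)
    this.pending.delete(correlationId)
    entry.resolve(result)
    return true
  }
}
```

Removing the entry on both paths keeps the map from growing when responses never arrive, and a late response for a timed-out call is simply reported as unmatched.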
isHealthy()
isHealthy(): Promise<boolean>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:98
Indicates whether the event bridge is running and working correctly
Returns
Promise<boolean>
Implementation of
EventBridge.isHealthy
isReady()
isReady(): Promise<boolean>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:94
Indicates whether the event bridge has been started and is connected to the underlying message broker
Returns
Promise<boolean>
Implementation of
EventBridge.isReady
off()
off<K>(eventName, fn): void
Defined in: packages/core/src/core/types/GenericEventEmitter.ts:20
Type Parameters
• K extends EventKey<{ [key: `custom-${string}`]: unknown; [key: `adapter-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>
Parameters
eventName
K
fn
EventReceiver<object[K]>
Returns
void
Inherited from
EventBridgeBaseClass.off
on()
on<K>(eventName, fn): void
Defined in: packages/core/src/core/types/GenericEventEmitter.ts:16
Type Parameters
• K extends EventKey<{ [key: `custom-${string}`]: unknown; [key: `adapter-${string}`]: unknown; eventbridge-connected: never; eventbridge-connection-error: unknown; eventbridge-disconnected: never; eventbridge-error: unknown; eventbridge-reconnecting: never; }>
Parameters
eventName
K
fn
EventReceiver<object[K]>
Returns
void
Inherited from
EventBridgeBaseClass.on
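emit, on and off are typed by an event map, so the event name determines the payload type a listener receives. A minimal sketch of such a typed emitter, written independently of the library: `SketchEvents`, `Receiver` and `TypedEmitter` are hypothetical, and only two of the documented event names are modeled.

```typescript
// Hypothetical event map: event name -> payload type.
type SketchEvents = {
  'eventbridge-connected': undefined
  'eventbridge-error': unknown
}

type Receiver<T> = (parameter: T) => void

class TypedEmitter<E extends Record<string, unknown>> {
  private listeners = new Map<keyof E, Set<Receiver<any>>>()

  // Register a listener whose parameter type follows from the event name.
  on<K extends keyof E>(eventName: K, fn: Receiver<E[K]>): void {
    const set = this.listeners.get(eventName) ?? new Set<Receiver<any>>()
    set.add(fn)
    this.listeners.set(eventName, set)
  }

  // Remove one previously registered listener.
  off<K extends keyof E>(eventName: K, fn: Receiver<E[K]>): void {
    this.listeners.get(eventName)?.delete(fn)
  }

  // Call every listener registered for the event.
  emit<K extends keyof E>(eventName: K, parameter?: E[K]): void {
    this.listeners.get(eventName)?.forEach((fn) => fn(parameter))
  }

  removeAllListeners(): void {
    this.listeners.clear()
  }
}
```

With this shape, `emitter.on('eventbridge-error', (err) => ...)` type-checks, while an event name that is not in the map is rejected at compile time.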
registerCommand()
registerCommand(address, cb, metadata): Promise<string>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:242
Register a service command and ensure that there is a queue for all incoming command requests.
Parameters
address
The service function address
cb
(message) => Promise<
  | { contentEncoding: "utf-8"; contentType: "application/json"; correlationId: string; eventName: string; id: string; isHandledError: boolean; messageType: CommandErrorResponse; otp: string; payload: { data: unknown; message: string; status: StatusCode; }; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }
  | { contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CommandSuccessResponse; otp: string; payload: unknown; principalId: string; receiver: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }
>
the function to call if a matching command message arrives
metadata
Returns
Promise<string>
the id of the command function queue
Implementation of
EventBridge.registerCommand
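registerCommand stores a callback per service function address and returns an id, and serviceFunctions is a Map keyed by string. A minimal sketch of how an address could be flattened into such a key: the key format, `SketchAddress`, `toKey` and `CommandRegistry` are hypothetical, and the real queue id may look different.

```typescript
// Address fields as they appear in the documented sender/receiver types.
type SketchAddress = {
  serviceName: string
  serviceVersion: string
  serviceTarget: string
}

type Handler = (message: unknown) => Promise<unknown>

// Hypothetical key format: name, version and target joined by dots.
const toKey = (a: SketchAddress): string =>
  `${a.serviceName}.${a.serviceVersion}.${a.serviceTarget}`

class CommandRegistry {
  private handlers = new Map<string, Handler>()

  // Store the callback under the flattened address and return that key.
  register(address: SketchAddress, cb: Handler): string {
    const key = toKey(address)
    this.handlers.set(key, cb)
    return key // plays the role of the returned queue id
  }

  unregister(address: SketchAddress): void {
    this.handlers.delete(toKey(address))
  }

  has(address: SketchAddress): boolean {
    return this.handlers.has(toKey(address))
  }
}
```

Because the version is part of the key, two versions of the same command can be registered side by side in this sketch.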
registerSubscription()
registerSubscription(subscription, cb): Promise<string>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:267
Register a new subscription
Parameters
subscription
the subscription definition
cb
(message) => Promise<undefined | Omit<{ contentEncoding: string; contentType: string; correlationId: string; eventName: string; id: string; messageType: CustomMessage; otp: string; payload: unknown; principalId: string; receiver: EBMessageAddress; sender: { instanceId: string; serviceName: string; serviceTarget: string; serviceVersion: string; }; tenantId: string; timestamp: number; traceId: string; }, "id" | "timestamp">>
the function to be called if a matching message arrives
Returns
Promise<string>
Implementation of
EventBridge.registerSubscription
removeAllListeners()
removeAllListeners(): void
Defined in: packages/core/src/core/types/GenericEventEmitter.ts:28
Returns
void
Inherited from
EventBridgeBaseClass.removeAllListeners
start()
start(): Promise<void>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:102
Starts the event bridge and connects to the underlying message broker
Returns
Promise<void>
Implementation of
EventBridge.start
Overrides
EventBridgeBaseClass.start
startActiveSpan()
startActiveSpan<F>(name, opts, context, fn): Promise<F>
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:82
Starts a child span for OpenTelemetry tracing
Type Parameters
• F
Parameters
name
string
name of span
opts
SpanOptions
span options
context
undefined | Context
optional context
fn
(span) => Promise<F>
function to be executed within the span
Returns
Promise<F>
return value of fn
Inherited from
EventBridgeBaseClass.startActiveSpan
unregisterCommand()
unregisterCommand(address): Promise<void>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:262
Unregister a service command
Parameters
address
The address (service name, version and command name) of the command to be de-registered
Returns
Promise<void>
Implementation of
EventBridge.unregisterCommand
unregisterSubscription()
unregisterSubscription(address): Promise<void>
Defined in: packages/core/src/DefaultEventBridge/DefaultEventBridge.impl.ts:277
Parameters
address
Returns
Promise<void>
Implementation of
EventBridge.unregisterSubscription
wrapInSpan()
wrapInSpan<F>(name, opts, fn, context?): Promise<F>
Defined in: packages/core/src/core/EventBridge/EventBridgeBaseClass.impl.ts:132
Starts a span for OpenTelemetry tracing on the same level. The created span does not become the "active" span within OpenTelemetry!
This means that during logging and similar operations, the spanId of the parent span is logged.
Use wrapInSpan to mark points in the flow of one bigger function, not to trace the program flow itself.
Type Parameters
• F
Parameters
name
string
name of span
opts
SpanOptions
span options
fn
(span) => Promise<F>
function to be executed in the span
context?
Context
span context
Returns
Promise<F>
return value of fn