Temporal to PURISTA
To be able to connect PURISTA commands, and to emit PURISTA events, we need to create a connection from Temporal to the eventbridge.
But before connecting Temporal to PURISTA, we will create a command in our PURISTA application, which gets invoked from a Temporal workflow.
In this example, it is expected that you have created a service Email
with a command sendEmailVerification
. Also, we will use the NATS event bridge.
Connect worker to eventbridge
We need to connect the worker in Temporal to the eventbridge used in our PURISTA application.
We add a few lines to src/temporal/worker.ts
import { fileURLToPath } from 'node:url'
import { Context } from '@temporalio/activity'
import { NativeConnection, Worker } from '@temporalio/worker'
import type { EventBridge } from '@purista/core'
import { initLogger } from '@purista/core'
import { NatsBridge } from '@purista/natsbridge'
import temporalConfig from '../config/temporalConfig.js'
import natsBridgeConfig from '../config/natsBridgeConfig.js'
import * as activities from './activities/index.js'
export type ActivitiesType = typeof activities
async function run() {
const logger = initLogger('debug')
// inject eventBride and logger
const eventBridge = new NatsBridge({ ...natsBridgeConfig, logger })
await eventBridge.start()
const connection = await NativeConnection.connect(temporalConfig.connect)
const worker = await Worker.create({
connection,
namespace: temporalConfig.namespace,
taskQueue: temporalConfig.taskQueue,
workflowsPath: fileURLToPath(new URL('./workflows', import.meta.url)),
activities: {
...activities,
},
})
await worker.run()
}
run().catch((err) => {
// eslint-disable-next-line no-console
console.error(err)
process.exit(1)
})
To make our life easier, we create a simple helper function getInvoke
in file src/temporal/getInvoke.ts
.
import type { EventBridge } from '@purista/core'
import { Context } from '@temporalio/activity'
// a small get which returns the invoke function
export const getInvoke =
(eventBridge: EventBridge) =>
async <Output = unknown>(
serviceName: string,
serviceVersion: string,
serviceTarget: string,
payload: unknown,
parameter = {},
): Promise<Output> => {
const ctx = Context.current()
return eventBridge.invoke<Output>({
sender: {
serviceName: ctx.info.workflowType,
serviceVersion: '1',
serviceTarget: ctx.info.activityType,
instanceId: eventBridge.instanceId,
},
receiver: {
serviceName,
serviceVersion,
serviceTarget,
},
payload: {
payload,
parameter,
},
contentEncoding: 'application/json',
contentType: 'utf-8',
})
}
Add an activity
Before we can use an activity in a workflow, we must register the activity in the worker.
import { fileURLToPath } from 'node:url'
import { Context } from '@temporalio/activity'
import { NativeConnection, Worker } from '@temporalio/worker'
import type { EventBridge } from '@purista/core'
import { initLogger } from '@purista/core'
import { NatsBridge } from '@purista/natsbridge'
import temporalConfig from '../config/temporalConfig.js'
import natsBridgeConfig from '../config/natsBridgeConfig.js'
import * as activities from './activities/index.js'
const getPuristaBasedActivities = (eventbridge: EventBridge) => {
const invoke = getInvoke(eventBridge)
return {
sendEmailVerification: (email: string) =>
invoke<{ userId: string }> // our helper with return type
('Email', '1', 'sendVerificationEmail', { email }),
}
}
export type ActivitiesType = typeof activities & ReturnType<typeof getPuristaBasedActivities>
async function run() {
const logger = initLogger('debug')
// inject eventBride and logger
const eventBridge = new NatsBridge({ ...natsBridgeConfig, logger })
await eventBridge.start()
const connection = await NativeConnection.connect(temporalConfig.connect)
const worker = await Worker.create({
connection,
namespace: temporalConfig.namespace,
taskQueue: temporalConfig.taskQueue,
workflowsPath: fileURLToPath(new URL('./workflows', import.meta.url)),
activities: {
...activities,
...getPuristaBasedActivities(eventBridge)
},
})
await worker.run()
}
run().catch((err) => {
// eslint-disable-next-line no-console
console.error(err)
process.exit(1)
})
Use an activity
Our workflow currently does nothing instead logging the input.
We will now add an activity, which calls a PURISTA command and returns the commands result.
import { proxyActivities } from '@temporalio/workflow'
import type { ActivitiesType } from '../worker.js'
const { } = proxyActivities<ActivitiesType>({
const { sendEmailVerification } = proxyActivities<ActivitiesType>({
startToCloseTimeout: '1 minute',
})
export async function onboardingWorkflow(input: unknown): Promise<void> {
export async function onboardingWorkflow(input: { email: string } ): Promise<void> {
const result = await sendEmailVerification(input.email)
console.log(input)
console.log(result)
}
After starting the PURISTA application and the Temporal worker, you should be able to call the register
endpoint via the OpenAPI UI with a payload like this:
{
"email": "mail@example.com"
}
The onboardingWorkflow
will start, execute the activity sendEmailVerification
, which invokes the PURISTA command sendEmailVerification
of service Email
. The response of that command invocation will be logged by the workflow.
We now have the communication from Temoral to the PURISTA application, and the opposite from the PURISTA application to Temporal.