# Run & Invoke Agents
An agent build result is inert until runtime dependencies are bound via `getInstance(...)`.
This page focuses on four runtime concerns:
- definition vs runtime instance
- bootstrap and start/stop
- invocation patterns (context + standalone)
- async queue execution and worker concurrency
## Definition vs runtime instance
An `AgentBuilder` result (`supportAgent`) is a definition only: schema contract, handler logic, allowed tools, and endpoint metadata.
`getInstance(eventBridge, options)` is where runtime wiring happens:
- concrete model provider instances
- concrete session/knowledge adapters
- logger/tracer integration
- runtime pool size (`poolConfig`)
This separation keeps business logic stable and moves deployment/runtime settings to startup code.
## Bootstrap the instance
```typescript
import { DefaultEventBridge } from '@purista/core'
import { AiSdkProvider } from '@purista/ai'
import { createOpenAI } from '@ai-sdk/openai'

import { supportAgent } from './agents/supportAgent/v1/supportAgent.js'

const eventBridge = new DefaultEventBridge()
await eventBridge.start()

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY! })
const provider = new AiSdkProvider({
  model: openai('gpt-4o-mini'),
  systemPrompt: 'You are a friendly support engineer.',
  defaults: { temperature: 0.2 },
})

const supportAgentInstance = await supportAgent.getInstance(eventBridge, {
  models: {
    'openai:gpt-4o-mini': provider,
  },
  sessionStore: aiConversationStore,
  knowledgeAdapters: {
    supportFaq: faqKnowledgeAdapter,
  },
  logger,
  tracer,
  poolConfig: {
    poolId: 'support',
    maxWorkers: 4,
  },
})

await supportAgentInstance.start()
```

- `eventBridge` is mandatory; every agent registers an internal service (`<agentName>.run`).
- `models` must satisfy the aliases declared via `.defineModel(...)` in the agent builder.
- `sessionStore` / `knowledgeAdapters` define what `context.conversation`, `context.session`, and `context.knowledge` access in the handler.
- When not provided, session stores, knowledge adapters, and pool managers default to in-memory implementations.
- If the agent definition declares knowledge aliases via `.useKnowledgeAdapter(...)`, TypeScript requires `knowledgeAdapters` in `getInstance(...)`.
## How runtime injection maps to handler access
```typescript
// definition
export const supportAgent = new AgentBuilder({ agentName: 'supportAgent', agentVersion: '1' })
  .defineModel('openai:gpt-4o-mini')
  .useKnowledgeAdapter('supportFaq')
  .setHandler(async (context, payload) => {
    // from getInstance(..., { models })
    const model = context.models['openai:gpt-4o-mini']

    // from getInstance(..., { knowledgeAdapters: { supportFaq: ... } })
    const docs = await context.knowledge.supportFaq.query(payload.prompt, { limit: 3 })

    // from getInstance(..., { sessionStore })
    await context.conversation.addUser(payload.prompt)

    return { message: `${docs.length} docs` }
  })
  .build()
```

The definition decides aliases and capabilities; instance injection provides the concrete implementations behind those aliases. Knowledge operations automatically receive scope metadata (`tenantId`, `principalId`, `agentName`, `agentVersion`, `sessionId`) from the current message/session context.
## Runtime options reference
`getInstance(eventBridge, options)` supports:
| Option | Purpose | Typical choice | Notes |
|---|---|---|---|
| `models` | bind model aliases to provider instances | required in real workloads | fail-fast when a declared alias is missing |
| `poolConfig.poolId` | select execution pool namespace | explicit per workload class | defaults to `agent:<agentName>` |
| `poolConfig.maxWorkers` | cap parallel runs per process/instance | `1` locally, tuned in prod | runtime/deploy setting, not hardcoded |
| `concurrencyHints.replicaCountHint` | optional host replica hint for telemetry | set by deployment bootstrap | informational only (no runtime admission control) |
| `sessionStore` | persistence backend for conversation/session state | in-memory locally, Redis/DB in prod | `context.conversation` uses this backend |
| `knowledgeAdapters` | RAG/document adapters by alias | in-memory or vector-store-backed | must match the aliases used by the builder |
| `logger`, `tracer`, `spanProcessor` | observability integration | inherit app defaults | keeps agent telemetry aligned with services |
| `config`, `resources` | custom app-specific dependencies | optional | use sparingly to keep handlers focused |
## Read-only runtime status snapshot
Agent instances expose read-only pool status for ops endpoints and dashboards:
```typescript
import { getAgentRuntimeStatuses } from '@purista/ai'

const status = supportAgentInstance.getStatus()
// { poolId, maxWorkersPerInstance, activeWorkers, waitingWorkers, ... }

const all = getAgentRuntimeStatuses([supportAgentInstance, triageAgentInstance])
```

This is informational only. Admission control still happens through per-instance pool limits (`poolConfig.maxWorkers`).
## Runtime pool config (important)
Pool identity and parallelism are runtime/deploy settings:
```typescript
const supportAgentInstance = await supportAgent.getInstance(eventBridge, {
  models: { 'openai:gpt-4o-mini': provider },
  poolConfig: {
    poolId: 'support', // optional; defaults to agent:<agentName>
    maxWorkers: 4, // default is 1
  },
})
```

`maxWorkers` controls how many agent runs can execute in parallel for that agent instance. It is always per instance/process.
System-wide estimate:

```
effectiveMaxConcurrency = replicas * maxWorkersPerInstance
```
- default is `1` (safe baseline)
- keep this low in local/dev
- tune this in deployment config for production
- use separate pools when different agent workloads need isolation
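The estimate above is plain arithmetic; a quick sketch (the variable names mirror the formula and are illustrative, not PURISTA APIs):

```typescript
// Illustrative sketch of the system-wide concurrency estimate.
const replicas = 3 // deployment replica count
const maxWorkersPerInstance = 4 // poolConfig.maxWorkers per process
const effectiveMaxConcurrency = replicas * maxWorkersPerInstance
console.log(effectiveMaxConcurrency) // 12
```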
Operational rule of thumb:
- the queue controls how much work is waiting
- `maxWorkers` controls how much work runs now
- deployment replicas multiply the total available agent slots
- provider/API rate limits still apply downstream
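A minimal sketch of the admission behavior behind a per-instance worker cap, assuming a simple slot-and-wait model (this is not the actual `PoolManager` implementation):

```typescript
// Hypothetical worker pool: at most maxWorkers jobs run now, the rest wait.
class WorkerPool {
  private active = 0
  private readonly waiting: Array<() => void> = []
  peakActive = 0

  constructor(private readonly maxWorkers: number) {}

  async run<T>(job: () => Promise<T>): Promise<T> {
    // wait until a worker slot is free
    while (this.active >= this.maxWorkers) {
      await new Promise<void>(resolve => this.waiting.push(resolve))
    }
    this.active += 1
    this.peakActive = Math.max(this.peakActive, this.active)
    try {
      return await job()
    } finally {
      this.active -= 1
      this.waiting.shift()?.() // wake one waiter to re-check the cap
    }
  }
}

// 10 queued runs, but never more than 4 in flight at once.
const pool = new WorkerPool(4)
const jobs = Array.from({ length: 10 }, (_, i) =>
  pool.run(async () => {
    await new Promise(resolve => setTimeout(resolve, 10))
    return i
  }),
)
const results = await Promise.all(jobs)
console.log(pool.peakActive, results.length) // 4 10
```

The same shape explains the operational split: the `waiting` array is the queue ("how much work is waiting"), `maxWorkers` is "how much work runs now".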
## Queue, pool, worker flow
```mermaid
flowchart LR
    A["Caller (HTTP/Command/Subscription)"] --> B["Agent Invoke"]
    B --> C["Queue (optional)"]
    C --> D["PoolManager (poolId, maxWorkers)"]
    D --> E["Worker Slot"]
    E --> F["Agent Handler"]
    F --> G["Model Provider"]
    F --> H["Tools/Knowledge/Session"]
    F --> I["Protocol Frames + Final Result"]
```

```mermaid
sequenceDiagram
    participant C as Caller
    participant Q as Queue
    participant P as PoolManager
    participant A as Agent
    participant M as Model
    C->>Q: enqueue run (optional)
    Q->>P: request slot
    P-->>Q: slot granted
    Q->>A: execute run
    A->>M: generate / stream
    M-->>A: tokens + output
    A-->>C: protocol frames + final response
    A->>P: release slot
```

## Invoke an agent programmatically
### 1. Integrated Service Pattern (Recommended)
When working inside Commands, Subscriptions, or Streams, use the `.canInvokeAgent` builder method. This integrates the agent into the functional context with full type safety.
```typescript
import { z } from 'zod'

const supportInvokePayloadSchema = z.object({
  message: z.string().min(1),
})

const supportInvokeParameterSchema = z.object({
  channel: z.enum(['command', 'queue']),
  locale: z.string().optional(),
})

export const notifyCommand = supportServiceBuilder
  .getCommandBuilder('notifySupportAgent', 'Runs the support agent from a command')
  .canInvokeAgent('supportAgent', '1', {
    payloadSchema: supportInvokePayloadSchema,
    parameterSchema: supportInvokeParameterSchema,
  })
  .addPayloadSchema(supportInputSchema)
  .setCommandFunction(async function (context, payload) {
    // 1. Get the final result
    const result = await context.invokeAgent.supportAgent['1']
      .call({ message: payload.prompt }, { channel: 'command' })
      .final()

    // 2. Or stream frames manually
    const invocation = context.invokeAgent.supportAgent['1']
      .call({ message: payload.prompt }, { channel: 'command' })

    for await (const frame of invocation) {
      context.logger.info({ frame }, 'Agent frame received')
    }

    return result
  })
```

The `.call()` method returns an `AgentInvocation` object, an `AsyncIterable` that yields protocol frames as they arrive. `.final()` is collector sugar over the same stream session.
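To illustrate the collector pattern, here is a minimal sketch of an async-iterable invocation with a `.final()` helper layered on top. The frame shapes are hypothetical placeholders, not the actual `@purista/ai` protocol types:

```typescript
// Hypothetical frame union standing in for the real protocol frames.
type Frame =
  | { kind: 'token'; text: string }
  | { kind: 'final'; result: { message: string } }

class Invocation implements AsyncIterable<Frame> {
  constructor(private readonly frames: Frame[]) {}

  // Yield frames in order, like a live stream would.
  async *[Symbol.asyncIterator]() {
    for (const frame of this.frames) {
      yield frame
    }
  }

  // Collector sugar: drain the stream, return the final frame's payload.
  async final(): Promise<{ message: string }> {
    let result: { message: string } | undefined
    for await (const frame of this) {
      if (frame.kind === 'final') {
        result = frame.result
      }
    }
    if (!result) throw new Error('stream ended without a final frame')
    return result
  }
}

const invocation = new Invocation([
  { kind: 'token', text: 'Hel' },
  { kind: 'token', text: 'lo' },
  { kind: 'final', result: { message: 'Hello' } },
])
const finalResult = await invocation.final()
console.log(finalResult.message) // "Hello"
```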
### What `.canInvokeAgent(...)` validates

`.canInvokeAgent(agentName, agentVersion, config?)` supports:

- `payloadSchema`: types and validates the first `.call(payload, parameter)` argument
- `parameterSchema`: types and validates the second `.call(payload, parameter)` argument
- `payload` is the agent input payload (primary business input)
- `parameter` is side-channel metadata (for example channel/locale/feature flags)
```typescript
const invokeParameterSchema = z.object({
  channel: z.enum(['command', 'queue']),
  locale: z.string().optional(),
})

commandBuilder
  .canInvokeAgent('supportAgent', '1', {
    payloadSchema: z.object({ message: z.string() }),
    parameterSchema: invokeParameterSchema,
  })
  .setCommandFunction(async context => {
    return context.invokeAgent.supportAgent['1']
      .call(
        { message: 'Need help with refund' },
        { channel: 'command', locale: 'en-US' }, // validated against schema
      )
      .final()
  })
```

The legacy shorthand still works:

```typescript
.canInvokeAgent('supportAgent', '1', invokeParameterSchema)
```

That only configures `parameterSchema`.
### Payload vs parameter (short rule)

Use this split consistently:

- `payload`: business input for the agent run (prompt/message, domain fields)
- `parameter`: invocation metadata controlled by the caller (channel, locale, flags)
Without an explicit `payloadSchema`, `payload` falls back to the core `AgentProtocolPayload` shape (`message`, optional `history`, optional `attachments`, passthrough for extra fields). That means minimal calls still work:
```typescript
await context.invokeAgent.supportAgent['1']
  .call({ message: payload.prompt }, { channel: 'command' })
  .final()
```

If you need stricter payload typing per agent, enforce it at the agent boundary with the agent payload schema (`.addPayloadSchema(...)`) and keep caller-side TypeScript helpers local to your app.
### 2. Standalone Invocation
The helper `invokeAgent` (from `@purista/ai`) mirrors `invokeCommand` but opens a stream session first and collects protocol envelopes for you. It only falls back to command invoke when stream support is unavailable. This is ideal for scripts, manual triggers, or controllers where you don't have a Purista context.
```typescript
import { invokeAgent } from '@purista/ai'

const result = await invokeAgent({
  eventBridge,
  agentName: 'supportAgent',
  agentVersion: '1',
  payload: { prompt: 'How do I reset my password?' },
  sessionId: 'chat-123',
  parameter: { locale: 'en' },
})

for (const envelope of result) {
  console.log(envelope.frame.kind, envelope.frame)
}
```

Use the optional `stream` argument to attach a responder that processes frames as the agent emits them (ideal for WebSockets or web streams). If `sessionId` is provided and `payload` is an object, `invokeAgent` injects it automatically when missing, so implicit `context.conversation` / `context.session` resolution works without manual payload wiring.
### `invokeAgent` options reference
| Option | Purpose | Use case |
|---|---|---|
| `agentName`, `agentVersion` | target agent | required for every invocation |
| `payload` | main input | same shape as the agent payload schema |
| `parameter` | optional side-channel input | locale, channel, feature flags |
| `sessionId` | stable conversation identity | continue an existing conversation across invocations |
| `principalId`, `tenantId` | identity/multi-tenant isolation | per-user or per-tenant memory partitioning |
| `correlationId` | trace correlation | linking runs to upstream workflows |
| `timeoutMs` | invoke timeout | fail faster for synchronous APIs |
| `stream` | receive frames incrementally | WebSockets/custom transports |
## HTTP exposure
`.exposeAsHttpEndpoint('POST', 'agents/supportAgent')` automatically adds an endpoint to your generated OpenAPI spec. The endpoint behaves like any streaming command:
```typescript
export const supportAgent = new AgentBuilder({ ... })
  .exposeAsHttpEndpoint('POST', 'agents/supportAgent')
  .setHandler(...)
  .build()
```

SSE is the default streaming mode for exposed agent endpoints. Call `.setStreamingMode(...)` only when you need a non-default mode.
If your API gateway already maps `POST /api/v1/agents/supportAgent` to the bridge, clients can fetch it directly. For custom controllers, pipe the envelopes to SSE/chunked responses using the Protocol & Streaming helpers.
For protocol semantics and a client-side parser loop, see AI Protocol.
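As a rough idea of what the client side looks like, here is a sketch that parses SSE `data:` lines back into envelopes. It assumes each chunk contains whole events and that envelopes are JSON objects with a `kind` field; the real frame shapes are defined by the AI Protocol page:

```typescript
// Hypothetical SSE chunk parser: split events, decode JSON data lines.
function parseSseChunk(chunk: string): Array<{ kind: string }> {
  const frames: Array<{ kind: string }> = []
  for (const event of chunk.split('\n\n')) {
    for (const line of event.split('\n')) {
      if (line.startsWith('data:')) {
        frames.push(JSON.parse(line.slice(5).trim()))
      }
    }
  }
  return frames
}

const body = 'data: {"kind":"token","text":"Hi"}\n\ndata: {"kind":"final"}\n\n'
const frames = parseSseChunk(body)
console.log(frames.map(f => f.kind)) // [ 'token', 'final' ]
```

A production parser must also buffer partial events across chunk boundaries, which this sketch deliberately skips.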
## Background & queues
For production workloads, queue-driven execution is usually the default pattern.
```mermaid
flowchart LR
    A["Producer (command/subscription/http)"] -->|enqueue| B["Queue Bridge"]
    B --> C["Queue Worker"]
    C -->|invokeAgent| D["Agent Runtime"]
    D --> E["Agent Pool (maxWorkers)"]
    E --> F["LLM Provider"]
```

### Queue-driven pattern
- expose a command/subscription/event path that enqueues work
- queue worker invokes the agent
- pool settings protect concurrency and upstream APIs
```typescript
// inside a queue worker / background handler
const envelopes = await invokeAgent({
  eventBridge,
  agentName: 'supportAgent',
  agentVersion: '1',
  payload: { prompt: 'Summarize ticket #42' },
})
```

If you already use Purista queues, keep that setup.
The AI package does not require a dedicated queue implementation.
### How queue settings and agent pool settings relate
- queue worker concurrency (`queue-worker` setup) controls how many jobs are leased/processed
- agent pool config (`poolConfig.maxWorkers`) controls how many agent runs execute in-process
- effective parallel LLM calls are bounded by the lower of these two limits in each process
Use both knobs:
- queue knobs for delivery throughput and retry timing
- agent pool knobs for provider pressure and application-level backpressure
### Queue + pool sizing (must configure both)
Example target profile:
- queue worker concurrency: `10`
- agent pool `maxWorkers`: `4`
Result: up to 10 jobs may be leased from the queue, but only 4 agent runs execute at once in this process.
This protects provider APIs from bursty parallelism while still keeping the queue busy.
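The resulting split between running and leased-but-waiting jobs is simple arithmetic; a sketch with illustrative variable names:

```typescript
// Per-process bound: the lower of the two knobs wins.
const queueWorkerConcurrency = 10 // jobs leased at once by the queue worker
const poolMaxWorkers = 4 // poolConfig.maxWorkers for this agent
const runningAgentJobs = Math.min(queueWorkerConcurrency, poolMaxWorkers)
const leasedButWaiting = Math.max(0, queueWorkerConcurrency - poolMaxWorkers)
console.log(runningAgentJobs, leasedButWaiting) // 4 6
```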
### Why this is the default operational mode
- caller does not block on long LLM execution
- retries/delivery semantics are handled by queue infrastructure
- worker parallelism and `poolConfig.maxWorkers` together control throughput
- easier cost/rate-limit control than unbounded sync invocations
### What runs where
- Queue bridge decides delivery/lease/retry mechanics
- Agent pool decides the in-process parallel execution cap (`maxWorkers`)
- Provider/LLM executes the model calls
Both queue worker concurrency and agent pool size matter. Set both intentionally.
### Bring your own queue setup
You can keep full control over queue bridge, retry, visibility timeout, and scheduling. The agent layer only needs an invocation call in your worker.
```typescript
// command handler -> enqueue
await context.queue.publish.aiWorkloads({
  agentName: 'supportAgent',
  agentVersion: '1',
  payload: { prompt: 'Summarize ticket #42' },
})

// queue worker handler -> invoke
await invokeAgent({
  eventBridge: context.eventBridge,
  agentName: payload.agentName,
  agentVersion: payload.agentVersion,
  payload: payload.payload,
})
```

Use the normal queue builder/worker options for transport-level behavior.
### Built-in runtime helpers

- `@purista/ai` ships reference services (`AIOrchestratorService`, `AIWorkerService`) that ingest manifests, enqueue runs, and execute them in isolated workers.
- Queue bridges (Redis, NATS, AMQP, …) treat agents like any other workload: define a queue worker that calls `invokeAgent` internally, then rely on the queue bridge for delayed or batched execution.
- Concurrency pools apply across sync and async invocations. Configure `poolConfig.maxWorkers` at runtime/deploy time so each environment controls throughput independently.
### Failure behavior in queue mode
- transient failures are retried by your queue setup and/or agent retry policy
- handled errors emit protocol error frames and can still be inspected in worker logs
- telemetry frames include duration/token usage and pool metrics (`activeWorkers`, `waitingWorkers`, `maxWorkersPerInstance`, `waitTimeMs`) so operations can alert on degraded runs
Pick the approach that matches your deployment. Local development usually starts agents inside the same process; production often combines HTTP exposure for real-time calls plus queue workers for heavy background chains.
