Evaluator Overview
The Evaluator is the core execution engine of the Stream Consumer. It processes incoming messages through workflow trees using isolated V8 contexts for secure, sandboxed execution.
Architecture
┌──────────────────────────────────────────────────────────────────────────┐
│ Stream Consumer │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌──────────────────────────────┐│
│ │ Kinesis │───▶│ Worker │───▶│ Evaluator ││
│ │ Stream │ │ Pool │ │ ┌────────────────────────┐ ││
│ └─────────────┘ └─────────────┘ │ │ V8 Isolate │ ││
│ ▲ │ │ ┌──────────────────┐ │ ││
│ │ │ │ │ Global Scope │ │ ││
│ │ │ │ │ - organizationId │ │ ││
│ │ │ │ │ - environmentId │ │ ││
│ │ │ │ │ - type, payload │ │ ││
│ │ │ │ │ - variables │ │ ││
│ │ │ │ │ - scripts │ │ ││
│ │ │ │ └──────────────────┘ │ ││
│ │ │ └────────────────────────┘ ││
│ │ └──────────────────────────────┘│
│ │ │ │
│ │ ┌─────────────▼────────────────┐│
│ │ │ Entity Evaluators ││
│ │ │ ┌───────┐ ┌───────┐ ││
│ │ │ │ Event │ │Prompt │ ││
│ │ │ └───────┘ └───────┘ ││
│ │ │ ┌───────┐ ┌──────────────┐ ││
│ │ │ │Action │ │Workflow Node │ ││
│ │ │ └───────┘ └──────┬───────┘ ││
│ │ └────────────────────┼─────────┘│
│ │ │ │ │
│ │ ┌──── sub-workflow trigger ───────────────┘ │ │
│ │ │ (Kinesis PutRecord) │ │
│ └──┘ ▼ │
│ ┌──────────────────────┐ │
│ │ S3 │ │
│ │ success/ fail/ ignore│ │
│ └──────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────┘
Key Components
Evaluator Class
The main orchestrator (src/workers/Evaluator.js):
class Evaluator {
  constructor(message, cacheData, scriptTimeout, compiledScripts) {
    this.message = message;
    this.cacheData = cacheData;
    this.scriptTimeout = scriptTimeout || 5000;
    this.compiledScripts = compiledScripts || [];
    this.executionLog = []; // Track all execution steps
    this.startTime = Date.now();

    // Initialize entity evaluators
    this.evaluators = {
      'Event': new EventEvaluator(this.scriptTimeout),
      'Prompt': new PromptEvaluator(this.scriptTimeout),
      'Action': new ActionEvaluator(this.scriptTimeout),
    };
  }

  // Add entry to execution log
  addLogEntry(entry) {
    this.executionLog.push({
      ts: Date.now(),
      elapsed: Date.now() - this.startTime,
      ...entry,
    });
  }
}
V8 Isolate
Each workflow execution gets a fresh V8 isolate:
- Memory Limit: 128MB per isolate
- Script Timeout: 5 seconds (configurable)
- Isolation: Full sandbox, no access to Node.js APIs
Entity Evaluators
Specialized handlers for each entity type:
| Evaluator | Entity Type | Purpose |
|---|---|---|
| EventEvaluator | Event | Condition evaluation, branching |
| PromptEvaluator | Prompt | AI/LLM execution |
| ActionEvaluator | Action | Script execution |
Execution Flow
1. Message Receipt
Kinesis Record
│
▼
Parse JSON payload
│
▼
Extract organizationId, environmentId
│
▼
Get worker from pool
│
▼
Create Evaluator instance
2. Initialization
async initializeIsolate() {
  // Create V8 isolate with memory limit
  this.isolate = new ivm.Isolate({ memoryLimit: 128 });
}

async createFreshContext() {
  // Create execution context
  const context = await this.isolate.createContext();

  // Inject variables from cache
  const variables = this.getVariables();
  await context.eval(`var __variables__ = ${JSON.stringify(variables)};`);

  // Inject each variable as a top-level var
  for (const variable of variables) {
    await context.eval(`var ${variable.name} = ${JSON.stringify(variable.value)};`);
  }

  // Inject pre-compiled script functions. refContext and jail are references
  // derived from the new context (e.g. jail is the context's global reference).
  const refContext = context;
  const jail = context.global;
  for (const script of this.compiledScripts) {
    await script.inject(refContext, jail, context);
  }

  return context;
}
3. Workflow Traversal
The message is injected once at the start of each workflow. All message fields are spread directly onto the V8 global scope (there is no message wrapper object). This means execution state accumulates across entities within the same workflow without being reset.
async process() {
  // Get workflows for this org/env
  const workflows = this.getWorkflows();
  let context;

  for (const workflow of workflows) {
    // Create fresh context for each workflow
    context = await this.createFreshContext();

    // Inject message ONCE — spreads all fields onto V8 global scope
    await this.evaluator.injectMessage(context, this.message);

    // Sync prompt count for sub-workflows that inherit parent state
    await context.eval(`(function() {
      var max = 0;
      for (var k in global) {
        if (k.indexOf('promptResponse_') === 0) {
          var idx = parseInt(k.replace('promptResponse_', ''), 10);
          if (!isNaN(idx) && idx > max) max = idx;
        }
      }
      if (typeof __prompt_count__ !== 'undefined') __prompt_count__ = max;
    })();`);

    // Traverse from root
    const resultFound = await this.traverseNode(
      workflow.workflowData.root,
      context,
      workflowMetadata
    );

    if (resultFound) {
      // Result entity was reached
      return;
    }
    context.release();
  }

  // No result found - save as ignore or fail
  await saveWorkflowResult(orgId, envId, status, message, context);
}
4. Node Evaluation
async traverseNode(node, context, workflowMetadata) {
  // Get entity from cache
  const entity = this.getEntity(node.entityId);

  // Get appropriate evaluator
  const entityType = entity.workflowEntityType?.name || 'Action';
  const evaluator = this.getEvaluator(entityType);

  // Evaluate the entity
  const result = await evaluator.evaluate(
    context,
    entity,
    node,
    this.message,
    this.executionContext,
    workflowMetadata
  );

  // Handle result actions
  if (result.action === 'exit_workflow') {
    return false;
  }
  if (result.action === 'workflow_complete') {
    this.workflowResult = result;
    return true;
  }

  // Process children
  for (const child of result.children) {
    const found = await this.traverseNode(child, context, workflowMetadata);
    if (found) return true;
  }
  return false;
}
Cache System
The Evaluator uses pre-loaded cache data:
const cacheData = {
  // Workflows indexed by org:env
  workflowsByOrgEnv: Map<string, Workflow[]>,

  // Variables indexed by org:env
  variablesByOrgEnv: Map<string, Variable[]>,

  // Entities indexed by ID
  entities: Map<string, WorkflowEntity>
};
Cache Keys
workflowsByOrgEnv: "{organizationId}:{environmentId}" → Workflow[]
variablesByOrgEnv: "{organizationId}:{environmentId}" → Variable[]
entities: "{entityId}" → WorkflowEntity
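The lookups against these keys can be sketched as plain Map accesses. This is an illustrative sketch, not the actual source; the helper names (orgEnvKey, getWorkflows) are ours, while the key format "{organizationId}:{environmentId}" comes from the docs above.

```javascript
// Build the composite cache key used by workflowsByOrgEnv / variablesByOrgEnv
function orgEnvKey(organizationId, environmentId) {
  return `${organizationId}:${environmentId}`;
}

// Look up workflows for an org/env pair, falling back to an empty list
function getWorkflows(cacheData, organizationId, environmentId) {
  return cacheData.workflowsByOrgEnv.get(orgEnvKey(organizationId, environmentId)) || [];
}

// Minimal cacheData shaped like the structure documented above
const cacheData = {
  workflowsByOrgEnv: new Map([['org_abc:env_xyz', [{ id: 'wf_1' }]]]),
};

console.log(getWorkflows(cacheData, 'org_abc', 'env_xyz').length); // → 1
```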
Script Injection
Pre-Compiled Scripts
Scripts are compiled once at worker startup:
// From src/scripts/
const compiledScripts = [
evaluateCondition, // Condition tree evaluation
print, // Debug logging
promptCallToken, // LLM with token auth
promptCallKeys, // LLM with key/secret auth
postToMastodon, // Social media posting
// ... more
];
Injection Process
for (const script of this.compiledScripts) {
  await script.inject(refContext, jail, context);
}
Each script becomes a global function in the V8 context.
Result Handling
Action Types
| Action | Meaning | Next Step |
|---|---|---|
| exit_workflow | Stop processing this workflow | Try next workflow |
| process_children | Continue to child nodes | Traverse children |
| workflow_complete | Workflow finished | Save result, stop |
Final Outcomes
| Scenario | Status | S3 Folder |
|---|---|---|
| Workflow fired, completed successfully | success | success/ |
| Workflow fired, script error occurred | fail | fail/ |
| No workflow's first Event condition matched | ignore | ignore/ |
| No workflows exist for org/env | ignore | ignore/ |
First-Match-Wins Logic
The evaluator uses "first-match-wins" logic when processing workflows:
for (const workflow of workflows) {
  // Skip workflows that don't start with an Event entity
  const firstNode = workflow.workflowData.root;
  const firstEntity = this.getEntity(firstNode.entityId);
  if (firstEntity.workflowEntityType?.name !== 'Event') {
    continue; // Skip this workflow
  }

  // Evaluate the first Event condition
  const result = await this.traverseNode(firstNode, context, metadata);
  if (this.workflowFired) {
    // A workflow's Event condition passed - stop evaluating other workflows
    break;
  }
}
Key behaviors:
- Workflows are evaluated in order
- Only the first workflow whose Event condition passes is executed
- Once a workflow "fires", no other workflows are evaluated
- If no workflow fires, the message is marked as ignore
Execution Log
The Evaluator tracks every step of workflow execution in executionLog. This provides a complete trace for debugging, auditing, and performance analysis.
Log Entry Structure
{
  ts: 1705315800123,       // Absolute timestamp
  elapsed: 2500,           // Ms since workflow start
  type: 'api',             // Entry type
  entityId: 'ent_123',     // Entity being evaluated
  entityName: 'Get AI Response',
  entityType: 'Prompt',
  service: 'llm',          // Type-specific fields
  durationMs: 2400,
  success: true
}
Entry Types
| Type | Description | Key Fields |
|---|---|---|
| workflow | Workflow start/end or sub-workflow trigger | action, workflowId, workflowName, totalDurationMs |
| event | Event condition evaluated | mode, conditionResult, branchTaken, childCount |
| prompt | Prompt entity processed | action (script_executed, complete) |
| api | External API call (LLM, etc.) | service, model, durationMs, success |
| action | Action script executed | durationMs, success |
| complete | Workflow completed | status, hasResultValue |
| error | Error during evaluation | error |
How It Works
Each entity evaluator receives a logCallback function:
async traverseNode(node, context, workflowMetadata) {
  const entity = this.getEntity(node.entityId);
  const evaluator = this.getEvaluator(entityType);

  // Create log callback for this entity
  const logCallback = (entry) => {
    this.addLogEntry({
      entityId: entity.id,
      entityName: entity.name,
      entityType,
      ...entry,
    });
  };

  // Pass to evaluator
  const result = await evaluator.evaluate(
    context, entity, node, message,
    executionContext, workflowMetadata, logCallback
  );
}
Entity evaluators call logCallback() at key moments:
// In EventEvaluator
logCallback({
type: 'event',
action: 'condition_passed',
mode: 'Single Path',
conditionResult: true,
childCount: 2,
});
// In PromptEvaluator (API call)
logCallback({
type: 'api',
service: 'llm',
model: entity.model.name,
durationMs: Date.now() - apiStartTime,
success: true,
});
Log Persistence
The execution log is saved with every workflow result to S3:
{
  "message": { ... },
  "context": {
    "____execution_log____": [
      { "ts": 1705315800123, "elapsed": 0, "type": "workflow", "action": "start", ... },
      { "ts": 1705315800130, "elapsed": 7, "type": "event", "action": "condition_passed", ... },
      { "ts": 1705315802500, "elapsed": 2377, "type": "api", "service": "llm", "durationMs": 2300, ... },
      { "ts": 1705315802650, "elapsed": 2527, "type": "action", "action": "script_executed", ... },
      { "ts": 1705315802700, "elapsed": 2577, "type": "result", "status": "Success", ... }
    ]
  }
}
See Processed Messages for viewing workflow results.
Error Handling
Script Errors
try {
  const result = await evaluator.evaluate(...);
} catch (error) {
  this.hadError = true;
  this.errors.push({
    entityId: entity.id,
    entityName: entity.name,
    entityType,
    error: error.message,
    timestamp: new Date().toISOString(),
  });

  // Also logged to execution log
  this.addLogEntry({
    type: 'error',
    entityId: entity.id,
    entityName: entity.name,
    entityType,
    error: error.message,
  });
}
Timeout Handling
await context.eval(wrappedScript, {
  timeout: this.scriptTimeout, // Default 5000ms
  promise: true
});
Scripts exceeding timeout are terminated and logged.
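isolated-vm enforces the timeout natively through the eval option shown above. Purely as a conceptual sketch of the observable behavior (not the actual mechanism), the same contract can be modeled with a promise race; the withTimeout name and the error message are illustrative only:

```javascript
// Conceptual analogue: resolve with the script's result, or reject once
// the time limit elapses. isolated-vm terminates the script itself; this
// sketch only mirrors what callers observe.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Script execution timed out after ${ms}ms`)),
      ms
    );
  });
  // Clear the timer regardless of which promise settles first
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}
```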
Error Context
Errors are saved with the message including detailed context and suggestions:
{
  "____execution_errors____": [
    {
      "entityId": "ent_123",
      "entityName": "Process Order",
      "entityType": "Action",
      "error": "Cannot read properties of undefined (reading 'name')",
      "errorType": "TypeError",
      "suggestion": "You tried to access a property on something that is undefined. Add a null check or use optional chaining: obj?.property",
      "timestamp": "2024-01-15T10:30:00Z"
    }
  ],
  "____error_count____": 1
}
Enhanced Error Logging
The evaluator provides detailed error information to help with debugging:
| Field | Description |
|---|---|
| error | The JavaScript error message |
| errorType | Error class (TypeError, ReferenceError, SyntaxError, etc.) |
| suggestion | Human-readable fix suggestion |
| scriptPreview | First 200 characters of the script |
| scriptLength | Total script length |
| messageSummary | Key message fields for context |
Error Types and Suggestions
| Error Type | Common Cause | Suggestion |
|---|---|---|
| TypeError | Accessing property of undefined | Use optional chaining (?.) |
| ReferenceError | Using undefined variable | Check spelling or define variable |
| SyntaxError | Invalid JavaScript | Check brackets, quotes, semicolons |
| TimeoutError | Script exceeded limit | Optimize or increase timeout |
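The mapping above can be sketched as a single dispatch on the error class. This is an illustrative sketch: the function name and the exact suggestion strings are ours, not necessarily what the evaluator emits.

```javascript
// Map a caught error to a human-readable suggestion, mirroring the table.
function suggestionFor(error) {
  switch (error.constructor.name) {
    case 'TypeError':
      return 'Add a null check or use optional chaining: obj?.property';
    case 'ReferenceError':
      return 'Check the variable name spelling, or define it before use';
    case 'SyntaxError':
      return 'Check brackets, quotes, and semicolons';
    default:
      // Timeouts surface as generic Errors whose message mentions the limit
      return error.message.includes('timed out')
        ? 'Optimize the script or increase the script timeout'
        : 'See the error message and scriptPreview for details';
  }
}
```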
Example error log entry:
{
  "ts": 1705315800123,
  "elapsed": 2500,
  "entityId": "ent_123",
  "entityName": "Process Data",
  "entityType": "Action",
  "type": "action",
  "action": "script_executed",
  "success": false,
  "error": "Cannot read properties of undefined (reading 'name')",
  "errorType": "TypeError",
  "suggestion": "You tried to access a property on something that is undefined. Add a null check: \"if (obj && obj.property)\" or use optional chaining: \"obj?.property\".",
  "scriptPreview": "const name = data.user.name;...",
  "scriptLength": 342,
  "messageSummary": {
    "organizationId": "org_abc",
    "environmentId": "env_xyz",
    "eventType": "user.action"
  }
}
Sub-Workflow Execution
When the evaluator encounters a workflow node (entityType: "workflow" with a workflowId), it triggers a sub-workflow instead of evaluating an entity. This enables multi-workflow orchestration where a parent workflow delegates to child workflows.
How It Works
Parent Workflow Traversal
│
▼
Workflow Node (workflowId: "abc-123")
│
├─── 1. Capture full V8 global state
│ (all message fields + script variables)
│
├─── 2. Build Kinesis trigger message
│ type: "workflow_trigger"
│ triggerType: "sub_workflow"
│
├─── 3. PutRecord to Kinesis stream
│
└─── 4. Continue to child nodes in parent workflow
The parent workflow does not wait for the sub-workflow to complete — it dispatches the Kinesis message and continues processing its own children immediately.
State Propagation
The full V8 global scope is captured and sent with the trigger message. This includes:
- All message fields (organizationId, environmentId, type, payload, etc.)
- Prompt responses (promptResponse_1, promptResponse_2, ...)
- Script-defined variables set during workflow execution
- Entity arguments injected earlier in the workflow
Internal globals (names starting with __), built-in objects (global, globalThis), and functions are excluded from the snapshot.
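The exclusion rules above can be sketched as a filter over a plain object of captured globals. The snapshotGlobals name is illustrative; the rules themselves (internal __ names, built-ins, functions) come from the docs.

```javascript
// Build the state snapshot sent with a sub-workflow trigger, dropping
// anything that should not cross the Kinesis boundary.
function snapshotGlobals(globals) {
  const snapshot = {};
  for (const [key, value] of Object.entries(globals)) {
    if (key.startsWith('__')) continue;                       // internal globals
    if (key === 'global' || key === 'globalThis') continue;   // built-in objects
    if (typeof value === 'function') continue;                // functions excluded
    snapshot[key] = value;
  }
  return snapshot;
}
```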
Kinesis Trigger Message
The dispatched message has this structure:
{
  "organizationId": "org-uuid",
  "environmentId": "env-uuid",
  "type": "workflow_trigger",
  "triggerType": "sub_workflow",
  "workflowId": "target-workflow-uuid",
  "parentTimestamp": "2026-03-31T12:00:00.000Z",
  "payload": { "...inherited..." },
  "promptResponse_1": "...inherited...",
  "customVar": "...inherited..."
}
When the consumer picks up this message, it routes directly to the specified workflow (bypassing the normal first-match-wins evaluation of all workflows).
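Assembling this record can be sketched as below. The field names follow the documented message; the buildSubWorkflowTrigger helper is illustrative and not part of the actual source.

```javascript
// Build the sub-workflow trigger record from the parent message plus the
// captured global-state snapshot (payload, promptResponse_*, custom vars).
function buildSubWorkflowTrigger(parentMessage, workflowId, inheritedState) {
  return {
    organizationId: parentMessage.organizationId,
    environmentId: parentMessage.environmentId,
    type: 'workflow_trigger',
    triggerType: 'sub_workflow',
    workflowId,
    parentTimestamp: new Date().toISOString(),
    ...inheritedState, // inherited fields spread onto the message root
  };
}
```

The resulting object would then be serialized and dispatched via Kinesis PutRecord, as shown in the diagram above.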
Prompt Count Synchronization
Sub-workflows inherit prompt responses from the parent. To prevent index collisions, the evaluator synchronizes the __prompt_count__ at the start of each workflow:
- Scan the global scope for promptResponse_* keys
- Find the maximum index (e.g., if promptResponse_2 exists, max = 2)
- Set __prompt_count__ to that maximum
This means the sub-workflow's first prompt produces promptResponse_3 (continuing from the parent's count) rather than overwriting promptResponse_1.
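The scan step is the same logic as the in-isolate snippet shown earlier in process(); extracted as a plain function (the name maxPromptIndex is ours) it looks like this:

```javascript
// Find the highest promptResponse_N index among a set of global keys,
// so a sub-workflow's prompts continue numbering from the parent's.
function maxPromptIndex(globalKeys) {
  let max = 0;
  for (const key of globalKeys) {
    if (key.startsWith('promptResponse_')) {
      const idx = parseInt(key.slice('promptResponse_'.length), 10);
      if (!Number.isNaN(idx) && idx > max) max = idx;
    }
  }
  return max;
}
```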
Execution Log
Sub-workflow triggers are recorded in the execution log:
{
  "type": "workflow",
  "action": "sub_workflow_triggered",
  "workflowId": "target-workflow-uuid"
}
If the Kinesis dispatch fails, an error entry is logged:
{
  "type": "error",
  "action": "sub_workflow_trigger_failed",
  "workflowId": "target-workflow-uuid",
  "error": "Kinesis PutRecord failed: ..."
}
Script-Based Triggers
Sub-workflows can also be triggered from Action scripts using triggerWorkflow(workflowId) and getWorkflowByName(name). These functions use the same state capture and Kinesis dispatch mechanism. See Workflow Orchestration Scripts for details.
Transaction Billing
The Consumer enforces per-organization transaction limits before processing messages.
How It Works
- When a message arrives, the Consumer increments a Redis counter for the organization's current billing period
- The counter key format is txn:{organizationId}:{period} (period is YYYY-MM)
- The current count is compared against the organization's subscription-level limit (Subscription.transactionLimit)
- Based on the overage policy (Subscription.overagePolicy), the message is either processed or rejected
Transaction limits and overage policies are stored directly on the Subscription record. They default to 1000 and hard_limit respectively and can be adjusted per-organization through the Transaction Config API. When a plan is assigned (via Stripe checkout or admin action), the subscription's limits are set from the corresponding PlanConfiguration defaults.
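The check can be sketched as below, with the Redis INCR abstracted behind an injected async function so the policy logic stands alone. The key format and the defaults (1000, hard_limit) come from the docs; the function name and return shape are illustrative.

```javascript
// Increment the org's counter for the current period and decide whether
// the message may be processed under the subscription's overage policy.
async function checkTransactionLimit(incr, organizationId, subscription, now = new Date()) {
  const period = now.toISOString().slice(0, 7); // YYYY-MM
  const key = `txn:${organizationId}:${period}`;
  const count = await incr(key); // e.g. a Redis INCR wrapper

  const limit = subscription.transactionLimit ?? 1000;
  const policy = subscription.overagePolicy ?? 'hard_limit';

  if (count <= limit) return { allowed: true, count };
  if (policy === 'warn') {
    console.warn(`org ${organizationId} over limit (${count}/${limit})`);
    return { allowed: true, count };
  }
  return { allowed: false, count }; // hard_limit: route to ignore
}
```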
Overage Policies
| Policy | Behavior |
|---|---|
hard_limit | Messages exceeding the limit are routed to the ignore queue with status ignore. The original message payload is preserved in the result context for visibility in the admin Processed Messages UI. |
warn | Messages are processed normally but a warning is logged |
Real-Time Plan Change Notifications
When a plan or transaction configuration changes, the consumer is notified immediately via Redis pub/sub rather than waiting for the 5-minute cache refresh cycle:
Admin / Stripe Webhook
│
▼
notifyPlanChange(orgId, reason)
│
▼
Redis PUBLISH "rw:plan-changes"
│
▼
Consumer subscribeToPlanChanges()
│
▼
workflowCache.refreshOrgLimit(orgId)
Notification triggers:
| Source | Channel | Reason |
|---|---|---|
| Stripe checkout completed | Per-org | checkout_completed |
| Stripe subscription updated/deleted | Per-org | subscription_updated / subscription_deleted |
| Admin plan change | Per-org | admin_plan_change |
| Per-org transaction config update | Per-org | transaction_config_updated |
| Plan configuration defaults changed | Broadcast (*) | plan_config_updated:{planKey} |
When the consumer receives a per-org notification, it re-queries the subscription and current Redis counter to decide whether the org is still over its limit. A broadcast (*) triggers a full cache reload.
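The consumer-side handling can be sketched as a small dispatcher over the channel payload. This assumes a message of the form { orgId, reason }; the handlePlanChange name and the callback parameters stand in for the real workflowCache methods and are hypothetical.

```javascript
// Dispatch a message received on the "rw:plan-changes" channel:
// per-org notifications refresh that org's limit, broadcasts reload everything.
function handlePlanChange(rawMessage, { refreshOrgLimit, fullReload }) {
  const { orgId, reason } = JSON.parse(rawMessage);
  if (orgId === '*') {
    fullReload(reason);      // broadcast: full cache reload
  } else {
    refreshOrgLimit(orgId);  // per-org: re-query subscription + counter
  }
}
```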
Stale Key Cleanup
During the periodic sync of Redis counters to Postgres (TransactionUsage), if a counter key references an organization that no longer exists in the database (foreign key violation), the consumer:
- Logs a warning with the list of stale org IDs
- Deletes the orphaned Redis counter keys
- Continues syncing the remaining organizations without interruption
This prevents repeated sync failures caused by deleted organizations.
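The partitioning step can be sketched as below. The key shape txn:{organizationId}:{period} follows the docs; the partitionStaleKeys helper is illustrative.

```javascript
// Split counter keys into those whose org still exists and orphans
// that should be deleted before the next sync attempt.
function partitionStaleKeys(counterKeys, existingOrgIds) {
  const live = [];
  const stale = [];
  for (const key of counterKeys) {
    const orgId = key.split(':')[1]; // txn:{organizationId}:{period}
    (existingOrgIds.has(orgId) ? live : stale).push(key);
  }
  return { live, stale };
}
```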
Transaction Data
Transaction counters are stored in Redis with a TTL of ~35 days, ensuring automatic expiration after each billing period. The Notifications service monitors these counters to send usage alerts at 70% and 100% thresholds.
Image Recognition
The Consumer supports image recognition for Prompt entities using vision-capable models. When a Prompt entity's model supports vision:
- Image URLs or base64 data in the message payload are detected
- The universal model adapter formats the request for the provider's vision API
- The model processes both text and image content
- The response is stored via latestPromptResponse() as usual
Supported providers for vision: OpenAI (GPT-4o), Anthropic (Claude 3.5 Sonnet), Google (Gemini Pro Vision), and AWS Bedrock (Claude models).
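The detection step might look like the following sketch, under the assumption that images arrive either as URLs with a recognizable extension or as base64 data URIs; the real adapter's detection rules may differ, and findImageInputs is an illustrative name.

```javascript
// Collect payload values that look like image inputs (URL or data URI).
function findImageInputs(payload) {
  const isImage = (v) =>
    typeof v === 'string' &&
    (/\.(png|jpe?g|gif|webp)(\?|$)/i.test(v) || v.startsWith('data:image/'));
  return Object.values(payload).filter(isImage);
}
```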
Performance Considerations
Memory Management
- Isolates disposed after workflow completion
- Contexts released after each workflow
- 128MB limit prevents memory exhaustion
Concurrency
- Worker pool manages multiple evaluators
- Each worker handles one message at a time
- Pool size configurable based on resources
- Kinesis records are processed in batches for higher throughput
Script Compilation
- Scripts pre-compiled at startup
- Reused across all evaluations
- Reduces per-message overhead
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| SCRIPT_TIMEOUT | 5000 | Script timeout in milliseconds |
| ISOLATE_MEMORY | 128 | V8 isolate memory limit in MB |
| WORKER_POOL_SIZE | 4 | Number of worker threads |
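Reading these variables with their documented defaults can be sketched as follows; loadConfig and the property names are illustrative, not the consumer's actual config module.

```javascript
// Parse the documented environment variables, falling back to defaults.
function loadConfig(env = process.env) {
  return {
    scriptTimeout: parseInt(env.SCRIPT_TIMEOUT, 10) || 5000,   // ms
    isolateMemory: parseInt(env.ISOLATE_MEMORY, 10) || 128,    // MB
    workerPoolSize: parseInt(env.WORKER_POOL_SIZE, 10) || 4,   // threads
  };
}
```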
Related Topics
- Entity Evaluators — Specialized entity handlers
- Scripts Reference — Available script functions
- Workflow Orchestration — triggerWorkflow and getWorkflowByName functions
- Workflow Entities — Entity configuration
- Workflow Data Schema — Workflow node types including sub-workflow nodes