
Evaluator Overview

The Evaluator is the core execution engine of the Stream Consumer. It processes incoming messages through workflow trees using isolated V8 contexts for secure, sandboxed execution.

Architecture

Stream Consumer

Kinesis Stream ───▶ Worker Pool ───▶ Evaluator
    ▲                                │   V8 isolate, global scope:
    │                                │     organizationId, environmentId,
    │                                │     type, payload, variables, scripts
    │                                ▼
    │                                Entity Evaluators
    │                                (Event, Prompt, Action, Workflow Node)
    │                                │
    └──── sub-workflow trigger ──────┤ (from Workflow Node)
         (Kinesis PutRecord)         │
                                     ▼
                                     S3: success/  fail/  ignore/

Key Components

Evaluator Class

The main orchestrator (src/workers/Evaluator.js):

class Evaluator {
  constructor(message, cacheData, scriptTimeout, compiledScripts) {
    this.message = message;
    this.cacheData = cacheData;
    this.scriptTimeout = scriptTimeout || 5000;
    this.compiledScripts = compiledScripts || [];
    this.executionLog = []; // Track all execution steps
    this.startTime = Date.now();

    // Error tracking (read and written by the error handling further down)
    this.hadError = false;
    this.errors = [];

    // Initialize entity evaluators
    this.evaluators = {
      'Event': new EventEvaluator(this.scriptTimeout),
      'Prompt': new PromptEvaluator(this.scriptTimeout),
      'Action': new ActionEvaluator(this.scriptTimeout),
    };
  }

  // Add entry to execution log
  addLogEntry(entry) {
    this.executionLog.push({
      ts: Date.now(),
      elapsed: Date.now() - this.startTime,
      ...entry,
    });
  }
}

V8 Isolate

Each workflow execution gets a fresh V8 isolate:

  • Memory Limit: 128MB per isolate
  • Script Timeout: 5 seconds (configurable)
  • Isolation: Full sandbox, no access to Node.js APIs

Entity Evaluators

Specialized handlers for each entity type:

| Evaluator | Entity Type | Purpose |
|---|---|---|
| EventEvaluator | Event | Condition evaluation, branching |
| PromptEvaluator | Prompt | AI/LLM execution |
| ActionEvaluator | Action | Script execution |

Execution Flow

1. Message Receipt

Kinesis Record
      │
      ▼
Parse JSON payload
      │
      ▼
Extract organizationId, environmentId
      │
      ▼
Get worker from pool
      │
      ▼
Create Evaluator instance
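The receipt steps above can be sketched as a small decoding function. This is a minimal sketch. It assumes the record's data arrives either as a base64 string (as in Lambda-style Kinesis event records) or as raw bytes (as returned by GetRecords), and that a message missing either ID should be rejected. parseKinesisRecord is a hypothetical name, not the consumer's actual function.

```javascript
// Hypothetical helper: decode one Kinesis record into a message object.
// Assumes `data` is a base64 string or a Buffer/Uint8Array of UTF-8 JSON.
function parseKinesisRecord(data) {
  const bytes = typeof data === 'string' ? Buffer.from(data, 'base64') : Buffer.from(data);
  const message = JSON.parse(bytes.toString('utf8'));

  // Both IDs are needed to look up workflows and variables in the cache
  const { organizationId, environmentId } = message;
  if (!organizationId || !environmentId) {
    throw new Error('Message is missing organizationId or environmentId');
  }
  return { organizationId, environmentId, message };
}
```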

2. Initialization

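// At the top of src/workers/Evaluator.js
const ivm = require('isolated-vm');

// Evaluator methods (excerpt)
async initializeIsolate() {
  // Create V8 isolate with a 128 MB memory limit
  this.isolate = new ivm.Isolate({ memoryLimit: 128 });
}

async createFreshContext() {
  // Create execution context
  const context = await this.isolate.createContext();

  // Alias the context's global object as `global`
  // (the prompt-count sync in process() iterates over `global`)
  const jail = context.global;
  await jail.set('global', jail.derefInto());

  // Inject variables from cache
  const variables = this.getVariables();
  await context.eval(`var __variables__ = ${JSON.stringify(variables)};`);

  // Inject each variable as a top-level var (variable.name must be a valid identifier)
  for (const variable of variables) {
    await context.eval(`var ${variable.name} = ${JSON.stringify(variable.value)};`);
  }

  // Inject pre-compiled script functions
  // (refContext is defined elsewhere in the Evaluator and not shown here)
  for (const script of this.compiledScripts) {
    await script.inject(refContext, jail, context);
  }

  return context;
}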
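createFreshContext interpolates each variable name straight into source code. A name that is not a valid identifier would therefore break the eval, or inject code. The sketch below builds the same prelude as a single string and skips such names. buildVariablePrelude and the identifier check are illustrative assumptions, not part of the documented Evaluator.

```javascript
// Hypothetical helper: build the source that createFreshContext evaluates,
// one `var` per cached variable. Names that are not plain ASCII identifiers
// are skipped instead of being interpolated into code (an assumption; the
// documented code does not validate names). Reserved words are not checked.
const IDENTIFIER = /^[A-Za-z_$][A-Za-z0-9_$]*$/;

function buildVariablePrelude(variables) {
  const lines = [`var __variables__ = ${JSON.stringify(variables)};`];
  const skipped = [];
  for (const { name, value } of variables) {
    if (!IDENTIFIER.test(name)) {
      skipped.push(name);
      continue;
    }
    lines.push(`var ${name} = ${JSON.stringify(value)};`);
  }
  return { source: lines.join('\n'), skipped };
}
```

Under this assumption, a skipped variable stays reachable through __variables__, because that array is injected whole.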

3. Workflow Traversal

The message is injected once at the start of each workflow. All message fields are spread directly onto the V8 global scope (there is no message wrapper object). Because every workflow gets a fresh context, state does not carry over between workflows. Within a workflow, nothing is reset between entities, so globals set by one entity stay visible to every later entity.

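async process() {
  const { organizationId, environmentId } = this.message;

  // Get workflows for this org/env
  const workflows = this.getWorkflows();

  for (const workflow of workflows) {
    // Create a fresh context for each workflow
    const context = await this.createFreshContext();
    try {
      // Inject message ONCE: spreads all fields onto the V8 global scope
      await this.evaluator.injectMessage(context, this.message);

      // Sync prompt count for sub-workflows that inherit parent state
      await context.eval(`(function () {
        var max = 0;
        for (var k in global) {
          if (k.indexOf('promptResponse_') === 0) {
            var idx = parseInt(k.replace('promptResponse_', ''), 10);
            if (!isNaN(idx) && idx > max) max = idx;
          }
        }
        if (typeof __prompt_count__ !== 'undefined') __prompt_count__ = max;
      })();`);

      // Metadata passed down the traversal (shape shown for illustration)
      const workflowMetadata = { workflowId: workflow.id, workflowName: workflow.name };

      // Traverse from root
      const resultFound = await this.traverseNode(
        workflow.workflowData.root,
        context,
        workflowMetadata
      );

      // A Result entity was reached (workflow_complete)
      if (resultFound) return;
    } finally {
      // Release the context whether or not a result was found
      context.release();
    }
  }

  // No result found: save as fail if any entity errored, otherwise as ignore
  const status = this.hadError ? 'fail' : 'ignore';
  await saveWorkflowResult(organizationId, environmentId, status, this.message, {
    ____execution_log____: this.executionLog,
    ____execution_errors____: this.errors,
    ____error_count____: this.errors.length,
  });
}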

4. Node Evaluation

async traverseNode(node, context, workflowMetadata) {
  // Get entity from cache
  const entity = this.getEntity(node.entityId);

  // Get appropriate evaluator (entities without a type default to Action)
  const entityType = entity.workflowEntityType?.name || 'Action';
  const evaluator = this.getEvaluator(entityType);

  // Evaluate the entity
  const result = await evaluator.evaluate(
    context,
    entity,
    node,
    this.message,
    this.executionContext,
    workflowMetadata
  );

  // Handle result actions
  if (result.action === 'exit_workflow') {
    return false;
  }

  if (result.action === 'workflow_complete') {
    this.workflowResult = result;
    return true;
  }

  // process_children: depth-first, stop at the first subtree that completes
  for (const child of result.children ?? []) {
    const found = await this.traverseNode(child, context, workflowMetadata);
    if (found) return true;
  }

  return false;
}
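To make the control flow of traverseNode concrete, here is a self-contained model that uses plain functions in place of the entity evaluators. This substitution is an assumption for illustration: the real evaluators are async and run inside the V8 context. The model follows the same rules. exit_workflow abandons the current branch, workflow_complete ends the traversal, and any other action descends into result.children depth-first.

```javascript
// Minimal model of traverseNode's control flow. `evaluate` is a synchronous
// stand-in returning { action, children } like the evaluators' results.
function traverse(node, evaluate, visited = []) {
  visited.push(node.entityId);
  const result = evaluate(node);

  if (result.action === 'exit_workflow') return false;    // abandon this branch
  if (result.action === 'workflow_complete') return true; // result entity reached

  // process_children: depth-first, stop at the first subtree that completes
  for (const child of result.children || []) {
    if (traverse(child, evaluate, visited)) return true;
  }
  return false;
}
```

The loop returns on the first completed subtree, so in a tree where the result sits under the first child, later siblings are never evaluated. In this model, as in the code above, exit_workflow only returns false to the caller. The caller then moves on to the next sibling.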

Cache System

The Evaluator uses pre-loaded cache data:

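/**
 * Shape of the pre-loaded cache (types given as JSDoc):
 * @property {Map<string, Workflow[]>} workflowsByOrgEnv  Workflows indexed by "org:env"
 * @property {Map<string, Variable[]>} variablesByOrgEnv  Variables indexed by "org:env"
 * @property {Map<string, WorkflowEntity>} entities       Entities indexed by ID
 */
const cacheData = {
  workflowsByOrgEnv: new Map(),
  variablesByOrgEnv: new Map(),
  entities: new Map(),
};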

Cache Keys

workflowsByOrgEnv: "{organizationId}:{environmentId}" → Workflow[]
variablesByOrgEnv: "{organizationId}:{environmentId}" → Variable[]
entities: "{entityId}" → WorkflowEntity
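A minimal sketch of lookups against the keys above. The getter names mirror the getWorkflows, getVariables, and getEntity calls that the Evaluator uses. The bodies, the explicit org/env arguments, and throwing on a missing entity are all assumptions.

```javascript
// Build the "{organizationId}:{environmentId}" cache key
function orgEnvKey(organizationId, environmentId) {
  return `${organizationId}:${environmentId}`;
}

// Hypothetical read helpers over the cacheData maps
function createCacheReader(cacheData) {
  return {
    getWorkflows(orgId, envId) {
      return cacheData.workflowsByOrgEnv.get(orgEnvKey(orgId, envId)) || [];
    },
    getVariables(orgId, envId) {
      return cacheData.variablesByOrgEnv.get(orgEnvKey(orgId, envId)) || [];
    },
    getEntity(entityId) {
      const entity = cacheData.entities.get(entityId);
      if (!entity) throw new Error(`Entity ${entityId} not found in cache`);
      return entity;
    },
  };
}
```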

Script Injection

Pre-Compiled Scripts

Scripts are compiled once at worker startup:

// From src/scripts/
const compiledScripts = [
evaluateCondition, // Condition tree evaluation
print, // Debug logging
promptCallToken, // LLM with token auth
promptCallKeys, // LLM with key/secret auth
postToMastodon, // Social media posting
// ... more
];

Injection Process

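// Inside createFreshContext(): jail is the context's global object reference
const jail = context.global;
for (const script of this.compiledScripts) {
  // refContext is defined elsewhere in the Evaluator and not shown here
  await script.inject(refContext, jail, context);
}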

Each script becomes a global function in the V8 context.

Result Handling

Action Types

| Action | Meaning | Next Step |
|---|---|---|
| exit_workflow | Stop processing this workflow | Try next workflow |
| process_children | Continue to child nodes | Traverse children |
| workflow_complete | Workflow finished | Save result, stop |

Final Outcomes

| Scenario | Status | S3 Folder |
|---|---|---|
| Workflow fired, completed successfully | success | success/ |
| Workflow fired, script error occurred | fail | fail/ |
| No workflow's first Event condition matched | ignore | ignore/ |
| No workflows exist for org/env | ignore | ignore/ |
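The outcome table can be read as a small decision function. In this illustrative sketch, `fired` means some workflow's first Event condition passed and `hadError` means a script error was recorded. Treating any error-free fired workflow as success is an assumption that the table does not spell out.

```javascript
// Hypothetical mapping of the outcome table to a status and S3 folder
function finalOutcome({ workflowCount, fired, hadError }) {
  let status;
  if (workflowCount === 0 || !fired) {
    status = 'ignore'; // no workflows, or none whose first Event matched
  } else {
    status = hadError ? 'fail' : 'success';
  }
  return { status, s3Folder: `${status}/` };
}
```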

First-Match-Wins Logic

The evaluator uses "first-match-wins" logic when processing workflows:

for (const workflow of workflows) {
  // Skip workflows that don't start with an Event entity
  const firstNode = workflow.workflowData.root;
  const firstEntity = this.getEntity(firstNode.entityId);
  if (firstEntity.workflowEntityType?.name !== 'Event') {
    continue; // Skip this workflow
  }

  // Evaluate the first Event condition
  // (context and metadata are created per workflow, as in process())
  await this.traverseNode(firstNode, context, metadata);

  if (this.workflowFired) {
    // A workflow's Event condition passed: stop evaluating other workflows
    break;
  }
}

Key behaviors:

  1. Workflows are evaluated in order
  2. Only the first workflow whose Event condition passes is executed
  3. Once a workflow "fires", no other workflows are evaluated
  4. If no workflow fires, the message is marked as ignore
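The selection rule can be sketched as a self-contained function. Two hypothetical parameters are introduced so the logic runs on its own. `evaluateRoot(workflow)` is a stand-in that returns true when a workflow fires, and `getEntity` plays the role of the cache lookup.

```javascript
// First-match-wins over an ordered list of workflows
function selectWorkflow(workflows, getEntity, evaluateRoot) {
  for (const workflow of workflows) {
    const root = workflow.workflowData.root;
    const rootEntity = getEntity(root.entityId);
    // Workflows whose first node is not an Event are never candidates
    if (rootEntity.workflowEntityType?.name !== 'Event') continue;
    if (evaluateRoot(workflow)) return workflow; // first match wins
  }
  return null; // nothing fired: the message is marked ignore
}
```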

Execution Log

The Evaluator tracks every step of workflow execution in executionLog. This provides a complete trace for debugging, auditing, and performance analysis.

Log Entry Structure

{
  ts: 1705315800123,      // Absolute timestamp
  elapsed: 2500,          // Ms since workflow start
  type: 'api',            // Entry type
  entityId: 'ent_123',    // Entity being evaluated
  entityName: 'Get AI Response',
  entityType: 'Prompt',
  service: 'llm',         // Type-specific fields
  durationMs: 2400,
  success: true
}

Entry Types

| Type | Description | Key Fields |
|---|---|---|
| workflow | Workflow start/end or sub-workflow trigger | action, workflowId, workflowName, totalDurationMs |
| event | Event condition evaluated | mode, conditionResult, branchTaken, childCount |
| prompt | Prompt entity processed | action (script_executed, complete) |
| api | External API call (LLM, etc.) | service, model, durationMs, success |
| action | Action script executed | durationMs, success |
| complete | Workflow completed | status, hasResultValue |
| error | Error during evaluation | error |

How It Works

Each entity evaluator receives a logCallback function:

async traverseNode(node, context, workflowMetadata) {
  const entity = this.getEntity(node.entityId);
  const entityType = entity.workflowEntityType?.name || 'Action';
  const evaluator = this.getEvaluator(entityType);

  // Create log callback for this entity
  const logCallback = (entry) => {
    this.addLogEntry({
      entityId: entity.id,
      entityName: entity.name,
      entityType,
      ...entry,
    });
  };

  // Pass to evaluator
  const result = await evaluator.evaluate(
    context, entity, node, this.message,
    this.executionContext, workflowMetadata, logCallback
  );

  // ... result handling as in Node Evaluation
}

Entity evaluators call logCallback() at key moments:

// In EventEvaluator
logCallback({
  type: 'event',
  action: 'condition_passed',
  mode: 'Single Path',
  conditionResult: true,
  childCount: 2,
});

// In PromptEvaluator (API call)
logCallback({
  type: 'api',
  service: 'llm',
  model: entity.model.name,
  durationMs: Date.now() - apiStartTime,
  success: true,
});
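A compact model of addLogEntry plus the per-entity callback. It assumes an injectable clock (`now`) so timestamps are deterministic, whereas the Evaluator itself calls Date.now(). The names ExecutionLog and callbackFor are hypothetical.

```javascript
// Execution log with per-entity callbacks, mirroring addLogEntry above
class ExecutionLog {
  constructor(now = Date.now) {
    this.now = now;
    this.startTime = now();
    this.entries = [];
  }

  addLogEntry(entry) {
    const ts = this.now(); // read the clock once so ts and elapsed agree
    this.entries.push({ ts, elapsed: ts - this.startTime, ...entry });
  }

  // Returns a callback that stamps every entry with the entity's identity
  callbackFor(entity, entityType) {
    return (entry) =>
      this.addLogEntry({ entityId: entity.id, entityName: entity.name, entityType, ...entry });
  }
}
```

Each evaluator supplies only type-specific fields. The callback adds the entity's identity, and the log adds ts and elapsed.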

Log Persistence

The execution log is saved with every workflow result to S3:

{
  "message": { ... },
  "context": {
    "____execution_log____": [
      { "ts": 1705315800123, "elapsed": 0, "type": "workflow", "action": "start", ... },
      { "ts": 1705315800130, "elapsed": 7, "type": "event", "action": "condition_passed", ... },
      { "ts": 1705315802500, "elapsed": 2377, "type": "api", "service": "llm", "durationMs": 2300, ... },
      { "ts": 1705315802650, "elapsed": 2527, "type": "action", "action": "script_executed", ... },
      { "ts": 1705315802700, "elapsed": 2577, "type": "result", "status": "Success", ... }
    ]
  }
}

See Processed Messages for viewing workflow results.

Error Handling

Script Errors

try {
  const result = await evaluator.evaluate(...);
} catch (error) {
  this.hadError = true;
  this.errors.push({
    entityId: entity.id,
    entityName: entity.name,
    entityType,
    error: error.message,
    timestamp: new Date().toISOString(),
  });

  // Also logged to execution log
  this.addLogEntry({
    type: 'error',
    entityId: entity.id,
    entityName: entity.name,
    entityType,
    error: error.message,
  });
}

Timeout Handling

await context.eval(wrappedScript, {
  timeout: this.scriptTimeout, // Default 5000ms
  promise: true,
});

Scripts exceeding timeout are terminated and logged.

Error Context

Errors are saved alongside the message. Each error carries detailed context and a suggested fix:

{
  "____execution_errors____": [
    {
      "entityId": "ent_123",
      "entityName": "Process Order",
      "entityType": "Action",
      "error": "Cannot read properties of undefined (reading 'name')",
      "errorType": "TypeError",
      "suggestion": "You tried to access a property on something that is undefined. Add a null check or use optional chaining: obj?.property",
      "timestamp": "2024-01-15T10:30:00Z"
    }
  ],
  "____error_count____": 1
}

Enhanced Error Logging

The evaluator provides detailed error information to help with debugging:

| Field | Description |
|---|---|
| error | The JavaScript error message |
| errorType | Error class (TypeError, ReferenceError, SyntaxError, etc.) |
| suggestion | Human-readable fix suggestion |
| scriptPreview | First 200 characters of the script |
| scriptLength | Total script length |
| messageSummary | Key message fields for context |

Error Types and Suggestions

| Error Type | Common Cause | Suggestion |
|---|---|---|
| TypeError | Accessing a property of undefined | Use optional chaining (?.) |
| ReferenceError | Using an undefined variable | Check spelling or define the variable |
| SyntaxError | Invalid JavaScript | Check brackets, quotes, semicolons |
| TimeoutError | Script exceeded the time limit | Optimize the script or increase the timeout |

Example error log entry:

{
  "ts": 1705315800123,
  "elapsed": 2500,
  "entityId": "ent_123",
  "entityName": "Process Data",
  "entityType": "Action",
  "type": "script",
  "action": "executed",
  "success": false,
  "error": "Cannot read properties of undefined (reading 'name')",
  "errorType": "TypeError",
  "suggestion": "You tried to access a property on something that is undefined. Add a null check: \"if (obj && obj.property)\" or use optional chaining: \"obj?.property\".",
  "scriptPreview": "const name = data.user.name;...",
  "scriptLength": 342,
  "messageSummary": {
    "organizationId": "org_abc",
    "environmentId": "env_xyz",
    "eventType": "user.action"
  }
}
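A hypothetical sketch of how the enhanced fields could be assembled from a caught error. The TypeError suggestion is taken from the example above, and the others paraphrase the table. Classifying timeouts by a "timed out" message and taking eventType from message.type are assumptions.

```javascript
// Suggestion text per error class (TimeoutError is detected by message, assumed)
const SUGGESTIONS = {
  TypeError: 'You tried to access a property on something that is undefined. Add a null check or use optional chaining: obj?.property',
  ReferenceError: 'A variable is not defined. Check its spelling or define it before use.',
  SyntaxError: 'The script is not valid JavaScript. Check brackets, quotes, and semicolons.',
  TimeoutError: 'The script exceeded the time limit. Optimize it or increase SCRIPT_TIMEOUT.',
};

function describeScriptError(error, script, message) {
  const errorType = /timed out/i.test(error.message) ? 'TimeoutError' : error.name || 'Error';
  return {
    error: error.message,
    errorType,
    suggestion: SUGGESTIONS[errorType] || 'Check the script and the message fields it reads.',
    scriptPreview: script.slice(0, 200), // first 200 characters
    scriptLength: script.length,
    messageSummary: {
      organizationId: message.organizationId,
      environmentId: message.environmentId,
      eventType: message.type,
    },
  };
}
```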

Sub-Workflow Execution

When the evaluator encounters a workflow node (entityType: "workflow" with a workflowId), it triggers a sub-workflow instead of evaluating an entity. This enables multi-workflow orchestration where a parent workflow delegates to child workflows.

How It Works

Parent Workflow Traversal
      │
      ▼
Workflow Node (workflowId: "abc-123")
      │
      ├─── 1. Capture full V8 global state
      │       (all message fields + script variables)
      │
      ├─── 2. Build Kinesis trigger message
      │       type: "workflow_trigger"
      │       triggerType: "sub_workflow"
      │
      ├─── 3. PutRecord to Kinesis stream
      │
      └─── 4. Continue to child nodes in parent workflow

The parent workflow does not wait for the sub-workflow to complete — it dispatches the Kinesis message and continues processing its own children immediately.

State Propagation

The full V8 global scope is captured and sent with the trigger message. This includes:

  • All message fields (organizationId, environmentId, type, payload, etc.)
  • Prompt responses (promptResponse_1, promptResponse_2, ...)
  • Script-defined variables set during workflow execution
  • Entity arguments injected earlier in the workflow

Internal globals (names starting with __), built-in objects (global, globalThis), and functions are excluded from the snapshot.
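The exclusion rules can be sketched as a filter over a plain object that stands in for the V8 global scope. In the consumer the capture happens inside the isolate. This host-side version, and the JSON round trip used to keep only serializable data, are assumptions for illustration.

```javascript
// Built-in aliases that are never copied into the snapshot
const EXCLUDED_BUILTINS = new Set(['global', 'globalThis']);

function snapshotGlobals(globals) {
  const snapshot = {};
  for (const [key, value] of Object.entries(globals)) {
    if (key.startsWith('__')) continue;        // internal globals
    if (EXCLUDED_BUILTINS.has(key)) continue;  // built-in objects
    if (typeof value === 'function') continue; // injected scripts and helpers
    snapshot[key] = value;
  }
  // Round-trip through JSON so the result is plain, serializable data
  return JSON.parse(JSON.stringify(snapshot));
}
```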

Kinesis Trigger Message

The dispatched message has this structure:

{
  "organizationId": "org-uuid",
  "environmentId": "env-uuid",
  "type": "workflow_trigger",
  "triggerType": "sub_workflow",
  "workflowId": "target-workflow-uuid",
  "parentTimestamp": "2026-03-31T12:00:00.000Z",
  "payload": { "...inherited..." },
  "promptResponse_1": "...inherited...",
  "customVar": "...inherited..."
}

When the consumer picks up this message, it routes directly to the specified workflow (bypassing the normal first-match-wins evaluation of all workflows).
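A sketch of both sides of the hand-off: building the trigger message from captured state, and the consumer's routing decision. Field names follow the JSON above. The function names, and letting the trigger fields override inherited ones, are assumptions.

```javascript
// Build the trigger message from the captured global state
function buildTriggerMessage(state, workflowId, now = new Date()) {
  return {
    ...state, // inherited message fields, prompt responses, script variables
    type: 'workflow_trigger',
    triggerType: 'sub_workflow',
    workflowId,
    parentTimestamp: now.toISOString(),
  };
}

// Consumer side: which workflows to evaluate for an incoming message
function selectWorkflowsFor(message, workflows) {
  if (message.type === 'workflow_trigger' && message.workflowId) {
    // Direct routing: bypass first-match-wins and run only the named workflow
    return workflows.filter((w) => w.id === message.workflowId);
  }
  return workflows; // normal path: evaluate all workflows in order
}
```

The PutRecord payload would then be the UTF-8 bytes of JSON.stringify(message). This page does not specify the partition key.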

Prompt Count Synchronization

Sub-workflows inherit prompt responses from the parent. To prevent index collisions, the evaluator synchronizes the __prompt_count__ at the start of each workflow:

  1. Scan global scope for promptResponse_* keys
  2. Find the maximum index (e.g., if promptResponse_2 exists, max = 2)
  3. Set __prompt_count__ to that maximum

This means the sub-workflow's first prompt produces promptResponse_3 (continuing from the parent's count) rather than overwriting promptResponse_1.
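This is the same scan as the inline script in process(), rewritten as host-side functions so the arithmetic is easy to check. The function names are hypothetical. The count + 1 rule follows from the example above, where a maximum of 2 makes the next prompt promptResponse_3.

```javascript
// Highest promptResponse_N index among the inherited keys
function syncPromptCount(globals) {
  let max = 0;
  for (const key of Object.keys(globals)) {
    if (!key.startsWith('promptResponse_')) continue;
    const idx = parseInt(key.slice('promptResponse_'.length), 10);
    if (!Number.isNaN(idx) && idx > max) max = idx;
  }
  return max;
}

// Key the next prompt writes to, continuing from the synced count
function nextPromptKey(count) {
  return `promptResponse_${count + 1}`;
}
```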

Execution Log

Sub-workflow triggers are recorded in the execution log:

{
  "type": "workflow",
  "action": "sub_workflow_triggered",
  "workflowId": "target-workflow-uuid"
}

If the Kinesis dispatch fails, an error entry is logged:

{
  "type": "error",
  "action": "sub_workflow_trigger_failed",
  "workflowId": "target-workflow-uuid",
  "error": "Kinesis PutRecord failed: ..."
}

Script-Based Triggers

Sub-workflows can also be triggered from Action scripts using triggerWorkflow(workflowId) and getWorkflowByName(name). These functions use the same state capture and Kinesis dispatch mechanism. See Workflow Orchestration Scripts for details.

Transaction Billing

The Consumer enforces per-organization transaction limits before processing messages.

How It Works

  1. When a message arrives, the Consumer increments a Redis counter for the organization's current billing period
  2. The counter key format is txn:{organizationId}:{period} (period is YYYY-MM)
  3. The current count is compared against the organization's subscription-level limit (Subscription.transactionLimit)
  4. Based on the overage policy (Subscription.overagePolicy), the message is either processed or rejected

Transaction limits and overage policies are stored directly on the Subscription record. They default to 1000 and hard_limit respectively and can be adjusted per-organization through the Transaction Config API. When a plan is assigned (via Stripe checkout or admin action), the subscription's limits are set from the corresponding PlanConfiguration defaults.

Overage Policies

| Policy | Behavior |
|---|---|
| hard_limit | Messages exceeding the limit are routed to the ignore queue with status ignore. The original message payload is preserved in the result context for visibility in the admin Processed Messages UI. |
| warn | Messages are processed normally, but a warning is logged. |
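A sketch of the per-message check that combines the steps and policies above. Here `incr` stands in for a Redis INCR on the counter key and is injected so the logic runs without Redis. Several details are assumptions: the UTC billing period, the in-code fallbacks to the documented defaults, and treating count > limit as over the limit.

```javascript
// "YYYY-MM" billing period (UTC assumed)
function billingPeriod(date = new Date()) {
  return date.toISOString().slice(0, 7);
}

// Counter key format: txn:{organizationId}:{period}
function transactionKey(organizationId, date) {
  return `txn:${organizationId}:${billingPeriod(date)}`;
}

function checkTransaction(organizationId, subscription, incr, date = new Date()) {
  const count = incr(transactionKey(organizationId, date));
  const limit = subscription.transactionLimit ?? 1000;
  const policy = subscription.overagePolicy ?? 'hard_limit';
  const overLimit = count > limit;

  if (overLimit && policy === 'hard_limit') {
    return { process: false, status: 'ignore', count, limit }; // route to ignore
  }
  return { process: true, warn: overLimit && policy === 'warn', count, limit };
}
```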

Real-Time Plan Change Notifications

When a plan or transaction configuration changes, the consumer is notified immediately via Redis pub/sub rather than waiting for the 5-minute cache refresh cycle:

Admin / Stripe Webhook
      │
      ▼
notifyPlanChange(orgId, reason)
      │
      ▼
Redis PUBLISH "rw:plan-changes"
      │
      ▼
Consumer subscribeToPlanChanges()
      │
      ▼
workflowCache.refreshOrgLimit(orgId)

Notification triggers:

| Source | Channel | Reason |
|---|---|---|
| Stripe checkout completed | Per-org | checkout_completed |
| Stripe subscription updated/deleted | Per-org | subscription_updated / subscription_deleted |
| Admin plan change | Per-org | admin_plan_change |
| Per-org transaction config update | Per-org | transaction_config_updated |
| Plan configuration defaults changed | Broadcast (*) | plan_config_updated:{planKey} |

When the consumer receives a per-org notification, it re-queries the subscription and current Redis counter to decide whether the org is still over its limit. A broadcast (*) triggers a full cache reload.
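A sketch of the subscriber-side dispatch. It assumes the published payload is JSON of the form { orgId, reason }, with orgId '*' for broadcasts, and that the cache exposes a reloadAll() method. Only refreshOrgLimit(orgId) and the channel name come from this page.

```javascript
const PLAN_CHANGES_CHANNEL = 'rw:plan-changes';

// Handle one message received on PLAN_CHANGES_CHANNEL (payload shape assumed)
async function handlePlanChange(rawMessage, workflowCache) {
  const { orgId, reason } = JSON.parse(rawMessage);
  if (orgId === '*') {
    await workflowCache.reloadAll(); // broadcast: full cache reload
    return { scope: 'all', reason };
  }
  await workflowCache.refreshOrgLimit(orgId); // re-check subscription and counter
  return { scope: 'org', orgId, reason };
}
```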

Stale Key Cleanup

During the periodic sync of Redis counters to Postgres (TransactionUsage), if a counter key references an organization that no longer exists in the database (foreign key violation), the consumer:

  1. Logs a warning with the list of stale org IDs
  2. Deletes the orphaned Redis counter keys
  3. Continues syncing the remaining organizations without interruption

This prevents repeated sync failures caused by deleted organizations.
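A sketch of separating stale keys from the rest. The consumer detects stale organizations through the foreign key violation on write. Here a set of known organization IDs stands in for that check, which is an assumption for illustration.

```javascript
// Split txn:{organizationId}:{period} keys into syncable and stale sets
function partitionCounterKeys(keys, existingOrgIds) {
  const valid = [];
  const stale = [];
  for (const key of keys) {
    const [prefix, orgId] = key.split(':');
    if (prefix !== 'txn') continue; // not a transaction counter
    (existingOrgIds.has(orgId) ? valid : stale).push(key);
  }
  return { valid, stale };
}
```

The stale keys would then be logged and deleted from Redis, and the valid ones synced as usual.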

Transaction Data

Transaction counters are stored in Redis with a TTL of ~35 days, ensuring automatic expiration after each billing period. The Notifications service monitors these counters to send usage alerts at 70% and 100% thresholds.
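The numbers above in code form. Only the ~35-day TTL and the 70%/100% thresholds come from this page. The exact TTL value, the function name, and the rule of alerting once when a threshold is first crossed are assumptions.

```javascript
const COUNTER_TTL_SECONDS = 35 * 24 * 60 * 60; // ~35 days: outlives a monthly period
const ALERT_THRESHOLDS_PERCENT = [70, 100];

// Thresholds (in percent) crossed when usage moves from `previous` to
// `current`; integer math avoids floating-point edge cases at the boundary.
function crossedThresholds(previous, current, limit) {
  if (limit <= 0) return [];
  return ALERT_THRESHOLDS_PERCENT.filter(
    (pct) => previous * 100 < pct * limit && current * 100 >= pct * limit
  );
}
```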

Image Recognition

The Consumer supports image recognition for Prompt entities using vision-capable models. When a Prompt entity's model supports vision:

  1. Image URLs or base64 data in the message payload are detected
  2. The universal model adapter formats the request for the provider's vision API
  3. The model processes both text and image content
  4. The response is stored via latestPromptResponse() as usual

Supported providers for vision: OpenAI (GPT-4o), Anthropic (Claude 3.5 Sonnet), Google (Gemini Pro Vision), and AWS Bedrock (Claude models).

Performance Considerations

Memory Management

  • Isolates disposed after workflow completion
  • Contexts released after each workflow
  • 128MB limit prevents memory exhaustion

Concurrency

  • Worker pool manages multiple evaluators
  • Each worker handles one message at a time
  • Pool size configurable based on resources
  • Kinesis records are processed in batches to increase throughput

Script Compilation

  • Scripts pre-compiled at startup
  • Reused across all evaluations
  • Reduces per-message overhead

Configuration

Environment Variables

| Variable | Default | Description |
|---|---|---|
| SCRIPT_TIMEOUT | 5000 | Script timeout in milliseconds |
| ISOLATE_MEMORY | 128 | V8 isolate memory limit in MB |
| WORKER_POOL_SIZE | 4 | Number of worker threads |
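A sketch of reading these variables with their documented defaults. The function name readConfig is hypothetical. Falling back to the default on a missing, non-numeric, or non-positive value is also an assumption.

```javascript
// Read the consumer settings from an environment-like object
function readConfig(env = process.env) {
  const positiveInt = (name, fallback) => {
    const n = Number.parseInt(env[name] ?? '', 10);
    return Number.isInteger(n) && n > 0 ? n : fallback;
  };
  return {
    scriptTimeout: positiveInt('SCRIPT_TIMEOUT', 5000), // ms
    isolateMemory: positiveInt('ISOLATE_MEMORY', 128),  // MB
    workerPoolSize: positiveInt('WORKER_POOL_SIZE', 4), // worker threads
  };
}
```

The values would feed the isolate options (new ivm.Isolate({ memoryLimit: config.isolateMemory })) and the Evaluator's scriptTimeout argument.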