
Evaluator Overview

The Evaluator is the core execution engine of the Stream Consumer. It processes incoming messages through workflow trees using isolated V8 contexts for secure, sandboxed execution.

Architecture

```text
┌──────────────────────────────────────────────────────────────────────────┐
│                             Stream Consumer                              │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌───────────┐    ┌───────────┐    ┌─────────────────────────────────┐   │
│  │  Kinesis  │───▶│  Worker   │───▶│            Evaluator            │   │
│  │  Stream   │    │   Pool    │    │  ┌───────────────────────────┐  │   │
│  └───────────┘    └───────────┘    │  │        V8 Isolate         │  │   │
│                                    │  │  ┌─────────────────────┐  │  │   │
│                                    │  │  │       Context       │  │  │   │
│                                    │  │  │  - message          │  │  │   │
│                                    │  │  │  - variables        │  │  │   │
│                                    │  │  │  - scripts          │  │  │   │
│                                    │  │  └─────────────────────┘  │  │   │
│                                    │  └───────────────────────────┘  │   │
│                                    └────────────────┬────────────────┘   │
│                                                     │                    │
│                                    ┌────────────────▼────────────────┐   │
│                                    │        Entity Evaluators        │   │
│                                    │ ┌───────┐  ┌───────┐  ┌───────┐ │   │
│                                    │ │ Event │  │ Prompt│  │ Action│ │   │
│                                    │ └───────┘  └───────┘  └───────┘ │   │
│                                    └────────────────┬────────────────┘   │
│                                                     │                    │
│                                    ┌────────────────▼────────────────┐   │
│                                    │               S3                │   │
│                                    │    success/  fail/  ignore/     │   │
│                                    └─────────────────────────────────┘   │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘
```

Key Components

Evaluator Class

The main orchestrator (src/workers/Evaluator.js):

```javascript
class Evaluator {
  constructor(message, cacheData, scriptTimeout, compiledScripts) {
    this.message = message;
    this.cacheData = cacheData;
    this.scriptTimeout = scriptTimeout || 5000;
    this.compiledScripts = compiledScripts || [];
    this.executionLog = []; // Track all execution steps
    this.startTime = Date.now();

    // Initialize entity evaluators
    this.evaluators = {
      Event: new EventEvaluator(this.scriptTimeout),
      Prompt: new PromptEvaluator(this.scriptTimeout),
      Action: new ActionEvaluator(this.scriptTimeout),
    };
  }

  // Add entry to execution log
  addLogEntry(entry) {
    const now = Date.now();
    this.executionLog.push({
      ts: now,
      elapsed: now - this.startTime, // ms since this Evaluator was created
      ...entry,
    });
  }
}
```

V8 Isolate

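Each Evaluator instance (one per message) creates a V8 isolate in initializeIsolate(), and every workflow then runs in a fresh context inside that isolate:

  • Memory Limit: 128 MB per isolate
  • Script Timeout: 5 seconds (5000 ms), configurable via SCRIPT_TIMEOUT
  • Isolation: Full sandbox with no access to Node.js APIs beyond the injected variables and script functions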
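The `ivm` calls in this page match the isolated-vm package, which is a native module. To give a dependency-free illustration of the timeout and the empty global scope, the sketch below swaps in Node's built-in `node:vm` module instead. `runWithTimeout` is a hypothetical helper. `node:vm` is not a security boundary and cannot enforce a memory limit, so this models only part of what the real isolate provides.

```javascript
const vm = require('node:vm');

// Hypothetical helper. node:vm stands in for isolated-vm here: it is NOT a
// security sandbox and cannot enforce the 128 MB memory limit.
function runWithTimeout(code, globals = {}, timeoutMs = 5000) {
  // A fresh context per call, exposing only the globals passed in
  const context = vm.createContext({ ...globals });
  try {
    const value = vm.runInContext(code, context, { timeout: timeoutMs });
    return { ok: true, value };
  } catch (err) {
    // Syntax errors, runtime errors and timeouts all end up here
    return { ok: false, error: err.message };
  }
}

console.log(runWithTimeout('a + b', { a: 2, b: 3 }));      // { ok: true, value: 5 }
console.log(runWithTimeout('typeof require').value);       // undefined (require is not a global here)
console.log(runWithTimeout('while (true) {}', {}, 50).ok); // false
```

The infinite loop is cut off after 50 ms instead of blocking the worker. This is the same guarantee the Evaluator relies on for user scripts.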

Entity Evaluators

Specialized handlers for each entity type:

| Evaluator | Entity Type | Purpose |
|---|---|---|
| EventEvaluator | Event | Condition evaluation, branching |
| PromptEvaluator | Prompt | AI/LLM execution |
| ActionEvaluator | Action | Script execution |
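The evaluator is chosen by the entity's type name, and traverseNode() falls back to 'Action' when an entity has no type. Here is a minimal sketch of that lookup. `StubEvaluator` and `evaluatorFor` are hypothetical names. Throwing on an unknown type is an assumption, not documented behavior.

```javascript
// Hypothetical stand-in for EventEvaluator / PromptEvaluator / ActionEvaluator
class StubEvaluator {
  constructor(type) {
    this.type = type;
  }
}

// A Map avoids accidental hits on Object.prototype keys such as "toString"
const evaluators = new Map([
  ['Event', new StubEvaluator('Event')],
  ['Prompt', new StubEvaluator('Prompt')],
  ['Action', new StubEvaluator('Action')],
]);

// Same fallback rule as traverseNode(): a missing type name means 'Action'
function evaluatorFor(entity) {
  const entityType = entity.workflowEntityType?.name || 'Action';
  const evaluator = evaluators.get(entityType);
  if (!evaluator) {
    // Assumption: unknown types are treated as an error
    throw new Error(`No evaluator for entity type "${entityType}"`);
  }
  return evaluator;
}

console.log(evaluatorFor({ workflowEntityType: { name: 'Prompt' } }).type); // Prompt
console.log(evaluatorFor({}).type); // Action
```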

Execution Flow

1. Message Receipt

```text
Kinesis Record
  ▼
Parse JSON payload
  ▼
Extract organizationId, environmentId
  ▼
Get worker from pool
  ▼
Create Evaluator instance
```
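The parsing steps can be sketched as a small function. It assumes the record payload is UTF-8 JSON with top-level organizationId and environmentId fields, the same fields that appear in messageSummary under Error Handling. `parseRecord` is a hypothetical name, and how the real consumer handles malformed records is not documented on this page.

```javascript
// Hypothetical: decode a raw record payload and derive the "org:env" cache key.
function parseRecord(data) {
  // Accepts a Buffer or a string; Buffer.from() treats a string as UTF-8
  const message = JSON.parse(Buffer.from(data).toString('utf8'));
  const { organizationId, environmentId } = message ?? {};
  if (!organizationId || !environmentId) {
    throw new Error('Message is missing organizationId or environmentId');
  }
  return { message, orgEnvKey: `${organizationId}:${environmentId}` };
}

const raw = Buffer.from(JSON.stringify({
  organizationId: 'org_abc',
  environmentId: 'env_xyz',
  eventType: 'user.action',
}));
console.log(parseRecord(raw).orgEnvKey); // org_abc:env_xyz
```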

2. Initialization

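```javascript
// ivm is the isolated-vm module: const ivm = require('isolated-vm');

async initializeIsolate() {
  // Create V8 isolate with a 128 MB memory limit
  this.isolate = new ivm.Isolate({ memoryLimit: 128 });
}

async createFreshContext() {
  // Create execution context
  const context = await this.isolate.createContext();
  // Assumed: `jail` is the context's global object, as in isolated-vm's examples
  const jail = context.global;

  // Inject variables from cache as a single array...
  const variables = this.getVariables();
  await context.eval(`var __variables__ = ${JSON.stringify(variables)};`);

  // ...and each variable as a top-level var (names are spliced into the
  // evaluated source, so they must be valid identifiers)
  for (const variable of variables) {
    await context.eval(`var ${variable.name} = ${JSON.stringify(variable.value)};`);
  }

  // Inject pre-compiled script functions
  // (refContext is not defined in this excerpt)
  for (const script of this.compiledScripts) {
    await script.inject(refContext, jail, context);
  }

  return context;
}
```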
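The variable-injection step amounts to building a short prelude of `var` declarations. The hypothetical `buildVariablePrelude` below produces the same statements that createFreshContext() evaluates, joined into one string. The identifier check is an addition and does not appear in the excerpt. Because variable.name is spliced into source code, a name such as `x; doSomething()` would otherwise run as code.

```javascript
// Plain ASCII identifiers only (reserved words are not rejected here)
const IDENTIFIER = /^[A-Za-z_$][A-Za-z0-9_$]*$/;

// Hypothetical: build the source that declares __variables__ plus one
// top-level var per variable, mirroring createFreshContext().
function buildVariablePrelude(variables) {
  const lines = [`var __variables__ = ${JSON.stringify(variables)};`];
  for (const { name, value } of variables) {
    if (!IDENTIFIER.test(name)) {
      throw new Error(`Invalid variable name: ${name}`);
    }
    lines.push(`var ${name} = ${JSON.stringify(value)};`);
  }
  return lines.join('\n');
}

console.log(buildVariablePrelude([
  { name: 'apiUrl', value: 'https://example.com' },
  { name: 'retries', value: 3 },
]));
// var __variables__ = [{"name":"apiUrl","value":"https://example.com"},{"name":"retries","value":3}];
// var apiUrl = "https://example.com";
// var retries = 3;
```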

3. Workflow Traversal

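```javascript
async process() {
  // Get workflows for this org/env
  const workflows = this.getWorkflows();

  for (const workflow of workflows) {
    // Create a fresh context for each workflow
    const context = await this.createFreshContext();
    // Assumed metadata shape (fields as in the "workflow" log entry)
    const workflowMetadata = { workflowId: workflow.id, workflowName: workflow.name };

    try {
      // Traverse from the root node
      const resultFound = await this.traverseNode(
        workflow.workflowData.root,
        context,
        workflowMetadata
      );

      if (resultFound) {
        // Result entity was reached (stored in this.workflowResult)
        return;
      }
    } finally {
      // Release the context on every path, including the early return
      context.release();
    }
  }

  // No result found: save as fail if a script errored, otherwise as ignore
  const { organizationId, environmentId } = this.message;
  const status = this.hadError ? 'fail' : 'ignore';
  // Assumed shape of the saved context (see Log Persistence)
  await saveWorkflowResult(organizationId, environmentId, status, this.message, {
    ____execution_log____: this.executionLog,
  });
}
```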

4. Node Evaluation

```javascript
async traverseNode(node, context, workflowMetadata) {
  // Get entity from cache
  const entity = this.getEntity(node.entityId);

  // Get appropriate evaluator
  const entityType = entity.workflowEntityType?.name || 'Action';
  const evaluator = this.getEvaluator(entityType);

  // Evaluate the entity
  const result = await evaluator.evaluate(
    context,
    entity,
    node,
    this.message,
    this.executionContext,
    workflowMetadata
  );

  // Handle result actions
  if (result.action === 'exit_workflow') {
    return false;
  }

  if (result.action === 'workflow_complete') {
    this.workflowResult = result;
    return true;
  }

  // Process children
  for (const child of result.children) {
    const found = await this.traverseNode(child, context, workflowMetadata);
    if (found) return true;
  }

  return false;
}
```
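To make the control flow concrete, here is a synchronous, simplified model of traverseNode(). The tree, the `evaluate` stub, and the `traverse` name are hypothetical. The stub returns the same three actions as the real evaluators, and children come from the result, as in the excerpt.

```javascript
// Simplified model of traverseNode(): returns the completing result, or null.
function traverse(node, evaluate, visited = []) {
  visited.push(node.id);
  const result = evaluate(node);

  // As in the excerpt, this abandons only the current branch; the parent
  // still tries its remaining children.
  if (result.action === 'exit_workflow') return null;
  if (result.action === 'workflow_complete') return result;

  // 'process_children': depth-first, the first subtree that completes wins
  for (const child of result.children ?? []) {
    const found = traverse(child, evaluate, visited);
    if (found) return found;
  }
  return null;
}

// Hypothetical workflow tree and evaluator stub
const tree = {
  id: 'event',
  children: [
    { id: 'prompt-a', children: [{ id: 'stop' }] },
    { id: 'action-b', children: [{ id: 'done' }] },
  ],
};
const evaluate = (node) => {
  if (node.id === 'stop') return { action: 'exit_workflow' };
  if (node.id === 'done') return { action: 'workflow_complete', status: 'success' };
  return { action: 'process_children', children: node.children };
};

const visited = [];
console.log(traverse(tree, evaluate, visited)); // { action: 'workflow_complete', status: 'success' }
console.log(visited); // [ 'event', 'prompt-a', 'stop', 'action-b', 'done' ]
```

Note that 'stop' ended only the prompt-a branch, and action-b was still visited. That is what the excerpt's `return false` does.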

Cache System

The Evaluator uses pre-loaded cache data:

```javascript
const cacheData = {
  // Workflows indexed by "org:env": Map<string, Workflow[]>
  workflowsByOrgEnv: new Map(),

  // Variables indexed by "org:env": Map<string, Variable[]>
  variablesByOrgEnv: new Map(),

  // Entities indexed by ID: Map<string, WorkflowEntity>
  entities: new Map(),
};
```

Cache Keys

| Map | Key | Value |
|---|---|---|
| workflowsByOrgEnv | "{organizationId}:{environmentId}" | Workflow[] |
| variablesByOrgEnv | "{organizationId}:{environmentId}" | Variable[] |
| entities | "{entityId}" | WorkflowEntity |
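The getWorkflows(), getVariables(), and getEntity() calls in the excerpts come down to Map lookups with these keys. The sketch below assumes cacheData has the shape shown above. `createCacheReader` is a hypothetical name. Returning an empty array on a miss is an assumption, chosen to match the "No workflows exist for org/env" outcome.

```javascript
// Build the "{organizationId}:{environmentId}" key used by both org/env maps
const orgEnvKey = (organizationId, environmentId) => `${organizationId}:${environmentId}`;

// Hypothetical: lookups bound to one message's org/env
function createCacheReader(cacheData, message) {
  const key = orgEnvKey(message.organizationId, message.environmentId);
  return {
    getWorkflows: () => cacheData.workflowsByOrgEnv.get(key) ?? [],
    getVariables: () => cacheData.variablesByOrgEnv.get(key) ?? [],
    getEntity: (entityId) => cacheData.entities.get(entityId),
  };
}

const cacheData = {
  workflowsByOrgEnv: new Map([['org_abc:env_xyz', [{ id: 'wf_1' }]]]),
  variablesByOrgEnv: new Map(),
  entities: new Map([['ent_123', { id: 'ent_123', name: 'Process Order' }]]),
};
const reader = createCacheReader(cacheData, { organizationId: 'org_abc', environmentId: 'env_xyz' });
console.log(reader.getWorkflows().length);      // 1
console.log(reader.getVariables());             // []
console.log(reader.getEntity('ent_123').name);  // Process Order
```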

Script Injection

Pre-Compiled Scripts

Scripts are compiled once at worker startup:

```javascript
// From src/scripts/
const compiledScripts = [
  evaluateCondition, // Condition tree evaluation
  print,             // Debug logging
  promptCallToken,   // LLM with token auth
  promptCallKeys,    // LLM with key/secret auth
  postToMastodon,    // Social media posting
  // ... more
];
```

Injection Process

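```javascript
// Runs inside createFreshContext() for each new context. `jail` is presumably
// context.global (the usual isolated-vm name); refContext is not defined in
// these excerpts.
for (const script of this.compiledScripts) {
  await script.inject(refContext, jail, context);
}
```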

Each script becomes a global function in the V8 context.
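The compile-once, inject-per-context pattern can be shown with Node's built-in `node:vm`, which again stands in for isolated-vm. `compileScript` and the `shout` script are hypothetical. The real scripts' `inject(refContext, jail, context)` signature is not reproduced here.

```javascript
const vm = require('node:vm');

// Hypothetical: compile a script once (at "worker startup"), then run it in
// each fresh context so its function declaration becomes a context global.
function compileScript(name, source) {
  const compiled = new vm.Script(source, { filename: `${name}.js` });
  return {
    name,
    inject(context) {
      compiled.runInContext(context);
    },
  };
}

const compiledScripts = [
  compileScript('shout', 'function shout(text) { return String(text).toUpperCase() + "!"; }'),
];

// Two fresh contexts reuse the same compiled script
const contextA = vm.createContext({});
const contextB = vm.createContext({});
for (const script of compiledScripts) {
  script.inject(contextA);
  script.inject(contextB);
}
console.log(vm.runInContext('shout("hi")', contextA)); // HI!
console.log(typeof contextB.shout);                    // function
```

Parsing and compilation happen once, in `new vm.Script(...)`. Each injection only runs the already-compiled code, which is where the per-message savings come from.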

Result Handling

Action Types

| Action | Meaning | Next Step |
|---|---|---|
| exit_workflow | Stop processing this workflow | Try next workflow |
| process_children | Continue to child nodes | Traverse children |
| workflow_complete | Workflow finished | Save result, stop |

Final Outcomes

| Scenario | Status | S3 Folder |
|---|---|---|
| Workflow fired and completed successfully | success | success/ |
| Workflow fired, but a script error occurred | fail | fail/ |
| No workflow's first Event condition matched | ignore | ignore/ |
| No workflows exist for the org/env | ignore | ignore/ |
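The outcome table can be read as a small decision function. `finalOutcome` and its input fields (workflowCount, workflowFired, hadError) are hypothetical. The function encodes only the four scenarios listed above.

```javascript
// Hypothetical mapping from end-of-evaluation state to status and S3 folder
function finalOutcome({ workflowCount, workflowFired, hadError }) {
  if (workflowCount === 0 || !workflowFired) {
    return { status: 'ignore', folder: 'ignore/' }; // nothing fired
  }
  if (hadError) {
    return { status: 'fail', folder: 'fail/' }; // fired, script error
  }
  return { status: 'success', folder: 'success/' }; // fired, completed
}

console.log(finalOutcome({ workflowCount: 3, workflowFired: true, hadError: false }).folder); // success/
console.log(finalOutcome({ workflowCount: 0, workflowFired: false, hadError: false }).status); // ignore
```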

First-Match-Wins Logic

The evaluator uses "first-match-wins" logic when processing workflows:

```javascript
for (const workflow of workflows) {
  // Skip workflows that don't start with an Event entity
  const firstNode = workflow.workflowData.root;
  const firstEntity = this.getEntity(firstNode.entityId);
  if (firstEntity?.workflowEntityType?.name !== 'Event') {
    continue; // Skip this workflow
  }

  // Evaluate the first Event condition (and the rest of the tree if it passes)
  const context = await this.createFreshContext();
  const workflowMetadata = { workflowId: workflow.id, workflowName: workflow.name }; // assumed shape
  try {
    await this.traverseNode(firstNode, context, workflowMetadata);
  } finally {
    context.release();
  }

  if (this.workflowFired) {
    // A workflow's Event condition passed: stop evaluating other workflows
    break;
  }
}
```

Key behaviors:

  1. Workflows are evaluated in order
  2. Only the first workflow whose Event condition passes is executed
  3. Once a workflow "fires", no other workflows are evaluated
  4. If no workflow fires, the message is marked as ignore
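These rules can be modeled with plain data. In the sketch below, each hypothetical workflow has a root entity type and a `condition(message)` predicate that stands in for its Event evaluation. `selectWorkflow` is a hypothetical name.

```javascript
// Simplified first-match-wins selection over hypothetical workflow records
function selectWorkflow(workflows, message) {
  for (const wf of workflows) {
    if (wf.rootType !== 'Event') continue;     // must start with an Event entity
    if (wf.condition(message)) return wf.name; // first passing Event condition fires
  }
  return null; // nothing fired: the message is marked as ignore
}

const workflows = [
  { name: 'no-event-root', rootType: 'Action', condition: () => true },
  { name: 'orders', rootType: 'Event', condition: (m) => m.eventType === 'order.created' },
  { name: 'all-users', rootType: 'Event', condition: (m) => m.eventType.startsWith('user.') },
  { name: 'user-actions', rootType: 'Event', condition: (m) => m.eventType === 'user.action' },
];

console.log(selectWorkflow(workflows, { eventType: 'user.action' }));  // all-users
console.log(selectWorkflow(workflows, { eventType: 'billing.paid' })); // null
```

'user-actions' also matches 'user.action', but it never runs because 'all-users' comes first. The order of workflows therefore determines which one handles a message.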

Execution Log

The Evaluator tracks every step of workflow execution in executionLog. This provides a complete trace for debugging, auditing, and performance analysis.

Log Entry Structure

```javascript
{
  ts: 1705315800123,   // Absolute timestamp (ms since epoch)
  elapsed: 2500,       // Ms since workflow start
  type: 'api',         // Entry type
  entityId: 'ent_123', // Entity being evaluated
  entityName: 'Get AI Response',
  entityType: 'Prompt',
  service: 'llm',      // Type-specific fields
  durationMs: 2400,
  success: true,
}
```

Entry Types

| Type | Description | Key Fields |
|---|---|---|
| workflow | Workflow start/end | action, workflowId, workflowName, totalDurationMs |
| event | Event condition evaluated | mode, conditionResult, branchTaken, childCount |
| prompt | Prompt entity processed | action (script_executed, complete) |
| api | External API call (LLM, etc.) | service, model, durationMs, success |
| action | Action script executed | durationMs, success |
| complete | Workflow completed | status, hasResultValue |
| error | Error during evaluation | error |

How It Works

Each entity evaluator receives a logCallback function:

```javascript
async traverseNode(node, context, workflowMetadata) {
  const entity = this.getEntity(node.entityId);
  const entityType = entity.workflowEntityType?.name || 'Action';
  const evaluator = this.getEvaluator(entityType);

  // Create log callback for this entity
  const logCallback = (entry) => {
    this.addLogEntry({
      entityId: entity.id,
      entityName: entity.name,
      entityType,
      ...entry,
    });
  };

  // Pass to evaluator
  const result = await evaluator.evaluate(
    context, entity, node, this.message,
    this.executionContext, workflowMetadata, logCallback
  );

  // ... result handling and child traversal as in Node Evaluation
}
```

Entity evaluators call logCallback() at key moments:

```javascript
// In EventEvaluator
logCallback({
  type: 'event',
  action: 'condition_passed',
  mode: 'Single Path',
  conditionResult: true,
  childCount: 2,
});

// In PromptEvaluator (API call)
logCallback({
  type: 'api',
  service: 'llm',
  model: entity.model.name,
  durationMs: Date.now() - apiStartTime, // apiStartTime is taken just before the call
  success: true,
});
```
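The two mechanisms work together: addLogEntry() stamps timing, and each per-entity logCallback adds entity identity. The self-contained model below shows this. `ExecutionLog` is a hypothetical class, and the injectable clock is an assumption added for illustration. The Evaluator itself calls Date.now().

```javascript
// Minimal model of executionLog + logCallback (hypothetical, not the real class)
class ExecutionLog {
  constructor(now = Date.now) {
    this.now = now;
    this.startTime = now();
    this.entries = [];
  }

  // Same shape as addLogEntry(): ts and elapsed first, then the entry fields
  add(entry) {
    const ts = this.now();
    this.entries.push({ ts, elapsed: ts - this.startTime, ...entry });
  }

  // Per-entity callback, so evaluators pass only type-specific fields
  callbackFor(entity, entityType) {
    return (entry) =>
      this.add({ entityId: entity.id, entityName: entity.name, entityType, ...entry });
  }
}

let fakeNow = 1000;
const executionLog = new ExecutionLog(() => fakeNow);
const logCallback = executionLog.callbackFor({ id: 'ent_123', name: 'Get AI Response' }, 'Prompt');

fakeNow = 3500; // pretend 2.5 s have passed
logCallback({ type: 'api', service: 'llm', durationMs: 2400, success: true });
console.log(executionLog.entries[0].elapsed);    // 2500
console.log(executionLog.entries[0].entityType); // Prompt
```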

Log Persistence

The execution log is saved with every workflow result to S3:

```json
{
  "message": { ... },
  "context": {
    "____execution_log____": [
      { "ts": 1705315800123, "elapsed": 0, "type": "workflow", "action": "start", ... },
      { "ts": 1705315800130, "elapsed": 7, "type": "event", "action": "condition_passed", ... },
      { "ts": 1705315802500, "elapsed": 2377, "type": "api", "service": "llm", "durationMs": 2300, ... },
      { "ts": 1705315802650, "elapsed": 2527, "type": "action", "action": "script_executed", ... },
      { "ts": 1705315802700, "elapsed": 2577, "type": "result", "status": "Success", ... }
    ]
  }
}
```

See Processed Messages to view workflow results.

Error Handling

Script Errors

```javascript
try {
  const result = await evaluator.evaluate(...);
} catch (error) {
  this.hadError = true;
  this.errors.push({
    entityId: entity.id,
    entityName: entity.name,
    entityType,
    error: error.message,
    timestamp: new Date().toISOString(),
  });

  // Also logged to execution log
  this.addLogEntry({
    type: 'error',
    entityId: entity.id,
    entityName: entity.name,
    entityType,
    error: error.message,
  });
}
```

Timeout Handling

```javascript
await context.eval(wrappedScript, {
  timeout: this.scriptTimeout, // Default 5000 ms
  promise: true,
});
```

Scripts that exceed the timeout are terminated, and the error is logged.

Error Context

Errors are saved with the message, along with detailed context and fix suggestions:

```json
{
  "____execution_errors____": [
    {
      "entityId": "ent_123",
      "entityName": "Process Order",
      "entityType": "Action",
      "error": "Cannot read properties of undefined (reading 'name')",
      "errorType": "TypeError",
      "suggestion": "You tried to access a property on something that is undefined. Add a null check or use optional chaining: obj?.property",
      "timestamp": "2024-01-15T10:30:00Z"
    }
  ],
  "____error_count____": 1
}
```

Enhanced Error Logging

The evaluator provides detailed error information to help with debugging:

| Field | Description |
|---|---|
| error | The JavaScript error message |
| errorType | Error class (TypeError, ReferenceError, SyntaxError, etc.) |
| suggestion | Human-readable fix suggestion |
| scriptPreview | First 200 characters of the script |
| scriptLength | Total script length in characters |
| messageSummary | Key message fields for context |

Error Types and Suggestions

| Error Type | Common Cause | Suggestion |
|---|---|---|
| TypeError | Accessing a property of undefined | Use optional chaining (?.) |
| ReferenceError | Using an undefined variable | Check the spelling or define the variable |
| SyntaxError | Invalid JavaScript | Check brackets, quotes, semicolons |
| TimeoutError | Script exceeded the time limit | Optimize the script or increase the timeout |
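The enhanced fields can be derived from a caught error and the script source. The sketch below shows one way to do it. `describeError` and its suggestion wording are hypothetical. The timeout rule is an assumption: sandboxes typically report timeouts as a generic Error whose message mentions "timed out", not as a distinct error class.

```javascript
// Hypothetical suggestion table keyed by error type
const SUGGESTIONS = new Map([
  ['TypeError', 'You tried to access a property on something that is undefined. Add a null check or use optional chaining: obj?.property'],
  ['ReferenceError', 'Check the spelling of the variable, or define it before use.'],
  ['SyntaxError', 'Check brackets, quotes, and semicolons.'],
  ['TimeoutError', 'Optimize the script or increase SCRIPT_TIMEOUT.'],
]);

// Hypothetical: build the enhanced error fields for a failed script
function describeError(error, script = '') {
  // Assumption: timeouts are recognized by their message text
  const errorType = /timed out/i.test(error.message) ? 'TimeoutError' : error.name;
  return {
    error: error.message,
    errorType,
    suggestion: SUGGESTIONS.get(errorType) ?? 'Review the script and the message payload.',
    scriptPreview: script.slice(0, 200),
    scriptLength: script.length,
  };
}

try {
  const data = {};
  console.log(data.user.name); // throws: data.user is undefined
} catch (error) {
  console.log(describeError(error, 'const name = data.user.name;').errorType); // TypeError
}
```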

Example error log entry:

```json
{
  "ts": 1705315800123,
  "elapsed": 2500,
  "entityId": "ent_123",
  "entityName": "Process Data",
  "entityType": "Action",
  "type": "script",
  "action": "executed",
  "success": false,
  "error": "Cannot read properties of undefined (reading 'name')",
  "errorType": "TypeError",
  "suggestion": "You tried to access a property on something that is undefined. Add a null check: \"if (obj && obj.property)\" or use optional chaining: \"obj?.property\".",
  "scriptPreview": "const name = data.user.name;...",
  "scriptLength": 342,
  "messageSummary": {
    "organizationId": "org_abc",
    "environmentId": "env_xyz",
    "eventType": "user.action"
  }
}
```

Performance Considerations

Memory Management

  • Isolates are disposed after evaluation completes
  • Contexts are released after each workflow
  • The 128 MB per-isolate limit keeps a runaway script from exhausting worker memory

Concurrency

  • Worker pool manages multiple evaluators
  • Each worker handles one message at a time
  • Pool size is configurable via WORKER_POOL_SIZE (default 4) to match available resources
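The pool behavior, with a fixed number of workers each handling one message at a time, can be modeled with async functions that share a queue. This is a simplified single-threaded model, not the worker-thread pool itself. `runPool` and its parameters are hypothetical.

```javascript
// Simplified pool model: `size` workers pull from a shared queue, and each
// handles one message at a time. maxActive records the peak concurrency.
async function runPool(messages, size, handleMessage) {
  const queue = [...messages];
  const results = [];
  let active = 0;
  let maxActive = 0;

  async function worker() {
    while (queue.length > 0) {
      const message = queue.shift();
      active += 1;
      maxActive = Math.max(maxActive, active);
      try {
        results.push(await handleMessage(message));
      } finally {
        active -= 1;
      }
    }
  }

  await Promise.all(Array.from({ length: size }, () => worker()));
  return { results, maxActive };
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

runPool([1, 2, 3, 4, 5, 6], 4, async (n) => {
  await sleep(10);
  return n * 2;
}).then(({ results, maxActive }) => {
  console.log(results.length, maxActive); // 6 4
});
```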

Script Compilation

  • Scripts pre-compiled at startup
  • Reused across all evaluations
  • Reduces per-message overhead

Configuration

Environment Variables

| Variable | Default | Description |
|---|---|---|
| SCRIPT_TIMEOUT | 5000 | Script timeout in milliseconds |
| ISOLATE_MEMORY | 128 | V8 isolate memory limit in MB |
| WORKER_POOL_SIZE | 4 | Number of worker threads |
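This page does not show how these variables reach the Evaluator. A minimal loader that applies the documented defaults might look like the following. `loadConfig` and its result field names are hypothetical. Falling back to the default for missing, non-numeric, or non-positive values is an assumption.

```javascript
// Hypothetical loader for the documented variables and defaults
function loadConfig(env = process.env) {
  const positiveInt = (name, fallback) => {
    const parsed = Number.parseInt(env[name] ?? '', 10);
    return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
  };
  return {
    scriptTimeoutMs: positiveInt('SCRIPT_TIMEOUT', 5000),
    isolateMemoryMb: positiveInt('ISOLATE_MEMORY', 128),
    workerPoolSize: positiveInt('WORKER_POOL_SIZE', 4),
  };
}

console.log(loadConfig({ SCRIPT_TIMEOUT: '10000' }));
// { scriptTimeoutMs: 10000, isolateMemoryMb: 128, workerPoolSize: 4 }
```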