Evaluator Overview
The Evaluator is the core execution engine of the Stream Consumer. It processes incoming messages through workflow trees using isolated V8 contexts for secure, sandboxed execution.
Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│                             Stream Consumer                             │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────────────┐  │
│  │   Kinesis   │───▶│   Worker    │───▶│          Evaluator          │  │
│  │   Stream    │    │    Pool     │    │ ┌─────────────────────────┐ │  │
│  └─────────────┘    └─────────────┘    │ │       V8 Isolate        │ │  │
│                                        │ │  ┌───────────────────┐  │ │  │
│                                        │ │  │ Context           │  │ │  │
│                                        │ │  │  - message        │  │ │  │
│                                        │ │  │  - variables      │  │ │  │
│                                        │ │  │  - scripts        │  │ │  │
│                                        │ │  └───────────────────┘  │ │  │
│                                        │ └─────────────────────────┘ │  │
│                                        └─────────────────────────────┘  │
│                                                        │                │
│                                        ┌───────────────▼─────────────┐  │
│                                        │      Entity Evaluators      │  │
│                                        │   ┌───────┐   ┌───────┐     │  │
│                                        │   │ Event │   │Prompt │     │  │
│                                        │   └───────┘   └───────┘     │  │
│                                        │   ┌───────┐                 │  │
│                                        │   │Action │                 │  │
│                                        │   └───────┘                 │  │
│                                        └─────────────────────────────┘  │
│                                                        │                │
│                                        ┌───────────────▼─────────────┐  │
│                                        │             S3              │  │
│                                        │   success/  fail/  ignore/  │  │
│                                        └─────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────────┘
Key Components
Evaluator Class
The main orchestrator (src/workers/Evaluator.js):
class Evaluator {
  constructor(message, cacheData, scriptTimeout, compiledScripts) {
    this.message = message;
    this.cacheData = cacheData;
    this.scriptTimeout = scriptTimeout || 5000; // milliseconds
    this.compiledScripts = compiledScripts || [];
    this.executionLog = []; // Track all execution steps
    this.startTime = Date.now();
    // Initialize entity evaluators
    this.evaluators = {
      'Event': new EventEvaluator(this.scriptTimeout),
      'Prompt': new PromptEvaluator(this.scriptTimeout),
      'Action': new ActionEvaluator(this.scriptTimeout),
    };
  }

  // Add entry to execution log
  addLogEntry(entry) {
    // Read the clock once so ts and elapsed always agree
    const now = Date.now();
    this.executionLog.push({
      ts: now,
      elapsed: now - this.startTime,
      ...entry,
    });
  }
}
V8 Isolate
Each workflow execution gets a fresh V8 isolate:
- Memory Limit: 128MB per isolate
- Script Timeout: 5 seconds (configurable)
- Isolation: Full sandbox, no access to Node.js APIs
Entity Evaluators
Specialized handlers for each entity type:
| Evaluator | Entity Type | Purpose |
|---|---|---|
| EventEvaluator | Event | Condition evaluation, branching |
| PromptEvaluator | Prompt | AI/LLM execution |
| ActionEvaluator | Action | Script execution |
Execution Flow
1. Message Receipt
Kinesis Record
│
▼
Parse JSON payload
│
▼
Extract organizationId, environmentId
│
▼
Get worker from pool
│
▼
Create Evaluator instance
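The first three receipt steps can be sketched as a small parsing function. This is a minimal sketch rather than the consumer's actual code: `parseRecord` is a hypothetical name, and it assumes the record payload arrives either as a base64 string or as raw bytes containing UTF-8 JSON with `organizationId` and `environmentId` at the top level.

```javascript
// Hypothetical helper: decode one Kinesis record payload into a message.
// Assumes the payload is UTF-8 JSON, delivered as base64 text or raw bytes.
function parseRecord(data) {
  const bytes = typeof data === 'string'
    ? Buffer.from(data, 'base64')
    : Buffer.from(data);

  let message;
  try {
    message = JSON.parse(bytes.toString('utf8'));
  } catch (err) {
    throw new Error(`Record is not valid JSON: ${err.message}`);
  }
  if (message === null || typeof message !== 'object') {
    throw new Error('Record payload must be a JSON object');
  }

  const { organizationId, environmentId } = message;
  if (!organizationId || !environmentId) {
    throw new Error('Record is missing organizationId or environmentId');
  }
  return { organizationId, environmentId, message };
}

// Usage: encode a payload the way a producer might, then parse it back.
const payload = Buffer.from(
  JSON.stringify({ organizationId: 'org_abc', environmentId: 'env_xyz', eventType: 'user.action' })
).toString('base64');
const parsed = parseRecord(payload);
console.log(parsed.organizationId, parsed.environmentId);
```

Rejecting malformed records before a worker is taken from the pool keeps bad input from occupying an evaluator, though where that check really belongs depends on the consumer's design.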
2. Initialization
async initializeIsolate() {
  // Create V8 isolate with memory limit (in MB)
  this.isolate = new ivm.Isolate({ memoryLimit: 128 });
}

async createFreshContext() {
  // Create execution context
  const context = await this.isolate.createContext();
  // Inject variables from cache
  const variables = this.getVariables();
  await context.eval(`var __variables__ = ${JSON.stringify(variables)};`);
  // Inject each variable as top-level var
  // (variable.name must be a valid JavaScript identifier)
  for (const variable of variables) {
    await context.eval(`var ${variable.name} = ${JSON.stringify(variable.value)};`);
  }
  // Inject pre-compiled script functions
  // (the setup of refContext and jail is not shown in this excerpt)
  for (const script of this.compiledScripts) {
    await script.inject(refContext, jail, context);
  }
  return context;
}
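Because each variable is injected by evaluating a generated `var` statement, the variable name becomes part of the source text. The sketch below shows one way to build that source with an identifier check first. `buildVariableSource` is a hypothetical helper, and it assumes variables are `{ name, value }` objects with JSON-serializable values, as the loop above suggests.

```javascript
// Hypothetical helper: build the source text for injecting cached variables.
// Assumes each variable is { name, value } with a JSON-serializable value.
const IDENTIFIER = /^[A-Za-z_$][A-Za-z0-9_$]*$/;

function buildVariableSource(variables) {
  const lines = [];
  for (const { name, value } of variables) {
    // Reject names that are not a single plain identifier, so a name
    // cannot smuggle extra statements into the evaluated source.
    if (!IDENTIFIER.test(name)) {
      throw new Error(`Invalid variable name: ${JSON.stringify(name)}`);
    }
    // JSON.stringify(undefined) returns undefined, so fall back to null.
    const literal = JSON.stringify(value) ?? 'null';
    lines.push(`var ${name} = ${literal};`);
  }
  return lines.join('\n');
}

const source = buildVariableSource([
  { name: 'apiBase', value: 'https://example.test' },
  { name: 'retryLimit', value: 3 },
]);
console.log(source);
```

The pattern is deliberately conservative: it accepts only ASCII identifiers and does not filter reserved words such as `class`, which would still fail when evaluated.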
3. Workflow Traversal
async process() {
  // Get workflows for this org/env
  const workflows = this.getWorkflows();
  for (const workflow of workflows) {
    // Create fresh context for each workflow
    const context = await this.createFreshContext();
    try {
      // Traverse from root
      const resultFound = await this.traverseNode(
        workflow.workflowData.root,
        context,
        workflowMetadata
      );
      if (resultFound) {
        // Result entity was reached
        return;
      }
    } finally {
      // Release the V8 context on every path, including the early return
      context.release();
    }
  }
  // No result found - save as ignore or fail.
  // The per-workflow V8 context is out of scope here; this excerpt assumes
  // the execution context is what gets persisted.
  await saveWorkflowResult(orgId, envId, status, message, this.executionContext);
}
4. Node Evaluation
async traverseNode(node, context, workflowMetadata) {
  // Get entity from cache
  const entity = this.getEntity(node.entityId);
  // Get appropriate evaluator
  const entityType = entity.workflowEntityType?.name || 'Action';
  const evaluator = this.getEvaluator(entityType);
  // Evaluate the entity
  const result = await evaluator.evaluate(
    context,
    entity,
    node,
    this.message,
    this.executionContext,
    workflowMetadata
  );
  // Handle result actions
  if (result.action === 'exit_workflow') {
    return false;
  }
  if (result.action === 'workflow_complete') {
    this.workflowResult = result;
    return true;
  }
  // Process children
  for (const child of result.children) {
    const found = await this.traverseNode(child, context, workflowMetadata);
    if (found) return true;
  }
  return false;
}
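The traversal above can be exercised without a V8 context by swapping in a stub evaluator. The following is a toy model under stated assumptions: nodes are plain `{ id, children }` objects, and the stub picks an action from the node id, which the real evaluators do not do.

```javascript
// Toy depth-first traversal mirroring the three result actions.
// `evaluate` stands in for an entity evaluator; nodes are { id, children }.
async function traverse(node, evaluate, visited = []) {
  visited.push(node.id);
  const result = await evaluate(node);

  if (result.action === 'exit_workflow') return false;    // stop here, no result
  if (result.action === 'workflow_complete') return true; // result reached

  // 'process_children': descend until some child reaches a result.
  for (const child of result.children ?? []) {
    if (await traverse(child, evaluate, visited)) return true;
  }
  return false;
}

const tree = {
  id: 'event',
  children: [
    { id: 'skipped', children: [] },
    { id: 'action', children: [{ id: 'result', children: [] }] },
    { id: 'never-visited', children: [] },
  ],
};

// Stub evaluator: actions are keyed by node id purely for this demo.
const actions = { skipped: 'exit_workflow', result: 'workflow_complete' };
const evaluate = async (node) => ({
  action: actions[node.id] ?? 'process_children',
  children: node.children,
});

const visitedIds = [];
const demo = traverse(tree, evaluate, visitedIds).then((found) => {
  console.log(found, visitedIds);
  return { found, visitedIds };
});
```

In this run the search stops as soon as `result` is reached, so the `never-visited` sibling is not evaluated.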
Cache System
The Evaluator uses pre-loaded cache data:
const cacheData = {
  // Workflows indexed by org:env (Map<string, Workflow[]>)
  workflowsByOrgEnv: new Map(),
  // Variables indexed by org:env (Map<string, Variable[]>)
  variablesByOrgEnv: new Map(),
  // Entities indexed by ID (Map<string, WorkflowEntity>)
  entities: new Map(),
};
Cache Keys
workflowsByOrgEnv: "{organizationId}:{environmentId}" → Workflow[]
variablesByOrgEnv: "{organizationId}:{environmentId}" → Variable[]
entities: "{entityId}" → WorkflowEntity
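A lookup against these keys might look like the sketch below. `orgEnvKey` and `lookup` are hypothetical helper names, and falling back to empty lists on a miss is an assumption about how callers would want to iterate.

```javascript
// Hypothetical lookup helpers over the cache shape described above.
function orgEnvKey(organizationId, environmentId) {
  return `${organizationId}:${environmentId}`;
}

function lookup(cacheData, organizationId, environmentId) {
  const key = orgEnvKey(organizationId, environmentId);
  return {
    // Missing keys fall back to empty lists so callers can iterate safely.
    workflows: cacheData.workflowsByOrgEnv.get(key) ?? [],
    variables: cacheData.variablesByOrgEnv.get(key) ?? [],
  };
}

// Sample cache with one org/env pair; all values are illustrative.
const cacheData = {
  workflowsByOrgEnv: new Map([['org_abc:env_xyz', [{ id: 'wf_1' }]]]),
  variablesByOrgEnv: new Map([['org_abc:env_xyz', [{ name: 'retryLimit', value: 3 }]]]),
  entities: new Map([['ent_123', { id: 'ent_123', name: 'Process Order' }]]),
};

const hit = lookup(cacheData, 'org_abc', 'env_xyz');
const miss = lookup(cacheData, 'org_abc', 'env_other');
console.log(hit.workflows.length, miss.workflows.length);
```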
Script Injection
Pre-Compiled Scripts
Scripts are compiled once at worker startup:
// From src/scripts/
const compiledScripts = [
  evaluateCondition, // Condition tree evaluation
  print,             // Debug logging
  promptCallToken,   // LLM with token auth
  promptCallKeys,    // LLM with key/secret auth
  postToMastodon,    // Social media posting
  // ... more
];
Injection Process
for (const script of this.compiledScripts) {
  await script.inject(refContext, jail, context);
}
Each script becomes a global function in the V8 context.
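The idea of a script object installing a global function can be illustrated with Node's built-in `vm` module. This is a stand-in, not the isolated-vm mechanism the Evaluator uses: `node:vm` is not a security boundary, and the one-argument `inject(sandbox)` signature here is a simplification invented for the demo, not the `inject(refContext, jail, context)` call shown above.

```javascript
const vm = require('node:vm');

// Hypothetical script objects: each knows how to install one global function.
const output = [];
const scripts = [
  {
    name: 'print',
    inject: (sandbox) => { sandbox.print = (...args) => output.push(args.join(' ')); },
  },
  {
    name: 'double',
    inject: (sandbox) => { sandbox.double = (n) => n * 2; },
  },
];

// Install every script on the sandbox object, then turn it into a context.
const sandbox = {};
for (const script of scripts) {
  script.inject(sandbox);
}
vm.createContext(sandbox);

// User code sees the injected functions as ordinary globals.
vm.runInContext('print("doubled:", double(21))', sandbox);
console.log(output);
```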
Result Handling
Action Types
| Action | Meaning | Next Step |
|---|---|---|
| exit_workflow | Stop processing this workflow | Try next workflow |
| process_children | Continue to child nodes | Traverse children |
| workflow_complete | Workflow finished | Save result, stop |
Final Outcomes
| Scenario | Status | S3 Folder |
|---|---|---|
| Workflow fired, completed successfully | success | success/ |
| Workflow fired, script error occurred | fail | fail/ |
| No workflow's first Event condition matched | ignore | ignore/ |
| No workflows exist for org/env | ignore | ignore/ |
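The table can be read as a small decision function. The sketch below is one possible encoding: `resolveOutcome` is a hypothetical name, and the two flags are modeled on the `workflowFired` and `hadError` fields that appear elsewhere on this page. Treating an error with no fired workflow as `ignore` is an assumption, since the table does not cover that case.

```javascript
// Hypothetical mapping from evaluation flags to status and S3 folder,
// following the Final Outcomes table above.
function resolveOutcome({ workflowFired, hadError }) {
  if (!workflowFired) return { status: 'ignore', folder: 'ignore/' };
  if (hadError) return { status: 'fail', folder: 'fail/' };
  return { status: 'success', folder: 'success/' };
}

console.log(resolveOutcome({ workflowFired: true, hadError: false }));
console.log(resolveOutcome({ workflowFired: true, hadError: true }));
console.log(resolveOutcome({ workflowFired: false, hadError: false }));
```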
First-Match-Wins Logic
The evaluator uses "first-match-wins" logic when processing workflows:
for (const workflow of workflows) {
  // Skip workflows that don't start with an Event entity
  const firstNode = workflow.workflowData.root;
  const firstEntity = this.getEntity(firstNode.entityId);
  if (firstEntity.workflowEntityType?.name !== 'Event') {
    continue; // Skip this workflow
  }
  // Evaluate the first Event condition
  const result = await this.traverseNode(firstNode, context, metadata);
  if (this.workflowFired) {
    // A workflow's Event condition passed - stop evaluating other workflows
    break;
  }
}
Key behaviors:
- Workflows are evaluated in order
- Only the first workflow whose Event condition passes is executed
- Once a workflow "fires", no other workflows are evaluated
- If no workflow fires, the message is marked as ignore
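The behaviors listed above can be modeled in a few lines. In this toy version each workflow carries a `rootType` and a `condition` predicate standing in for its root Event entity. Both fields are assumptions made for the demo, not the real workflow shape.

```javascript
// Toy model of first-match-wins selection.
function selectWorkflow(workflows, message) {
  for (const workflow of workflows) {
    if (workflow.rootType !== 'Event') continue;       // skipped, as described above
    if (workflow.condition(message)) return workflow;  // first match wins
  }
  return null; // nothing fired: the message would be marked ignore
}

const workflows = [
  { id: 'wf_action_root', rootType: 'Action', condition: () => true },
  { id: 'wf_orders', rootType: 'Event', condition: (m) => m.eventType === 'order.created' },
  { id: 'wf_catch_all', rootType: 'Event', condition: () => true },
  { id: 'wf_unreached', rootType: 'Event', condition: () => true },
];

const forOrder = selectWorkflow(workflows, { eventType: 'order.created' });
const forOther = selectWorkflow(workflows, { eventType: 'user.action' });
console.log(forOrder.id, forOther.id);
```

Because order decides the winner, a catch-all workflow placed early would shadow every workflow after it.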
Execution Log
The Evaluator tracks every step of workflow execution in executionLog. This provides a complete trace for debugging, auditing, and performance analysis.
Log Entry Structure
{
  ts: 1705315800123,           // Absolute timestamp (ms since epoch)
  elapsed: 2500,               // Ms since the Evaluator was created (startTime)
  type: 'api',                 // Entry type
  entityId: 'ent_123',         // Entity being evaluated
  entityName: 'Get AI Response',
  entityType: 'Prompt',
  service: 'llm',              // Type-specific fields
  durationMs: 2400,
  success: true
}
Entry Types
| Type | Description | Key Fields |
|---|---|---|
| workflow | Workflow start/end | action, workflowId, workflowName, totalDurationMs |
| event | Event condition evaluated | mode, conditionResult, branchTaken, childCount |
| prompt | Prompt entity processed | action (script_executed, complete) |
| api | External API call (LLM, etc.) | service, model, durationMs, success |
| action | Action script executed | durationMs, success |
| complete | Workflow completed | status, hasResultValue |
| error | Error during evaluation | error |
How It Works
Each entity evaluator receives a logCallback function:
async traverseNode(node, context, workflowMetadata) {
  const entity = this.getEntity(node.entityId);
  const entityType = entity.workflowEntityType?.name || 'Action';
  const evaluator = this.getEvaluator(entityType);
  // Create log callback for this entity
  const logCallback = (entry) => {
    this.addLogEntry({
      entityId: entity.id,
      entityName: entity.name,
      entityType,
      ...entry,
    });
  };
  // Pass to evaluator
  const result = await evaluator.evaluate(
    context, entity, node, this.message,
    this.executionContext, workflowMetadata, logCallback
  );
  // ... handle result.action as shown under Node Evaluation
}
Entity evaluators call logCallback() at key moments:
// In EventEvaluator
logCallback({
type: 'event',
action: 'condition_passed',
mode: 'Single Path',
conditionResult: true,
childCount: 2,
});
// In PromptEvaluator (API call)
logCallback({
type: 'api',
service: 'llm',
model: entity.model.name,
durationMs: Date.now() - apiStartTime,
success: true,
});
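The callback pattern can be tried in isolation with a small stand-in class. `ExecutionLog` and `callbackFor` are hypothetical names, and the injectable clock is an addition that makes the timing fields predictable in a demo. The Evaluator itself calls `Date.now()` directly.

```javascript
// Minimal stand-in for the Evaluator's logging pieces.
class ExecutionLog {
  constructor(now = Date.now) {
    this.now = now;          // injectable clock, handy for demos and tests
    this.startTime = now();
    this.entries = [];
  }

  add(entry) {
    const ts = this.now();
    this.entries.push({ ts, elapsed: ts - this.startTime, ...entry });
  }

  // Bind entity identity once; evaluators then pass only their own fields.
  callbackFor(entity, entityType) {
    return (entry) => this.add({
      entityId: entity.id,
      entityName: entity.name,
      entityType,
      ...entry,
    });
  }
}

// Usage with a fake clock so the numbers are predictable.
let fakeNow = 1000;
const log = new ExecutionLog(() => fakeNow);
const logCallback = log.callbackFor({ id: 'ent_123', name: 'Get AI Response' }, 'Prompt');

fakeNow = 1250; // 250 ms later
logCallback({ type: 'api', service: 'llm', durationMs: 240, success: true });
console.log(log.entries[0]);
```

Binding the entity fields in a closure means an evaluator cannot forget them, and every entry for one entity carries the same identity.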
Log Persistence
The execution log is saved with every workflow result to S3:
{
  "message": { ... },
  "context": {
    "____execution_log____": [
      { "ts": 1705315800123, "elapsed": 0, "type": "workflow", "action": "start", ... },
      { "ts": 1705315800130, "elapsed": 7, "type": "event", "action": "condition_passed", ... },
      { "ts": 1705315802500, "elapsed": 2377, "type": "api", "service": "llm", "durationMs": 2300, ... },
      { "ts": 1705315802650, "elapsed": 2527, "type": "action", "action": "script_executed", ... },
      { "ts": 1705315802700, "elapsed": 2577, "type": "result", "status": "Success", ... }
    ]
  }
}
See Processed Messages for viewing workflow results.
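Once a saved result has been loaded, the log can be reduced to per-type timings for performance analysis. The sketch below assumes the log sits under `context.____execution_log____` as in the example above. `summarizeDurations` is a hypothetical helper and the sample values are illustrative.

```javascript
// Sum durationMs per entry type from a persisted execution log.
function summarizeDurations(executionLog) {
  const totals = {};
  for (const entry of executionLog) {
    // Entries without a numeric duration (e.g. workflow start) are skipped.
    if (typeof entry.durationMs !== 'number') continue;
    totals[entry.type] = (totals[entry.type] ?? 0) + entry.durationMs;
  }
  return totals;
}

// Sample shaped like the persisted example; values are illustrative.
const saved = {
  context: {
    ____execution_log____: [
      { type: 'workflow', action: 'start' },
      { type: 'api', service: 'llm', durationMs: 2300 },
      { type: 'action', action: 'script_executed', durationMs: 40 },
      { type: 'api', service: 'llm', durationMs: 100 },
    ],
  },
};

const totals = summarizeDurations(saved.context.____execution_log____);
console.log(totals);
```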
Error Handling
Script Errors
try {
  const result = await evaluator.evaluate(...);
} catch (error) {
  this.hadError = true;
  this.errors.push({
    entityId: entity.id,
    entityName: entity.name,
    entityType,
    error: error.message,
    timestamp: new Date().toISOString(),
  });
  // Also logged to execution log
  this.addLogEntry({
    type: 'error',
    entityId: entity.id,
    entityName: entity.name,
    entityType,
    error: error.message,
  });
}
Timeout Handling
await context.eval(wrappedScript, {
  timeout: this.scriptTimeout, // Default 5000ms
  promise: true
});
Scripts exceeding timeout are terminated and logged.
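The terminate-and-report behavior can be tried locally with Node's built-in `vm` module, which also accepts a `timeout` option. This is a stand-in for illustration only: it is not isolated-vm, it is not a security sandbox, and `runWithTimeout` is a hypothetical helper. The error code checked below is the one Node's `vm` sets, and isolated-vm may report timeouts differently.

```javascript
const vm = require('node:vm');

// Run code with a time limit and report the outcome.
// Uses Node's built-in vm module, NOT isolated-vm, purely for illustration.
function runWithTimeout(code, timeoutMs) {
  try {
    const value = vm.runInNewContext(code, {}, { timeout: timeoutMs });
    return { success: true, value };
  } catch (err) {
    // Node's vm marks timeouts with this error code.
    const timedOut = err.code === 'ERR_SCRIPT_EXECUTION_TIMEOUT';
    return { success: false, timedOut, error: err.message };
  }
}

const ok = runWithTimeout('1 + 1', 50);
const slow = runWithTimeout('while (true) {}', 50);
console.log(ok.success, slow.success, slow.timedOut);
```

Distinguishing a timeout from an ordinary script error lets the caller log a more useful suggestion, such as optimizing the script or raising the limit.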
Error Context
Errors are saved alongside the message, with detailed context and fix suggestions:
{
  "____execution_errors____": [
    {
      "entityId": "ent_123",
      "entityName": "Process Order",
      "entityType": "Action",
      "error": "Cannot read properties of undefined (reading 'name')",
      "errorType": "TypeError",
      "suggestion": "You tried to access a property on something that is undefined. Add a null check or use optional chaining: obj?.property",
      "timestamp": "2024-01-15T10:30:00.000Z"
    }
  ],
  "____error_count____": 1
}
Enhanced Error Logging
The evaluator provides detailed error information to help with debugging:
| Field | Description |
|---|---|
| error | The JavaScript error message |
| errorType | Error class (TypeError, ReferenceError, SyntaxError, etc.) |
| suggestion | Human-readable fix suggestion |
| scriptPreview | First 200 characters of the script |
| scriptLength | Total script length |
| messageSummary | Key message fields for context |
Error Types and Suggestions
| Error Type | Common Cause | Suggestion |
|---|---|---|
| TypeError | Accessing property of undefined | Use optional chaining (?.) |
| ReferenceError | Using undefined variable | Check spelling or define variable |
| SyntaxError | Invalid JavaScript | Check brackets, quotes, semicolons |
| TimeoutError | Script exceeded limit | Optimize or increase timeout |
Example error log entry:
{
  "ts": 1705315800123,
  "elapsed": 2500,
  "entityId": "ent_123",
  "entityName": "Process Data",
  "entityType": "Action",
  "type": "script",
  "action": "executed",
  "success": false,
  "error": "Cannot read properties of undefined (reading 'name')",
  "errorType": "TypeError",
  "suggestion": "You tried to access a property on something that is undefined. Add a null check: \"if (obj && obj.property)\" or use optional chaining: \"obj?.property\".",
  "scriptPreview": "const name = data.user.name;...",
  "scriptLength": 342,
  "messageSummary": {
    "organizationId": "org_abc",
    "environmentId": "env_xyz",
    "eventType": "user.action"
  }
}
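An entry like this could be assembled from a caught error and the script text roughly as follows. `describeError` and the `SUGGESTIONS` table are hypothetical. The suggestion strings are the short forms from the Error Types and Suggestions table, not the longer wording in the example, and the trailing `...` on a truncated preview is an assumption based on that example.

```javascript
// Hypothetical suggestion lookup keyed by error class name.
const SUGGESTIONS = {
  TypeError: 'Use optional chaining (?.)',
  ReferenceError: 'Check spelling or define variable',
  SyntaxError: 'Check brackets, quotes, semicolons',
  TimeoutError: 'Optimize or increase timeout',
};

function describeError(error, script) {
  const errorType = error.name || 'Error';
  return {
    error: error.message,
    errorType,
    suggestion: SUGGESTIONS[errorType] ?? null,
    // First 200 characters, with a marker when the script was cut short.
    scriptPreview: script.length > 200 ? `${script.slice(0, 200)}...` : script,
    scriptLength: script.length,
  };
}

// Trigger a real TypeError the same way a user script might.
let details;
try {
  const data = {};
  data.user.name; // data.user is undefined, so this property read throws
} catch (err) {
  details = describeError(err, 'const name = data.user.name;');
}
console.log(details.errorType, details.suggestion);
```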
Performance Considerations
Memory Management
- Isolates disposed after workflow completion
- Contexts released after each workflow
- 128MB limit prevents memory exhaustion
Concurrency
- Worker pool manages multiple evaluators
- Each worker handles one message at a time
- Pool size configurable based on resources
Script Compilation
- Scripts pre-compiled at startup
- Reused across all evaluations
- Reduces per-message overhead
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| SCRIPT_TIMEOUT | 5000 | Script timeout in milliseconds |
| ISOLATE_MEMORY | 128 | V8 isolate memory limit in MB |
| WORKER_POOL_SIZE | 4 | Number of worker threads |
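Reading these settings with their documented defaults might look like the sketch below. `loadConfig` and `readPositiveInt` are hypothetical names, and rejecting non-positive or non-integer values is an assumption about desirable validation, not documented behavior.

```javascript
// Hypothetical loader for the three settings above, with the documented defaults.
function readPositiveInt(env, name, fallback) {
  const raw = env[name];
  if (raw === undefined || raw === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    throw new Error(`${name} must be a positive integer, got "${raw}"`);
  }
  return value;
}

function loadConfig(env = process.env) {
  return {
    scriptTimeoutMs: readPositiveInt(env, 'SCRIPT_TIMEOUT', 5000),
    isolateMemoryMb: readPositiveInt(env, 'ISOLATE_MEMORY', 128),
    workerPoolSize: readPositiveInt(env, 'WORKER_POOL_SIZE', 4),
  };
}

// Passing an explicit object keeps the demo independent of the real environment.
console.log(loadConfig({}));
console.log(loadConfig({ SCRIPT_TIMEOUT: '10000', WORKER_POOL_SIZE: '8' }));
```

Failing fast on a malformed value at startup is usually easier to diagnose than a worker pool silently sized from `NaN`.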
Related Topics
- Entity Evaluators — Specialized entity handlers
- Scripts Reference — Available script functions
- Workflow Entities — Entity configuration