
PubSub Functions

The PubSub functions allow workflows to publish messages to Redis PubSub channels for real-time communication.

Prerequisites

Before using pubsub functions, you must have:

  1. Redis configured - Set the REDIS_URL environment variable (e.g., redis://localhost:6379)
  2. A client listening - Clients must subscribe to channels before messages are published

Required Configuration

If REDIS_URL is not set, pubsub calls will fail with: "Redis not configured"
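A workflow can guard against this failure by wrapping the call. The following is a minimal sketch, assuming the failure surfaces as a rejected promise whose error message contains "Redis not configured" (the exact error shape is not specified here). tryPublish is a hypothetical helper, and the publish function is passed in as an argument so the sketch can also run outside the workflow runtime.

```javascript
// Hypothetical helper: attempts a publish and reports failure instead of throwing.
// `publish` is expected to behave like pubsubPublish(channel, message).
async function tryPublish(publish, channel, message) {
  try {
    await publish(channel, message);
    return { ok: true, error: null };
  } catch (err) {
    // Assumption: the error carries a readable message such as
    // "Redis not configured"; otherwise fall back to String(err).
    const text = err && err.message ? err.message : String(err);
    return { ok: false, error: text };
  }
}
```

Inside a workflow this might be called as `const result = await tryPublish(pubsubPublish, 'notifications', 'hello');`, followed by a check of `result.ok` before continuing.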

Overview

Redis PubSub is used for:

  • Sending chat responses back to clients
  • Real-time notifications
  • Broadcasting events to subscribers

Functions

pubsubPublish

Publish a message to a Redis PubSub channel.

Signature:

async function pubsubPublish(channel: string, message: string): Promise<void>

Parameters:

Parameter  Type    Description
channel    string  The Redis channel name to publish to
message    string  The message to publish (must be a string)

Returns: Promise<void> - Resolves when the message is published

Important Notes:

  • Both channel and message MUST be strings
  • For objects, use JSON.stringify() to convert to string
  • The function is async - use await when calling
  • Channel names are case-sensitive
  • Requires REDIS_URL environment variable
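The string requirements above can be enforced in one place. This is a minimal sketch, not part of the PubSub API: publishValue is a hypothetical wrapper that validates the channel, converts non-string payloads with JSON.stringify(), and then delegates to a pubsubPublish-like function passed in as the first argument.

```javascript
// Hypothetical wrapper around a pubsubPublish-like function passed in as `publish`.
async function publishValue(publish, channel, value) {
  if (typeof channel !== 'string' || channel.length === 0) {
    throw new TypeError('channel must be a non-empty string');
  }
  // Strings pass through unchanged; everything else is JSON-encoded.
  const message = typeof value === 'string' ? value : JSON.stringify(value);
  if (typeof message !== 'string') {
    // JSON.stringify returns undefined for values such as undefined or functions.
    throw new TypeError('value cannot be converted to a JSON string');
  }
  await publish(channel, message);
  return message; // the exact string that was published
}
```

In a workflow, `await publishValue(pubsubPublish, 'responses', { foo: 'bar' })` would publish the string `{"foo":"bar"}`.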

Quick Start - Chat Response

The most common use case is sending AI responses back to chat clients:

// Get the AI response from the previous prompt
const response = await latestPromptResponse();

// Build the channel name using correlationId
const channel = "chat:" + message.correlationId;

// Publish - BOTH arguments must be strings!
await pubsubPublish(channel, JSON.stringify({
  content: response,
  timestamp: Date.now()
}));

Key Points
  1. channel must be a string like "chat:abc-123"
  2. message must be a string - use JSON.stringify() for objects
  3. Use message.correlationId to route responses to the right client

Usage Examples

Basic String Message

await pubsubPublish('notifications', 'New user signed up');
print('Message published');

JSON Payload

const payload = {
  type: 'chat-response',
  content: 'Hello there!',
  timestamp: new Date().toISOString()
};

// Must stringify the object
await pubsubPublish('responses', JSON.stringify(payload));

Dynamic Channel Names

// Use correlation ID to create unique channels
const channel = 'chat:' + message.correlationId;

const response = {
  content: await latestPromptResponse(),
  correlationId: message.correlationId
};

await pubsubPublish(channel, JSON.stringify(response));
print('Published to channel:', channel);

Chat Response Pattern

This is the standard pattern for chat workflows:

// Get the AI response
const aiResponse = await latestPromptResponse();

if (aiResponse) {
  // Build the response payload
  const responsePayload = {
    type: 'chat-response',
    content: aiResponse,
    correlationId: message.correlationId,
    timestamp: new Date().toISOString()
  };

  // Channel format: "chat:{correlationId}"
  const channel = "chat:" + message.correlationId;

  // Publish (both args must be strings)
  await pubsubPublish(channel, JSON.stringify(responsePayload));

  print('Published chat response to:', channel);
} else {
  print('No AI response to publish');
}
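On the receiving side, the published string has to be decoded again. The sketch below shows one way a subscriber might do that, assuming the payload carries the type and correlationId fields used in the pattern above. parseChatResponse is a hypothetical helper, and a real client may expect a different structure.

```javascript
// Hypothetical client-side helper: decodes a message published by the pattern above.
// Returns the parsed payload, or null when the message is not a matching chat response.
function parseChatResponse(raw, expectedCorrelationId) {
  let payload;
  try {
    payload = JSON.parse(raw);
  } catch (err) {
    return null; // the message was not valid JSON
  }
  if (payload === null || typeof payload !== 'object') return null;
  if (payload.type !== 'chat-response') return null;
  // Ignore responses that belong to a different conversation.
  if (payload.correlationId !== expectedCorrelationId) return null;
  return payload;
}
```

Returning null instead of throwing lets the subscriber skip malformed or unrelated messages without interrupting its listener.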

Common Errors

"pubsubPublish is not defined"

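Cause: The function was called under a name that does not exist. The error message names the identifier that was actually used, so it may show a variant such as publishToPubSub.

Solution: Call pubsubPublish with that exact spelling and capitalization. Variations such as publishToPubSub are not defined.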

"A non-transferable value was passed"

Cause: Passing a non-string argument

Solution: Use JSON.stringify() for objects:

// ❌ Wrong - passing object
await pubsubPublish('channel', { foo: 'bar' });

// ✅ Correct - passing string
await pubsubPublish('channel', JSON.stringify({ foo: 'bar' }));

Client Not Receiving Messages

Possible causes:

  1. Channel name mismatch (check the exact string, including case)
  2. Client subscribes after the message is published (Redis PubSub does not retain messages for late subscribers)
  3. Redis connection issues

Solution: Ensure the client subscribes before the workflow publishes, and that the channel names match exactly.
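The first two causes can be illustrated without a Redis server. The following toy in-memory model is only a sketch of fire-and-forget delivery, not a Redis client: ToyPubSub and its methods are hypothetical names, and its publish returns a delivery count purely for illustration (pubsubPublish itself resolves to void).

```javascript
// Toy in-memory model of fire-and-forget pub/sub (not a Redis client).
class ToyPubSub {
  constructor() {
    this.subscribers = new Map(); // channel name -> array of handlers
  }

  subscribe(channel, handler) {
    const handlers = this.subscribers.get(channel) || [];
    handlers.push(handler);
    this.subscribers.set(channel, handlers);
  }

  // Delivers to current subscribers only and returns how many received the message.
  publish(channel, message) {
    const handlers = this.subscribers.get(channel) || [];
    handlers.forEach((handler) => handler(message));
    return handlers.length;
  }
}

const bus = new ToyPubSub();
const received = [];

// Published before anyone subscribed: the message is dropped, not queued.
const early = bus.publish('chat:abc-123', 'first');

bus.subscribe('chat:abc-123', (msg) => received.push(msg));

// Channel names are case-sensitive, so this reaches no subscriber.
const wrongCase = bus.publish('Chat:abc-123', 'second');

// Same channel, subscriber already listening: delivered.
const delivered = bus.publish('chat:abc-123', 'third');
```

After this runs, `early` and `wrongCase` are both 0, `delivered` is 1, and `received` contains only 'third'.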

Channel Naming Conventions

Use Case            Pattern                Example
Chat responses      chat:{correlationId}   chat:abc-123-def
User notifications  user:{userId}          user:user-456
Broadcast           broadcast:{topic}      broadcast:news
Organization-wide   org:{orgId}:{channel}  org:abc:updates
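Building channel names through small helpers keeps publishers and subscribers from drifting apart on spelling or case. A minimal sketch of the conventions above follows. The channels object and its function names are hypothetical and not part of the PubSub API.

```javascript
// Hypothetical helpers that build channel names following the conventions above.
const channels = {
  chat: (correlationId) => 'chat:' + correlationId,
  user: (userId) => 'user:' + userId,
  broadcast: (topic) => 'broadcast:' + topic,
  org: (orgId, channel) => 'org:' + orgId + ':' + channel,
};
```

For example, `channels.chat(message.correlationId)` yields the same "chat:{correlationId}" name used throughout this page.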

Integration with Chat API

The Admin Console's Chat API uses the following subscription pattern:

  1. The client connects to the /api/chat SSE endpoint
  2. The API subscribes to the channel chat:{correlationId}
  3. The workflow publishes to the same channel using pubsubPublish
  4. The API forwards the message to the client via SSE

For this to work:

  • The correlationId must be consistent
  • Channel format must match: "chat:" + correlationId
  • Message should be JSON with expected structure
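The forwarding step can be pictured as wrapping each received PubSub message in a Server-Sent Events frame. How the Admin Console actually implements this is not shown here, so the following is only a sketch: toSseFrame is a hypothetical function that applies the standard SSE wire format, in which each payload line is prefixed with "data: " and a blank line ends the event.

```javascript
// Hypothetical sketch of the forwarding step: wrap a PubSub message in an SSE frame.
// Each line of the payload gets its own "data:" field; a blank line ends the event.
function toSseFrame(message) {
  const lines = String(message).split('\n');
  return lines.map((line) => 'data: ' + line).join('\n') + '\n\n';
}
```

A single-line JSON string such as `{"content":"Hi"}` becomes one `data:` line followed by a blank line, which is why publishing compact JSON.stringify() output fits this pattern well.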