PubSub Functions
The PubSub functions allow workflows to publish messages to Redis PubSub channels for real-time communication.
Prerequisites
Before using pubsub functions, you must have:
- Redis configured - Set the REDIS_URL environment variable (e.g., redis://localhost:6379)
- A client listening - Clients must subscribe to channels before messages are published
If REDIS_URL is not set, pubsub calls fail with the error: "Redis not configured".
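A missing or malformed REDIS_URL can be caught before any workflow runs with a small startup check. The sketch below is illustrative only: checkRedisUrl is a hypothetical helper, not part of the PubSub functions, and it assumes a Node.js environment where the standard URL class is available. Accepting the rediss:// (TLS) scheme is also an assumption about the deployment.

```javascript
// Hypothetical helper (not part of the PubSub API): validates a REDIS_URL value.
function checkRedisUrl(value) {
  if (!value) {
    return { ok: false, reason: 'REDIS_URL is not set' };
  }
  let parsed;
  try {
    parsed = new URL(value);
  } catch (err) {
    return { ok: false, reason: 'REDIS_URL is not a valid URL' };
  }
  // redis: is the plain scheme, rediss: is the TLS variant.
  if (parsed.protocol !== 'redis:' && parsed.protocol !== 'rediss:') {
    return { ok: false, reason: 'REDIS_URL must start with redis:// or rediss://' };
  }
  // Fall back to the conventional Redis port when none is given.
  return { ok: true, host: parsed.hostname, port: parsed.port || '6379' };
}
```

It could be called once at startup, for example with checkRedisUrl(process.env.REDIS_URL), and the reason logged when ok is false.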
Overview
Redis PubSub is used for:
- Sending chat responses back to clients
- Real-time notifications
- Broadcasting events to subscribers
Functions
pubsubPublish
Publish a message to a Redis PubSub channel.
Signature:
async function pubsubPublish(channel: string, message: string): Promise<void>
Parameters:
| Parameter | Type | Description |
|---|---|---|
| channel | string | The Redis channel name to publish to |
| message | string | The message to publish (must be a string) |
Returns: Promise<void> - Resolves when the message is published
Important Notes:
- Both channel and message MUST be strings
- For objects, use JSON.stringify() to convert to string
- The function is async - use await when calling
- Channel names are case-sensitive
- Requires REDIS_URL environment variable
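Because the string requirement is easy to forget, a script can route every payload through one normalizing function. The following is a minimal sketch; toMessageString is a hypothetical helper name, not something the platform provides.

```javascript
// Hypothetical helper: normalizes any payload to the string pubsubPublish expects.
function toMessageString(payload) {
  if (typeof payload === 'string') {
    return payload; // already a string, pass through unchanged
  }
  const json = JSON.stringify(payload);
  if (typeof json !== 'string') {
    // JSON.stringify returns undefined for undefined, functions, and symbols
    throw new TypeError('Payload cannot be serialized to a JSON string');
  }
  return json;
}

// In a workflow script:
// await pubsubPublish('responses', toMessageString({ type: 'chat-response' }));
```

Strings pass through unchanged, so callers do not need to know whether a value has already been serialized.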
Quick Start - Chat Response
The most common use case is sending AI responses back to chat clients:
// Get the AI response from the previous prompt
const response = await latestPromptResponse();
// Build the channel name using correlationId
const channel = "chat:" + message.correlationId;
// Publish - BOTH arguments must be strings!
await pubsubPublish(channel, JSON.stringify({
  content: response,
  timestamp: Date.now()
}));
- channel must be a string like "chat:abc-123"
- message must be a string - use JSON.stringify() for objects
- Use message.correlationId to route responses to the right client
Usage Examples
Basic String Message
await pubsubPublish('notifications', 'New user signed up');
print('Message published');
JSON Payload
const payload = {
  type: 'chat-response',
  content: 'Hello there!',
  timestamp: new Date().toISOString()
};
// Must stringify the object
await pubsubPublish('responses', JSON.stringify(payload));
Dynamic Channel Names
// Use correlation ID to create unique channels
const channel = 'chat:' + message.correlationId;
const response = {
  content: await latestPromptResponse(),
  correlationId: message.correlationId
};
await pubsubPublish(channel, JSON.stringify(response));
print('Published to channel:', channel);
Chat Response Pattern
This is the standard pattern for chat workflows:
// Get the AI response
const aiResponse = await latestPromptResponse();
if (aiResponse) {
  // Build the response payload
  const responsePayload = {
    type: 'chat-response',
    content: aiResponse,
    correlationId: message.correlationId,
    timestamp: new Date().toISOString()
  };
  // Channel format: "chat:{correlationId}"
  const channel = "chat:" + message.correlationId;
  // Publish (both args must be strings)
  await pubsubPublish(channel, JSON.stringify(responsePayload));
  print('Published chat response to:', channel);
} else {
  print('No AI response to publish');
}
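The payload and channel construction in this pattern can be factored into a pure function, which makes it checkable without Redis. This is a sketch under assumptions: buildChatResponse is a hypothetical name, and the now parameter exists only so the timestamp can be fixed in tests.

```javascript
// Hypothetical helper: builds the channel and message for a chat response.
// Field names mirror the pattern above; `now` is injectable for testing.
function buildChatResponse(correlationId, content, now = new Date()) {
  if (typeof correlationId !== 'string' || correlationId === '') {
    throw new TypeError('correlationId must be a non-empty string');
  }
  return {
    channel: 'chat:' + correlationId,
    message: JSON.stringify({
      type: 'chat-response',
      content: content,
      correlationId: correlationId,
      timestamp: now.toISOString()
    })
  };
}

// In a workflow script:
// const { channel, message: body } = buildChatResponse(message.correlationId, aiResponse);
// await pubsubPublish(channel, body);
```

Rejecting an empty correlationId early avoids publishing to the bare channel "chat:", which no client would be listening on.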
Common Errors
"pubsubPublish is not defined"
Cause: Using wrong function name
Solution: The correct function name is pubsubPublish, not publishToPubSub or other variations.
"A non-transferable value was passed"
Cause: Passing a non-string argument
Solution: Use JSON.stringify() for objects:
// ❌ Wrong - passing object
await pubsubPublish('channel', { foo: 'bar' });
// ✅ Correct - passing string
await pubsubPublish('channel', JSON.stringify({ foo: 'bar' }));
Client Not Receiving Messages
Possible causes:
- Channel name mismatch (check exact string including case)
- Client subscribes after message is published
- Redis connection issues
Solution: Ensure the client subscribes before the workflow publishes, and check that channel names match exactly, including case.
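Redis PubSub does not store messages: anything published while no client is subscribed to that exact channel is dropped. The sketch below imitates this behavior with an in-memory stand-in to show why subscribe order and channel spelling matter. FakePubSub is hypothetical and does not talk to Redis.

```javascript
// In-memory stand-in for Redis PubSub semantics (hypothetical, for illustration only).
class FakePubSub {
  constructor() {
    this.subscribers = new Map(); // channel name -> array of handlers
  }
  subscribe(channel, handler) {
    const handlers = this.subscribers.get(channel) || [];
    handlers.push(handler);
    this.subscribers.set(channel, handlers);
  }
  // Delivers to current subscribers only; returns how many received the message.
  publish(channel, message) {
    const handlers = this.subscribers.get(channel) || [];
    handlers.forEach((handler) => handler(message));
    return handlers.length;
  }
}

const bus = new FakePubSub();
const received = [];

const early = bus.publish('chat:abc-123', 'first');      // no subscriber yet: dropped
bus.subscribe('chat:abc-123', (msg) => received.push(msg));
const wrongCase = bus.publish('Chat:abc-123', 'second'); // case mismatch: dropped
const delivered = bus.publish('chat:abc-123', 'third');  // reaches the subscriber
```

Only the third publish reaches the subscriber. The first two correspond to the late-subscription and channel-mismatch causes listed above.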
Channel Naming Conventions
| Use Case | Pattern | Example |
|---|---|---|
| Chat responses | chat:{correlationId} | chat:abc-123-def |
| User notifications | user:{userId} | user:user-456 |
| Broadcast | broadcast:{topic} | broadcast:news |
| Organization-wide | org:{orgId}:{channel} | org:abc:updates |
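Building channel names in one place helps keep publishers and subscribers in agreement on the exact string. A minimal sketch follows; the channels object and its method names are hypothetical and simply encode the patterns from the table.

```javascript
// Hypothetical builders for the naming patterns in the table above.
const channels = {
  chat: (correlationId) => 'chat:' + correlationId,
  user: (userId) => 'user:' + userId,
  broadcast: (topic) => 'broadcast:' + topic,
  org: (orgId, channel) => 'org:' + orgId + ':' + channel
};

// In a workflow script:
// await pubsubPublish(channels.chat(message.correlationId), JSON.stringify(payload));
```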
Integration with Chat API
The Admin Console's Chat API creates a subscription pattern:
- Client connects to /api/chat SSE endpoint
- API subscribes to channel chat:{correlationId}
- Workflow publishes to same channel using pubsubPublish
- API forwards message to client via SSE
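On the receiving end of this flow, a client may want to parse the forwarded message defensively. The sketch below assumes the SSE event data is the same string the workflow published, and that a usable message carries a string content field, as in the payloads shown in this document. Neither is a documented schema, and parseChatMessage is a hypothetical name.

```javascript
// Hypothetical client-side parser. The required fields are an assumption
// based on the payloads shown in this document, not a documented schema.
function parseChatMessage(raw) {
  let data;
  try {
    data = JSON.parse(raw);
  } catch (err) {
    return null; // not valid JSON
  }
  if (data === null || typeof data !== 'object') {
    return null; // valid JSON, but not an object
  }
  if (typeof data.content !== 'string') {
    return null; // missing or non-string content
  }
  return data;
}
```

Returning null instead of throwing lets the caller skip malformed events without tearing down the SSE connection.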
For this to work:
- The correlationId must be consistent
- Channel format must match: "chat:" + correlationId
- Message should be JSON with expected structure
Related Topics
- Chatbot Workflow Example - Complete chat example
- Scripts Overview - All available script functions
- Prompt Functions - Getting AI responses