Event Streaming
Hantera’s Event Streaming API allows you to receive real-time notifications about changes in your system. This enables building live dashboards, monitoring job execution, and reacting to resource changes as they happen.
Overview
The Event Streaming system uses WebSocket connections to push events to clients in real-time. Key features include:
- Real-time notifications: Receive events immediately as they occur
- Subscription-based filtering: Subscribe only to the events you need
- Multiple resource types: Track jobs, job statistics, and actor state changes
- Best-effort delivery: Events are delivered as they happen without persistence
Endpoint
Connect to the events endpoint using a WebSocket connection:
wss://{hostname}/eventsReplace {hostname} with your tenant hostname.
Connection Lifecycle
-
Open WebSocket connection
Connect to the
/eventsendpoint with a WebSocket client:const ws = new WebSocket('wss://{hostname}/events') -
Authenticate
The first message must be an
authmessage with your access token:{"type": "auth","token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."} -
Receive authentication confirmation
On success, the server responds with:
{"type": "authenticated"} -
Subscribe to events
Subscribe to the resource paths and event types you want to receive:
{"type": "subscribe","subscriptions": [{"id": "my-subscription","path": "jobs","events": ["jobScheduled", "jobStarted", "jobCompleted", "jobFailed"]}]} -
Receive events
Events are pushed as they occur:
{"type": "event","subscriptionId": "my-subscription","eventType": "jobStarted","path": "jobs","data": { ... },"timestamp": "2025-12-08T21:45:00.000Z"} -
Respond to keep-alive
The server sends
pingmessages every 30 seconds. Respond withpongwithin 30 seconds to keep the connection alive:{ "type": "pong" }
Authentication
Authentication uses message-based authentication rather than headers, ensuring compatibility with browser and Node.js WebSocket clients.
Auth Message
interface AuthMessage { type: 'auth' token: string // Bearer token without "Bearer " prefix}Auth Response
On success:
{ "type": "authenticated"}On failure, an error is returned and the connection is closed:
{ "type": "error", "code": "UNAUTHORIZED", "message": "Invalid or expired token"}Subscribing to Events
Use the subscribe message to register for events on specific resource paths.
Subscribe Message
interface SubscribeMessage { type: 'subscribe' requestId?: string // Optional correlation ID subscriptions: Subscription[]}
interface Subscription { id: string // Client-assigned unique identifier path: string // Resource path to subscribe to events: string[] // Event types to receive}Subscription Response
{ "type": "subscribed", "requestId": "req-1", "subscriptions": [ { "id": "my-subscription", "path": "jobs", "events": ["jobScheduled", "jobStarted", "jobCompleted", "jobFailed"] } ]}Unsubscribing
Remove subscriptions when you no longer need them:
{ "type": "unsubscribe", "ids": ["my-subscription"]}Supported Resources and Events
Jobs
Track job lifecycle events across all jobs or specific jobs.
| Path | Description |
|---|---|
jobs | All job lifecycle events |
jobs/{jobId} | Events for a specific job |
Available Events:
| Event | Description |
|---|---|
jobScheduled | Job created in pending state |
jobStarted | Job execution began |
jobCompleted | Job finished successfully |
jobFailed | Job execution failed |
Example: Subscribe to all job events
{ "type": "subscribe", "subscriptions": [{ "id": "all-jobs", "path": "jobs", "events": ["jobScheduled", "jobStarted", "jobCompleted", "jobFailed"] }]}Example: Subscribe to job failures only
{ "type": "subscribe", "subscriptions": [{ "id": "job-failures", "path": "jobs", "events": ["jobFailed"] }]}Job Statistics
Track aggregated statistics per job definition with live bucket updates.
| Path | Description |
|---|---|
job-definitions | Statistics for all job definitions |
job-definitions/{jobDefinitionId} | Statistics for a specific job type |
Available Events:
| Event | Description |
|---|---|
jobStatistics | Live bucket update with aggregated counters |
Statistics Payload:
{ "type": "event", "subscriptionId": "stats", "eventType": "jobStatistics", "path": "job-definitions/sync-inventory", "data": { "jobDefinitionId": "sync-inventory", "bucketTime": "2025-12-08T22:00:00.000Z", "scheduled": 45, "successful": 40, "failed": 2, "minExecution": 120.5, "maxExecution": 1250.0, "avgExecution": 450.3 }, "timestamp": "2025-12-08T22:30:00.000Z"}Actors
Track state changes (checkpoints) for domain actors: Orders, Payments, SKUs, Tickets, and Assets.
| Path | Description |
|---|---|
actors | All actor checkpoint events |
actors/{actorType} | Checkpoints for a specific actor type |
actors/{actorType}/{actorId} | Checkpoints for a specific actor instance |
Actor Types:
orders- Order actorspayments- Payment actorsskus- SKU actors- Custom actors
Available Events:
| Event | Description |
|---|---|
actorCheckpoint | MutationSet written to actor mutation log |
Example: Subscribe to all order changes
{ "type": "subscribe", "subscriptions": [{ "id": "order-updates", "path": "actors/orders", "events": ["actorCheckpoint"] }]}Example: Subscribe to a specific order
{ "type": "subscribe", "subscriptions": [{ "id": "order-detail", "path": "actors/orders/550e8400-e29b-41d4-a716-446655440000", "events": ["actorCheckpoint"] }]}Checkpoint Payload:
{ "type": "event", "subscriptionId": "order-updates", "eventType": "actorCheckpoint", "path": "actors/orders/550e8400-e29b-41d4-a716-446655440000", "data": { "checkpointId": "770e8400-e29b-41d4-a716-446655440002", "actorType": "orders", "actorId": "550e8400-e29b-41d4-a716-446655440000", "identityId": "880e8400-e29b-41d4-a716-446655440003", "timestamp": "2025-12-08T21:47:00.000Z" }, "timestamp": "2025-12-08T21:47:00.000Z"}Keep-Alive
The server sends ping messages every 30 seconds to verify the connection is alive:
{ "type": "ping", "timestamp": "2025-12-08T21:30:00.000Z"}Respond with pong within 30 seconds:
{ "type": "pong"}If no pong is received within the timeout, the server closes the connection.
Error Handling
Errors are returned as error messages:
{ "type": "error", "code": "INVALID_PATH", "message": "Unknown resource path: invalid/path", "requestId": "req-5"}Common Error Codes:
| Code | Description |
|---|---|
UNAUTHORIZED | Invalid or expired token |
INVALID_PATH | Subscription path not recognized |
INVALID_SCOPE | Invalid event types for path |
SUBSCRIPTION_NOT_FOUND | Unsubscribe referenced unknown ID |
RATE_LIMITED | Too many requests |
Backpressure and Message Drops
The Event Streaming system uses best-effort delivery. When clients consume events slower than they’re produced, events may be dropped.
How Backpressure Works
Each connection has a server-side queue that buffers events. When events are produced faster than the client consumes them:
- Events accumulate in the queue
- When the queue fills up, oldest events are dropped
- The server sends a
warningmessage notifying the client
Warning Message
When events are dropped, you’ll receive a warning:
{ "type": "warning", "code": "QUEUE_OVERFLOW", "message": "5 events dropped for subscription 'all-jobs' due to slow consumption", "subscriptionId": "all-jobs"}Handling Backpressure
To minimize dropped events:
-
Process events quickly: Avoid blocking operations (network calls, heavy computation) in your event handler. Offload processing to a background queue if needed.
-
Subscribe selectively: Only subscribe to events you actually need. Use specific paths (
actors/orders/{orderId}) instead of broad ones (actors) when possible. -
Handle warnings gracefully: When you receive a
QUEUE_OVERFLOWwarning, sync your state by querying the Graph API to recover any missed changes.
Example: Handling overflow warnings
this.ws.onmessage = (event) => { const message = JSON.parse(event.data)
if (message.type === 'warning' && message.code === 'QUEUE_OVERFLOW') { console.warn(`Events dropped for ${message.subscriptionId}`)
// Re-sync state from Graph API to recover missed events this.resyncState(message.subscriptionId) }
// ... handle other message types}When to Use Webhooks Instead
For workflows where every event must be processed, use Rules with webhooks instead of event streaming. Webhooks provide:
- Guaranteed delivery with retries
- Persistent event history
- Acknowledgment-based processing
Event streaming is designed for real-time UI updates and monitoring where occasional missed events are acceptable.
Complete Example
Here’s a complete example in TypeScript showing how to connect, authenticate, and subscribe to events:
class HanteraEventsClient { private ws: WebSocket | null = null private token: string
constructor(token: string) { this.token = token }
connect(url: string): Promise<void> { return new Promise((resolve, reject) => { this.ws = new WebSocket(url)
this.ws.onopen = () => { // Send auth message immediately this.send({ type: 'auth', token: this.token }) }
this.ws.onmessage = (event) => { const message = JSON.parse(event.data)
switch (message.type) { case 'authenticated': console.log('Authenticated successfully') resolve() break
case 'subscribed': console.log('Subscribed:', message.subscriptions) break
case 'event': this.handleEvent(message) break
case 'ping': this.send({ type: 'pong' }) break
case 'error': console.error('Error:', message.code, message.message) if (message.code === 'UNAUTHORIZED') { reject(new Error(message.message)) } break } }
this.ws.onerror = (error) => reject(error) this.ws.onclose = () => console.log('Connection closed') }) }
subscribe(subscriptions: Array<{id: string, path: string, events: string[]}>) { this.send({ type: 'subscribe', subscriptions }) }
unsubscribe(ids: string[]) { this.send({ type: 'unsubscribe', ids }) }
private send(message: object) { this.ws?.send(JSON.stringify(message)) }
private handleEvent(message: any) { console.log(`Event [${message.eventType}]:`, message.data)
// Handle specific event types switch (message.eventType) { case 'jobCompleted': console.log(`Job ${message.data.jobId} completed in ${message.data.elapsedMs}ms`) break case 'jobFailed': console.error(`Job ${message.data.jobId} failed: ${message.data.error}`) break case 'actorCheckpoint': console.log(`${message.data.actorType} ${message.data.actorId} updated`) break } }
close() { this.ws?.close() }}
// Usageasync function main() { const client = new HanteraEventsClient('your-access-token')
await client.connect('wss://your-tenant.core.ams.hantera.cloud/events')
// Subscribe to job events client.subscribe([ { id: 'jobs', path: 'jobs', events: ['jobScheduled', 'jobStarted', 'jobCompleted', 'jobFailed'] }, { id: 'orders', path: 'actors/orders', events: ['actorCheckpoint'] } ])}
main()Connection Limits
Connection limits are configurable for enterprise tenants. These are the defaults:
| Limit | Default |
|---|---|
| Max connections | 500 |
| Max subscriptions/conn | 50 |
| Auth timeout | 10s |
| Ping interval | 30s |
| Ping timeout | 30s |
| Max message size (Authenticated) | 512 KB |
Best Practices
-
Reconnect on disconnect: Implement automatic reconnection with exponential backoff for network interruptions.
-
Subscribe selectively: Only subscribe to the events you need to reduce message volume.
-
Handle backpressure: If the server sends a
warningwith codeQUEUE_OVERFLOW, your client is consuming events too slowly. -
Use request correlation: Include
requestIdin subscribe/unsubscribe messages to correlate responses. -
Clean up subscriptions: Unsubscribe when you no longer need events to free up server resources.