Skip to content

Event Streaming

Hantera’s Event Streaming API allows you to receive real-time notifications about changes in your system. This enables building live dashboards, monitoring job execution, and reacting to resource changes as they happen.

Overview

The Event Streaming system uses WebSocket connections to push events to clients in real-time. Key features include:

  • Real-time notifications: Receive events immediately as they occur
  • Subscription-based filtering: Subscribe only to the events you need
  • Multiple resource types: Track jobs, job statistics, and actor state changes
  • Best-effort delivery: Events are delivered as they happen without persistence

Endpoint

Connect to the events endpoint using a WebSocket connection:

wss://{hostname}/events

Replace {hostname} with your tenant hostname.

Connection Lifecycle

  1. Open WebSocket connection

    Connect to the /events endpoint with a WebSocket client:

    const ws = new WebSocket('wss://{hostname}/events')
  2. Authenticate

    The first message must be an auth message with your access token:

    {
    "type": "auth",
    "token": "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."
    }
  3. Receive authentication confirmation

    On success, the server responds with:

    {
    "type": "authenticated"
    }
  4. Subscribe to events

    Subscribe to the resource paths and event types you want to receive:

    {
    "type": "subscribe",
    "subscriptions": [
    {
    "id": "my-subscription",
    "path": "jobs",
    "events": ["jobScheduled", "jobStarted", "jobCompleted", "jobFailed"]
    }
    ]
    }
  5. Receive events

    Events are pushed as they occur:

    {
    "type": "event",
    "subscriptionId": "my-subscription",
    "eventType": "jobStarted",
    "path": "jobs",
    "data": { ... },
    "timestamp": "2025-12-08T21:45:00.000Z"
    }
  6. Respond to keep-alive

    The server sends ping messages every 30 seconds. Respond with pong within 30 seconds to keep the connection alive:

    { "type": "pong" }

Authentication

Authentication uses message-based authentication rather than headers, ensuring compatibility with browser and Node.js WebSocket clients.

Auth Message

interface AuthMessage {
type: 'auth'
token: string // Bearer token without "Bearer " prefix
}

Auth Response

On success:

{
"type": "authenticated"
}

On failure, an error is returned and the connection is closed:

{
"type": "error",
"code": "UNAUTHORIZED",
"message": "Invalid or expired token"
}

Subscribing to Events

Use the subscribe message to register for events on specific resource paths.

Subscribe Message

interface SubscribeMessage {
type: 'subscribe'
requestId?: string // Optional correlation ID
subscriptions: Subscription[]
}
interface Subscription {
id: string // Client-assigned unique identifier
path: string // Resource path to subscribe to
events: string[] // Event types to receive
}

Subscription Response

{
"type": "subscribed",
"requestId": "req-1",
"subscriptions": [
{
"id": "my-subscription",
"path": "jobs",
"events": ["jobScheduled", "jobStarted", "jobCompleted", "jobFailed"]
}
]
}

Unsubscribing

Remove subscriptions when you no longer need them:

{
"type": "unsubscribe",
"ids": ["my-subscription"]
}

Supported Resources and Events

Jobs

Track job lifecycle events across all jobs or specific jobs.

PathDescription
jobsAll job lifecycle events
jobs/{jobId}Events for a specific job

Available Events:

EventDescription
jobScheduledJob created in pending state
jobStartedJob execution began
jobCompletedJob finished successfully
jobFailedJob execution failed

Example: Subscribe to all job events

{
"type": "subscribe",
"subscriptions": [{
"id": "all-jobs",
"path": "jobs",
"events": ["jobScheduled", "jobStarted", "jobCompleted", "jobFailed"]
}]
}

Example: Subscribe to job failures only

{
"type": "subscribe",
"subscriptions": [{
"id": "job-failures",
"path": "jobs",
"events": ["jobFailed"]
}]
}

Job Statistics

Track aggregated statistics per job definition with live bucket updates.

PathDescription
job-definitionsStatistics for all job definitions
job-definitions/{jobDefinitionId}Statistics for a specific job type

Available Events:

EventDescription
jobStatisticsLive bucket update with aggregated counters

Statistics Payload:

{
"type": "event",
"subscriptionId": "stats",
"eventType": "jobStatistics",
"path": "job-definitions/sync-inventory",
"data": {
"jobDefinitionId": "sync-inventory",
"bucketTime": "2025-12-08T22:00:00.000Z",
"scheduled": 45,
"successful": 40,
"failed": 2,
"minExecution": 120.5,
"maxExecution": 1250.0,
"avgExecution": 450.3
},
"timestamp": "2025-12-08T22:30:00.000Z"
}

Actors

Track state changes (checkpoints) for domain actors: Orders, Payments, SKUs, Tickets, and Assets.

PathDescription
actorsAll actor checkpoint events
actors/{actorType}Checkpoints for a specific actor type
actors/{actorType}/{actorId}Checkpoints for a specific actor instance

Actor Types:

  • orders - Order actors
  • payments - Payment actors
  • skus - SKU actors
  • Custom actors

Available Events:

EventDescription
actorCheckpointMutationSet written to actor mutation log

Example: Subscribe to all order changes

{
"type": "subscribe",
"subscriptions": [{
"id": "order-updates",
"path": "actors/orders",
"events": ["actorCheckpoint"]
}]
}

Example: Subscribe to a specific order

{
"type": "subscribe",
"subscriptions": [{
"id": "order-detail",
"path": "actors/orders/550e8400-e29b-41d4-a716-446655440000",
"events": ["actorCheckpoint"]
}]
}

Checkpoint Payload:

{
"type": "event",
"subscriptionId": "order-updates",
"eventType": "actorCheckpoint",
"path": "actors/orders/550e8400-e29b-41d4-a716-446655440000",
"data": {
"checkpointId": "770e8400-e29b-41d4-a716-446655440002",
"actorType": "orders",
"actorId": "550e8400-e29b-41d4-a716-446655440000",
"identityId": "880e8400-e29b-41d4-a716-446655440003",
"timestamp": "2025-12-08T21:47:00.000Z"
},
"timestamp": "2025-12-08T21:47:00.000Z"
}

Keep-Alive

The server sends ping messages every 30 seconds to verify the connection is alive:

{
"type": "ping",
"timestamp": "2025-12-08T21:30:00.000Z"
}

Respond with pong within 30 seconds:

{
"type": "pong"
}

If no pong is received within the timeout, the server closes the connection.

Error Handling

Errors are returned as error messages:

{
"type": "error",
"code": "INVALID_PATH",
"message": "Unknown resource path: invalid/path",
"requestId": "req-5"
}

Common Error Codes:

CodeDescription
UNAUTHORIZEDInvalid or expired token
INVALID_PATHSubscription path not recognized
INVALID_SCOPEInvalid event types for path
SUBSCRIPTION_NOT_FOUNDUnsubscribe referenced unknown ID
RATE_LIMITEDToo many requests

Backpressure and Message Drops

The Event Streaming system uses best-effort delivery. When clients consume events slower than they’re produced, events may be dropped.

How Backpressure Works

Each connection has a server-side queue that buffers events. When events are produced faster than the client consumes them:

  1. Events accumulate in the queue
  2. When the queue fills up, oldest events are dropped
  3. The server sends a warning message notifying the client

Warning Message

When events are dropped, you’ll receive a warning:

{
"type": "warning",
"code": "QUEUE_OVERFLOW",
"message": "5 events dropped for subscription 'all-jobs' due to slow consumption",
"subscriptionId": "all-jobs"
}

Handling Backpressure

To minimize dropped events:

  1. Process events quickly: Avoid blocking operations (network calls, heavy computation) in your event handler. Offload processing to a background queue if needed.

  2. Subscribe selectively: Only subscribe to events you actually need. Use specific paths (actors/orders/{orderId}) instead of broad ones (actors) when possible.

  3. Handle warnings gracefully: When you receive a QUEUE_OVERFLOW warning, sync your state by querying the Graph API to recover any missed changes.

Example: Handling overflow warnings

this.ws.onmessage = (event) => {
const message = JSON.parse(event.data)
if (message.type === 'warning' && message.code === 'QUEUE_OVERFLOW') {
console.warn(`Events dropped for ${message.subscriptionId}`)
// Re-sync state from Graph API to recover missed events
this.resyncState(message.subscriptionId)
}
// ... handle other message types
}

When to Use Webhooks Instead

For workflows where every event must be processed, use Rules with webhooks instead of event streaming. Webhooks provide:

  • Guaranteed delivery with retries
  • Persistent event history
  • Acknowledgment-based processing

Event streaming is designed for real-time UI updates and monitoring where occasional missed events are acceptable.

Complete Example

Here’s a complete example in TypeScript showing how to connect, authenticate, and subscribe to events:

class HanteraEventsClient {
private ws: WebSocket | null = null
private token: string
constructor(token: string) {
this.token = token
}
connect(url: string): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(url)
this.ws.onopen = () => {
// Send auth message immediately
this.send({ type: 'auth', token: this.token })
}
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data)
switch (message.type) {
case 'authenticated':
console.log('Authenticated successfully')
resolve()
break
case 'subscribed':
console.log('Subscribed:', message.subscriptions)
break
case 'event':
this.handleEvent(message)
break
case 'ping':
this.send({ type: 'pong' })
break
case 'error':
console.error('Error:', message.code, message.message)
if (message.code === 'UNAUTHORIZED') {
reject(new Error(message.message))
}
break
}
}
this.ws.onerror = (error) => reject(error)
this.ws.onclose = () => console.log('Connection closed')
})
}
subscribe(subscriptions: Array<{id: string, path: string, events: string[]}>) {
this.send({ type: 'subscribe', subscriptions })
}
unsubscribe(ids: string[]) {
this.send({ type: 'unsubscribe', ids })
}
private send(message: object) {
this.ws?.send(JSON.stringify(message))
}
private handleEvent(message: any) {
console.log(`Event [${message.eventType}]:`, message.data)
// Handle specific event types
switch (message.eventType) {
case 'jobCompleted':
console.log(`Job ${message.data.jobId} completed in ${message.data.elapsedMs}ms`)
break
case 'jobFailed':
console.error(`Job ${message.data.jobId} failed: ${message.data.error}`)
break
case 'actorCheckpoint':
console.log(`${message.data.actorType} ${message.data.actorId} updated`)
break
}
}
close() {
this.ws?.close()
}
}
// Usage
async function main() {
const client = new HanteraEventsClient('your-access-token')
await client.connect('wss://your-tenant.core.ams.hantera.cloud/events')
// Subscribe to job events
client.subscribe([
{
id: 'jobs',
path: 'jobs',
events: ['jobScheduled', 'jobStarted', 'jobCompleted', 'jobFailed']
},
{
id: 'orders',
path: 'actors/orders',
events: ['actorCheckpoint']
}
])
}
main()

Connection Limits

Connection limits are configurable for enterprise tenants. These are the defaults:

LimitDefault
Max connections500
Max subscriptions/conn50
Auth timeout10s
Ping interval30s
Ping timeout30s
Max message size (Authenticated)512 KB

Best Practices

  1. Reconnect on disconnect: Implement automatic reconnection with exponential backoff for network interruptions.

  2. Subscribe selectively: Only subscribe to the events you need to reduce message volume.

  3. Handle backpressure: If the server sends a warning with code QUEUE_OVERFLOW, your client is consuming events too slowly.

  4. Use request correlation: Include requestId in subscribe/unsubscribe messages to correlate responses.

  5. Clean up subscriptions: Unsubscribe when you no longer need events to free up server resources.