Streaming Architecture
The `/v1/stream` WebSocket gateway is the most intricate part of Artery.
This page explains the layers, why they exist, and what changes when the
in-memory backends are replaced with Redis (a roadmap item).
Component diagram
┌─────────────┐       ┌──────────────┐
│   Client    │──ws──▶│ StreamGateway│
└─────────────┘       │  (per-conn)  │
                      └──────┬───────┘
                             │ subscribe / unsubscribe / ping
                             ▼
                      ┌──────────────┐
                      │ StreamHubSvc │  in-process subscription store
                      │ (singleton)  │
                      └──────┬───────┘
                             │ subscribe(channel, handler)
                             ▼
                      ┌──────────────┐
                      │InMemoryPubSub│  pluggable; Redis Pub/Sub (roadmap)
                      └──────▲───────┘
                             │ publish(channel, event)
                      ┌──────┴───────┐
                      │ StreamWorker │  one per upstream feed
                      └──────┬───────┘
                             │ ws → upstream provider
                             ▼
                      ┌──────────────┐
                      │ Hyperliquid  │  (or Polymarket / Kalshi)
                      └──────────────┘
Layer responsibilities
| Layer | Responsibility | Current impl | Roadmap |
|---|---|---|---|
| StreamGateway | Accept WS conn, auth, subscribe/unsubscribe wire protocol | @fastify/websocket v11 | unchanged |
| StreamHubService | Per-connection subscription map, fan-out routing | in-process Map | unchanged |
| InMemoryPubSub | Process-wide channel pub/sub | in-process EventEmitter | Redis Pub/Sub |
| InMemoryLeaderLock | Elect one worker process per upstream feed | in-process boolean | Redis SET NX EX |
| StreamWorker | Maintain upstream WS, publish to pub/sub | one worker per provider | scaled horizontally |
The pluggable boundary is PubSub + LeaderLock. Swap two implementations
and Artery goes from single-node to multi-node with no caller-side change.
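
To make the boundary concrete, here is a minimal sketch of what a `PubSub` interface and an in-process implementation could look like. The interface shape, the `MapPubSub` class, and the `collect` helper are assumptions for illustration; the real definitions live in the Artery codebase and may differ (the current implementation is described above as EventEmitter-based, while this sketch uses a plain `Map`).

```ts
// Hypothetical shapes, for illustration only.
type Handler = (event: unknown) => void;

interface PubSub {
  publish(channel: string, event: unknown): void;
  // Returns a function that removes the subscription.
  subscribe(channel: string, handler: Handler): () => void;
}

// In-process implementation: channel name -> set of handlers.
class MapPubSub implements PubSub {
  private readonly handlers = new Map<string, Set<Handler>>();

  publish(channel: string, event: unknown): void {
    const set = this.handlers.get(channel);
    if (!set) return;
    // Copy first so a handler that unsubscribes during delivery is safe.
    Array.from(set).forEach((handler) => handler(event));
  }

  subscribe(channel: string, handler: Handler): () => void {
    const set = this.handlers.get(channel) ?? new Set<Handler>();
    this.handlers.set(channel, set);
    set.add(handler);
    return () => {
      set.delete(handler);
      // Only drop the map entry if it still points at this (now empty) set.
      if (set.size === 0 && this.handlers.get(channel) === set) {
        this.handlers.delete(channel);
      }
    };
  }
}

// A caller that only knows the interface; it works with any PubSub implementation.
function collect(bus: PubSub, channel: string): { events: unknown[]; stop: () => void } {
  const events: unknown[] = [];
  const stop = bus.subscribe(channel, (event) => {
    events.push(event);
  });
  return { events, stop };
}
```

Because `collect` depends only on `PubSub`, a Redis-backed class implementing the same two methods could be passed in without touching the caller, which is the property the pluggable boundary is meant to provide.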
Connection handshake (race-condition resistant)
Client               StreamGateway             ApiKeysService
│                          │                          │
├──── ws upgrade ─────────▶│                          │
│                          │ register listeners       │
│                          │ (sync, before await)     │
│                          │                          │
├── {type:subscribe} ─────▶│ buffer in pending[]      │
│                          │                          │
│                          ├── verify(token) ────────▶│
│                          │                          │
│                          │◀────── { user, scopes } ─┤
│                          │                          │
│                          │ flush pending[]          │
│◀── {type:subscribed} ────┤                          │
│                          │ install heartbeat        │
│                          │                          │
│◀── {type:event,data} ────┤                          │
The early-frame race: clients send `subscribe` immediately on open. If the message
listener is registered after `await apiKeys.verify()`, those frames are dropped. The gateway
therefore registers `socket.on('message', …)` synchronously at the start of `handleConnection`,
then buffers frames into `pending[]` until auth resolves. See `apps/api/src/stream/stream.gateway.ts`.
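
The buffering idea can be sketched in isolation. This is not the code in `stream.gateway.ts`; `FrameGate`, `MinimalSocket`, and the `verify` / `onFrame` parameters are hypothetical names chosen for the example, and a real gateway would also close the socket on failed auth.

```ts
type Frame = string;

// Holds frames until auth completes, then delivers them in arrival order.
class FrameGate {
  private pending: Frame[] = [];
  private handler: ((frame: Frame) => void) | null = null;

  receive(frame: Frame): void {
    if (this.handler) this.handler(frame);
    else this.pending.push(frame);
  }

  open(handler: (frame: Frame) => void): void {
    const buffered = this.pending;
    this.pending = [];
    // Flush buffered frames first, then route later frames straight through.
    buffered.forEach((frame) => handler(frame));
    this.handler = handler;
  }
}

// Hypothetical minimal socket surface; a real WebSocket exposes more.
interface MinimalSocket {
  on(event: "message", listener: (data: Frame) => void): void;
}

async function handleConnection(
  socket: MinimalSocket,
  verify: () => Promise<boolean>,
  onFrame: (frame: Frame) => void,
): Promise<void> {
  const gate = new FrameGate();
  // Registered synchronously, before the first await, so early frames are kept.
  socket.on("message", (data) => gate.receive(data));

  const ok = await verify();
  if (!ok) return; // a real gateway would close the connection here

  gate.open(onFrame);
}
```

The important property is ordering: nothing is awaited between accepting the connection and attaching the listener, so a `subscribe` frame that arrives during `verify()` lands in the buffer rather than being lost.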
Leader-lock pattern
We don't want every API replica to open its own upstream WS — Hyperliquid
limits 1000 subs / 10 unique users per connection. One worker per feed,
elected via LeaderLock:
```ts
// apps/api/src/stream/stream-workers.service.ts
async onApplicationBootstrap() {
  const lockKey = 'artery:lock:stream:hyperliquid';
  // Only the replica that wins the lock starts the upstream worker.
  if (await leaderLock.acquire(lockKey, { ttlSeconds: 30 })) {
    this.startHyperliquidWorker();
    // Renew every 10s, well inside the 30s TTL, so the lock does not lapse.
    setInterval(() => leaderLock.renew(lockKey), 10_000);
  }
}
```

| Step | What |
|---|---|
| Acquire | SET key replica-id NX EX 30 (Redis): only one wins |
| Renew | every 10s, EXPIRE key 30 (the holder keeps the lock alive) |
| Release | crash → TTL expires within 30s, another replica takes over |
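
The acquire / renew / expire cycle in the table can be emulated in-process to see how takeover works. The sketch below is an assumption-laden illustration, not `InMemoryLeaderLock` (described above as a plain boolean) and not the planned Redis version: `TtlLock` is a hypothetical class, its methods are synchronous for simplicity, the clock is injected so time can be controlled, and `renew` checks the owner, which a bare `EXPIRE` does not do.

```ts
// Emulates "SET key owner NX EX ttl" plus a TTL refresh, with an injectable clock.
class TtlLock {
  private readonly entries = new Map<string, { owner: string; expiresAt: number }>();
  private readonly now: () => number;

  constructor(now: () => number) {
    this.now = now;
  }

  // Succeeds only if the key is absent or its TTL has elapsed (the NX part).
  acquire(key: string, owner: string, ttlSeconds: number): boolean {
    const t = this.now();
    const current = this.entries.get(key);
    if (current && current.expiresAt > t) return false;
    this.entries.set(key, { owner, expiresAt: t + ttlSeconds * 1000 });
    return true;
  }

  // Extends the TTL, but only for the live holder of the lock.
  renew(key: string, owner: string, ttlSeconds: number): boolean {
    const t = this.now();
    const current = this.entries.get(key);
    if (!current || current.expiresAt <= t || current.owner !== owner) return false;
    current.expiresAt = t + ttlSeconds * 1000;
    return true;
  }
}
```

With a 30s TTL and a 10s renew interval, a healthy holder refreshes the key roughly three times per TTL window; once renewals stop, the next `acquire` from another replica succeeds after the remaining TTL runs out.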
ART_STREAM_WORKERS=off in env disables auto-start — useful for local
debugging and CI.
Channel naming
Channels are flat strings of the form:
artery:stream:{provider}:{marketKey}
Where provider is the ArteryProvider enum value (e.g. hyperliquid_perp,
polymarket, kalshi) and marketKey is the provider-specific market
identifier — or * to match all markets under that provider/feed.
Examples:
- `artery:stream:hyperliquid_perp:*`: all HL `allMids` updates
- `artery:stream:hyperliquid_perp:BTC`: HL `l2Book` for BTC perp
- `artery:stream:hyperliquid_perp:100000421`: HIP-4 outcome asset id
- `artery:stream:polymarket:0xCONDITION_ID`: Polymarket book updates
The single `*` wildcard is set by workers when the upstream event has no
per-market identifier (e.g. HL `allMids` carries every coin in one frame).
Subscribing to `artery:stream:hyperliquid_perp:*` matches that channel
exactly: the `*` is chosen on the publishing side by the worker and is not
expanded as a pattern on the subscriber side.
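
A small sketch of the naming scheme, assuming helpers along these lines. `channelFor`, `parseChannel`, and `sameChannel` are hypothetical names, and providers are passed as plain strings rather than the `ArteryProvider` enum.

```ts
const CHANNEL_PREFIX = "artery:stream:";
const ALL_MARKETS = "*";

// Builds a channel name; omitting marketKey yields the literal "*" channel.
function channelFor(provider: string, marketKey: string = ALL_MARKETS): string {
  return CHANNEL_PREFIX + provider + ":" + marketKey;
}

// Splits a channel back into its parts, or returns null if it is malformed.
function parseChannel(channel: string): { provider: string; marketKey: string } | null {
  if (channel.indexOf(CHANNEL_PREFIX) !== 0) return null;
  const rest = channel.slice(CHANNEL_PREFIX.length);
  const sep = rest.indexOf(":");
  if (sep <= 0 || sep === rest.length - 1) return null;
  return { provider: rest.slice(0, sep), marketKey: rest.slice(sep + 1) };
}

// Matching is plain string equality; "*" is not expanded as a pattern.
function sameChannel(subscribed: string, published: string): boolean {
  return subscribed === published;
}
```

Under this reading, a subscriber on `channelFor("hyperliquid_perp")` receives only what a worker publishes to that exact `*` channel and does not receive events published to `channelFor("hyperliquid_perp", "BTC")`.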
Backpressure & limits
| Layer | Bound | Drop policy |
|---|---|---|
| Per-connection send queue | 256 events | drop oldest non-event frames; close on overflow |
| Per-connection subs | 1000 | reject new subscribe with subscription_limit |
| Heartbeat idle | 60s | server closes connection |
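
One possible reading of the send-queue row, sketched as a bounded queue. This is an interpretation, not the gateway's actual code: `SendQueue`, `OutFrame`, and the result strings are hypothetical, and the policy assumed here is that a full queue first evicts its oldest non-`event` frame and reports overflow (so the caller can close the connection) only when every queued frame is an `event`.

```ts
type OutFrame = { type: string; data?: unknown };
type EnqueueResult = "queued" | "dropped_oldest_non_event" | "overflow";

class SendQueue {
  private readonly frames: OutFrame[] = [];
  private readonly limit: number;

  constructor(limit: number = 256) {
    this.limit = limit;
  }

  enqueue(frame: OutFrame): EnqueueResult {
    if (this.frames.length < this.limit) {
      this.frames.push(frame);
      return "queued";
    }
    // Queue is full: look for the oldest frame that is not an event.
    for (let i = 0; i < this.frames.length; i++) {
      if (this.frames[i].type !== "event") {
        this.frames.splice(i, 1);
        this.frames.push(frame);
        return "dropped_oldest_non_event";
      }
    }
    // Nothing droppable is left; the caller is expected to close the connection.
    return "overflow";
  }

  shift(): OutFrame | undefined {
    return this.frames.shift();
  }

  get size(): number {
    return this.frames.length;
  }
}
```

Keeping the decision in the return value lets the connection handler own the side effect: it can close the socket on `"overflow"` and carry on sending otherwise.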
Roadmap: Redis swap
Two file replacements:
```ts
// from: in-memory
new InMemoryPubSub();
new InMemoryLeaderLock();

// to: Redis-backed
new RedisPubSub(redisClient);
new RedisLeaderLock(redisClient);
```

The interface is a subset of Redis Pub/Sub semantics, so the in-memory impl is a strict refinement of the Redis one.
See also
- WebSocket streaming — wire protocol
- Adapter contract — how providers declare stream support
- Streaming quickstart recipe