Skip to main content
backend2025年10月26日7 分钟阅读

实时数据:WebSockets、SSE 以及你何时真正需要它们

了解实时数据模式——从 WebSockets 到 Server-Sent Events、Firebase Realtime Database,以及何时轮询就足够了。

websocketsrealtimesse
实时数据:WebSockets、SSE 以及你何时真正需要它们

大多数声称需要实时数据的应用程序实际上并不需要实时数据。它们需要的是感觉当前的数据——在几秒内更新,而不是几毫秒内。这种区别很重要,因为真正的实时架构会在连接管理、状态同步、扩展和错误处理方面引入复杂性,如果你的用户对五秒的延迟完全满意,那么这些复杂性是完全不必要的。

我曾构建过真正的实时功能——餐厅平台的实时订单追踪、协作编辑界面、实时仪表盘——我也曾构建过一些功能,最初我过度设计使用了 WebSockets,后来又用每十秒刷新一次的轮询取代了它们。没有人注意到这个变化。从最简单的方法开始,只有当用户真正遇到问题时才增加复杂性,这不仅是务实的,也是负责任的工程选择。

何时你真正需要实时数据

当信息的价值随延迟迅速降低时,才真正需要实时数据。

聊天和消息。 用户期望消息在一秒内出现。五秒的延迟会让对话感觉中断。在这里,实时是不可或缺的。

协作编辑。 谷歌文档(Google Docs)式的同步编辑需要亚秒级的同步,以避免编辑冲突。这里的复杂性不仅仅在于传输——你需要操作转换(operational transformation)或 CRDTs 来解决冲突。

具有运营影响的实时仪表盘。 在部署期间,工程师监控异常的仪表盘需要实时更新。而每天查看一次的业务分析仪表盘则不需要。

游戏和互动体验。 多人游戏、实时拍卖、实时投票——任何多个用户同时与共享状态交互的场景。

金融交易。 价格馈送、订单簿更新、头寸变化。在这里,毫秒至关重要,架构也反映了这一点。

何时你不需要实时数据

社交媒体动态。 Twitter 和 Instagram 感觉是实时的,但它们结合使用了焦点轮询和推送通知。当你查看时,动态并不会更新——你需要下拉刷新。

电子商务库存。 “仅剩 3 件!”不需要实时更新。在页面加载时和结账时检查就足够了。

通知计数。 显示“5 条新通知”的红色徽章可以每 30 秒轮询一次。如果通知在创建后 30 秒才出现,用户不会注意到。

内容更新。 博客文章、产品列表、用户资料——任何不经常更改的数据。在页面加载时轮询或使用带有重新验证的 HTTP 缓存头。

轮询:被低估的默认选择

轮询——定期发送 HTTP 请求以检查新数据——是最简单的方法,并且适用于比大多数开发人员想象的更多用例。

简单轮询

function usePolledData<T>(url: string, intervalMs: number = 5000) {
  const [data, setData] = useState<T | null>(null);
  const [error, setError] = useState<Error | null>(null);

  useEffect(() => {
    let active = true;

    const fetchData = async () => {
      try {
        const response = await fetch(url);
        if (!response.ok) throw new Error(`HTTP ${response.status}`);
        const result = await response.json();
        if (active) setData(result);
      } catch (e) {
        if (active) setError(e as Error);
      }
    };

    fetchData(); // Initial fetch
    const interval = setInterval(fetchData, intervalMs);

    return () => {
      active = false;
      clearInterval(interval);
    };
  }, [url, intervalMs]);

  return { data, error };
}

带条件请求的智能轮询

HTTP 条件请求(使用 ETags 的 If-None-MatchIf-Modified-Since)允许你高效地轮询。如果数据没有改变,服务器会返回 304 Not Modified 且没有响应体,从而将未更改资源的带宽消耗降至几乎为零。

async function pollWithETag(url: string): Promise<{ data: any; changed: boolean }> {
  const cachedETag = etagCache.get(url);

  const headers: HeadersInit = {};
  if (cachedETag) {
    headers['If-None-Match'] = cachedETag;
  }

  const response = await fetch(url, { headers });

  if (response.status === 304) {
    return { data: dataCache.get(url), changed: false };
  }

  const etag = response.headers.get('ETag');
  if (etag) etagCache.set(url, etag);

  const data = await response.json();
  dataCache.set(url, data);

  return { data, changed: true };
}

轮询何时失效

当以下情况发生时,轮询不再可行:

  • 你需要亚秒级延迟。 每 500 毫秒轮询一次,本质上是对你自己的 API 进行 DoS 攻击。
  • 数据很少更改但需要即时交付。 对于每小时发生一次的事件,每 5 秒轮询一次会每小时浪费 719 个请求。
  • 你有成千上万的客户端。 每个轮询客户端都是一个独立的连接。在大规模情况下,建立连接、解析头部和认证的 HTTP 开销会累积起来。

Server-Sent Events:更简单的实时方案

Server-Sent Events (SSE) 是 Web 开发中最未被充分利用的实时技术。SSE 通过标准 HTTP 连接提供从服务器到客户端的单向流。浏览器会自动处理重连,并且协议极其简单。

服务器端实现

// Express.js SSE endpoint
app.get('/events/orders/:restaurantId', (req, res) => {
  const { restaurantId } = req.params;

  // SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // Disable nginx buffering
  });

  // Send initial data
  const initialOrders = getActiveOrders(restaurantId);
  res.write(`data: ${JSON.stringify(initialOrders)}\n\n`);

  // Subscribe to order changes
  const unsubscribe = orderEmitter.on(`orders:${restaurantId}`, (order) => {
    res.write(`event: orderUpdate\n`);
    res.write(`data: ${JSON.stringify(order)}\n`);
    res.write(`id: ${order.id}-${order.updatedAt}\n\n`);
  });

  // Heartbeat to detect dead connections
  const heartbeat = setInterval(() => {
    res.write(`: heartbeat\n\n`);
  }, 30000);

  // Cleanup on disconnect
  req.on('close', () => {
    clearInterval(heartbeat);
    unsubscribe();
  });
});

客户端实现

function useSSE(url: string) {
  const [data, setData] = useState(null);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    const eventSource = new EventSource(url);

    eventSource.onopen = () => setConnected(true);

    eventSource.onmessage = (event) => {
      setData(JSON.parse(event.data));
    };

    eventSource.addEventListener('orderUpdate', (event) => {
      const order = JSON.parse(event.data);
      setData(prev => updateOrderInList(prev, order));
    });

    eventSource.onerror = () => {
      setConnected(false);
      // EventSource automatically reconnects
    };

    return () => eventSource.close();
  }, [url]);

  return { data, connected };
}

为何选择 SSE 而非 WebSockets

自动重连。 浏览器的 EventSource 实现会自动处理带有指数退避的重连。而使用 WebSockets,你需要自己实现这一点。

通过代理和负载均衡器工作。 SSE 使用标准 HTTP,因此它可以通过 nginx、CloudFlare 和大多数反向代理工作,无需特殊配置。WebSockets 需要明确的代理支持。

更简单的认证。 SSE 连接携带 cookies,并且可以使用标准的 HTTP 认证头部。WebSocket 认证需要在连接建立后发送凭据,这是一个需要额外实现的协议。

内置事件类型和 ID。 SSE 协议支持命名事件和消息 ID,这使得客户端在重连后可以从上次中断的地方恢复。

SSE 的局限性

  • 仅单向。 从服务器到客户端。如果你需要双向通信,SSE 无法做到。你可以将 SSE 与常规的 HTTP POST 请求结合使用,用于客户端到服务器的消息,这在许多用例中效果很好。
  • 浏览器连接限制。 浏览器限制对单个域的并发 HTTP 连接数(通常为六个)。每个 SSE 连接都计入此限制。HTTP/2 多路复用缓解了这个问题,但仍值得注意。
  • 无二进制数据。 SSE 仅支持文本。如果你需要流式传输二进制数据(音频、视频、文件),请使用 WebSockets 或单独的机制。

WebSockets:全双工通信

WebSockets 提供全双工通信——客户端和服务器都可以随时发送消息,而无需 HTTP 请求/响应周期的开销。当你需要双向、低延迟通信时,这是必要的。

使用 ws 的服务器端实现

import { WebSocketServer, WebSocket } from 'ws';
import { createServer } from 'http';

const server = createServer();
const wss = new WebSocketServer({ server });

// Connection management
const rooms = new Map<string, Set<WebSocket>>();

wss.on('connection', (ws, req) => {
  // Authenticate the connection
  const token = new URL(req.url!, `http://${req.headers.host}`).searchParams.get('token');
  const user = verifyToken(token);

  if (!user) {
    ws.close(4001, 'Unauthorized');
    return;
  }

  // Attach user data
  (ws as any).userId = user.id;

  ws.on('message', (raw) => {
    try {
      const message = JSON.parse(raw.toString());
      handleMessage(ws, user, message);
    } catch (e) {
      ws.send(JSON.stringify({ error: 'Invalid message format' }));
    }
  });

  ws.on('close', () => {
    // Remove from all rooms
    rooms.forEach((clients, roomId) => {
      clients.delete(ws);
      if (clients.size === 0) rooms.delete(roomId);
    });
  });

  // Heartbeat
  (ws as any).isAlive = true;
  ws.on('pong', () => { (ws as any).isAlive = true; });
});

// Dead connection detection
const heartbeatInterval = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (!(ws as any).isAlive) {
      ws.terminate();
      return;
    }
    (ws as any).isAlive = false;
    ws.ping();
  });
}, 30000);

function handleMessage(ws: WebSocket, user: any, message: any) {
  switch (message.type) {
    case 'join_room':
      joinRoom(ws, message.roomId);
      break;
    case 'leave_room':
      leaveRoom(ws, message.roomId);
      break;
    case 'broadcast':
      broadcastToRoom(message.roomId, {
        type: 'message',
        userId: user.id,
        content: message.content,
        timestamp: Date.now(),
      }, ws);
      break;
  }
}

function joinRoom(ws: WebSocket, roomId: string) {
  if (!rooms.has(roomId)) rooms.set(roomId, new Set());
  rooms.get(roomId)!.add(ws);
}

function broadcastToRoom(roomId: string, data: any, exclude?: WebSocket) {
  const clients = rooms.get(roomId);
  if (!clients) return;

  const payload = JSON.stringify(data);
  clients.forEach((client) => {
    if (client !== exclude && client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  });
}

server.listen(8080);

带重连功能的客户端实现

class WebSocketClient {
  private ws: WebSocket | null = null;
  private url: string;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private handlers = new Map<string, Set<(data: any) => void>>();

  constructor(url: string) {
    this.url = url;
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      this.reconnectAttempts = 0;
      this.emit('connected', null);
    };

    this.ws.onmessage = (event) => {
      try {
        const message = JSON.parse(event.data);
        this.emit(message.type, message);
      } catch (e) {
        console.error('Failed to parse WebSocket message:', e);
      }
    };

    this.ws.onclose = (event) => {
      this.emit('disconnected', { code: event.code, reason: event.reason });

      if (event.code !== 4001 && this.reconnectAttempts < this.maxReconnectAttempts) {
        this.scheduleReconnect();
      }
    };

    this.ws.onerror = () => {
      // Error is followed by close event, handle reconnection there
    };
  }

  private scheduleReconnect() {
    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    this.reconnectAttempts++;

    setTimeout(() => {
      this.emit('reconnecting', { attempt: this.reconnectAttempts });
      this.connect();
    }, delay);
  }

  send(type: string, data: any) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ type, ...data }));
    }
  }

  on(event: string, handler: (data: any) => void) {
    if (!this.handlers.has(event)) this.handlers.set(event, new Set());
    this.handlers.get(event)!.add(handler);
    return () => this.handlers.get(event)?.delete(handler);
  }

  private emit(event: string, data: any) {
    this.handlers.get(event)?.forEach((handler) => handler(data));
  }

  disconnect() {
    this.maxReconnectAttempts = 0; // Prevent reconnection
    this.ws?.close();
  }
}

重连策略

重连逻辑值得关注,因为它直接影响用户体验和服务器负载。

指数退避 防止了雷鸣般的群集问题。如果你的服务器重启,10,000 个客户端同时重连,服务器会再次崩溃。通过添加抖动和指数延迟,连接会随着时间分散开来:

function getReconnectDelay(attempt: number): number {
  const baseDelay = 1000;
  const maxDelay = 30000;
  const exponentialDelay = baseDelay * Math.pow(2, attempt);
  const jitter = Math.random() * 1000; // Random jitter up to 1 second
  return Math.min(exponentialDelay + jitter, maxDelay);
}

连接状态 UI 很重要。用户需要知道何时断开连接以及应用程序何时尝试重新连接。一个显示“正在重新连接...”的小横幅比默默显示陈旧数据要好。

Socket.IO:便利性与控制权

Socket.IO 是最流行的 WebSocket 库,它提供了显著的便利性:自动重连、房间管理、确认以及在 WebSocket 不可用时回退到轮询。它也增加了显著的开销,并引入了一个抽象层,可能会掩盖问题。

何时使用 Socket.IO

  • 快速原型开发。 Socket.IO 的 API 比原生 WebSockets 更简单,内置功能节省了开发时间。
  • 当你需要轮询回退时。 一些公司网络会阻止 WebSocket 连接。Socket.IO 会自动回退到 HTTP 长轮询。
  • 房间和命名空间管理。 如果你的实时功能自然地映射到房间(聊天室、游戏大厅、协作文档),Socket.IO 的房间抽象可以节省样板代码。

何时避免使用 Socket.IO

  • 高性能应用程序。 Socket.IO 的协议为每条消息增加了开销(帧、编码、元数据)。对于高吞吐量应用程序,原生 WebSockets 明显更快。
  • 当你需要互操作性时。 Socket.IO 不是标准 WebSocket——常规 WebSocket 客户端无法连接到 Socket.IO 服务器。如果你的客户端包括非 JavaScript 环境(移动应用程序、物联网设备),Socket.IO 需要为每个平台提供客户端库。
  • 当你想要了解发生了什么时。 Socket.IO 抽象了连接管理,这很方便,直到出现问题。调试 Socket.IO 连接问题通常意味着理解实际问题之上的抽象层。

Firebase Realtime Database 和 Firestore

Firebase 提供了实时功能,而无需管理任何 WebSocket 基础设施。这是其主要价值主张——你用控制权换取便利性。

Firebase Realtime Database

import { getDatabase, ref, onValue, set } from 'firebase/database';

const db = getDatabase();

// Listen for real-time updates
const ordersRef = ref(db, `restaurants/${restaurantId}/orders`);
onValue(ordersRef, (snapshot) => {
  const orders = snapshot.val();
  updateUI(orders);
});

// Write data (triggers listeners on all connected clients)
await set(ref(db, `restaurants/${restaurantId}/orders/${orderId}`), {
  status: 'preparing',
  updatedAt: Date.now(),
});

Realtime Database 是一个 JSON 树。对任何节点的每次写入都会立即传播到监听该节点或任何父节点的所有客户端。这很强大但也危险——监听高级节点的客户端会收到整个子树中所有更改的更新。

Firestore 实时监听器

import { getFirestore, collection, onSnapshot, query, where } from 'firebase/firestore';

const db = getFirestore();

// Listen for active orders
const q = query(
  collection(db, 'orders'),
  where('restaurantId', '==', restaurantId),
  where('status', 'in', ['pending', 'preparing', 'ready'])
);

const unsubscribe = onSnapshot(q, (snapshot) => {
  snapshot.docChanges().forEach((change) => {
    if (change.type === 'added') addOrder(change.doc.data());
    if (change.type === 'modified') updateOrder(change.doc.data());
    if (change.type === 'removed') removeOrder(change.doc.id);
  });
});

Firestore 的实时监听器比 Realtime Database 更细粒度——你可以监听特定的查询,而不仅仅是路径。docChanges() 方法会告诉你具体更改了什么,这使得高效的 UI 更新成为可能。

Supabase Realtime

Supabase 在 PostgreSQL 之上提供了实时功能,这让你拥有 SQL 的查询能力和实时更新。它利用 PostgreSQL 的复制功能(逻辑复制和 WAL)来检测更改并通过 WebSocket 通道广播它们。

import { createClient } from '@supabase/supabase-js';

const supabase = createClient(url, key);

// Listen for changes to the orders table
const channel = supabase
  .channel('orders')
  .on(
    'postgres_changes',
    {
      event: '*',
      schema: 'public',
      table: 'orders',
      filter: `restaurant_id=eq.${restaurantId}`,
    },
    (payload) => {
      console.log('Change type:', payload.eventType);
      console.log('New data:', payload.new);
      console.log('Old data:', payload.old);
    }
  )
  .subscribe();

与 Firebase 相比,其优势显而易见:你的数据存储在 PostgreSQL 中,具有完整的 SQL 查询能力、ACID 事务和标准工具。实时层是附加功能,而不是查询引擎的替代品。权衡之处在于,Supabase Realtime 在规模上不如 Firebase 经过实战检验,并且实时性能取决于 PostgreSQL 的复制吞吐量。

扩展 WebSocket 连接

单个服务器可以处理数万个 WebSocket 连接,但扩展到一台以上服务器会引入协调问题:在服务器 A 上发布的消息需要到达连接到服务器 B 的客户端。

发布/订阅模式

Redis 发布/订阅是标准解决方案:

import { createClient } from 'redis';

const publisher = createClient();
const subscriber = createClient();

await publisher.connect();
await subscriber.connect();

// When a message comes in on any server, publish to Redis
function publishToRoom(roomId: string, message: any) {
  publisher.publish(`room:${roomId}`, JSON.stringify(message));
}

// Each server subscribes and forwards to its local connections
await subscriber.subscribe(`room:${roomId}`, (message) => {
  const data = JSON.parse(message);
  broadcastToLocalClients(roomId, data);
});

这种模式允许你在负载均衡器后面运行多个 WebSocket 服务器。每个服务器处理自己的连接,Redis 协调跨服务器通信。

粘性会话

WebSocket 连接是有状态的——一旦建立,它们必须保持在同一台服务器上。负载均衡器需要粘性会话(也称为会话亲和性)来将客户端的 WebSocket 连接路由到处理初始 HTTP 升级的同一台服务器。

使用 nginx:

upstream websocket_servers {
    ip_hash;  # Sticky sessions based on client IP
    server ws1.example.com:8080;
    server ws2.example.com:8080;
}

server {
    location /ws {
        proxy_pass http://websocket_servers;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_read_timeout 86400;  # 24 hours
    }
}

实时与近实时

在“用户刷新页面时检查更新”和“在几毫秒内交付所有更改”之间存在一个连续的范围。大多数功能都介于两者之间。

方法 延迟 复杂性 最佳适用场景
页面刷新 用户触发 静态内容,不频繁的更改
轮询 (30秒) 最多30秒 通知计数,仪表盘更新
轮询 (5秒) 最多5秒 活动动态,订单状态
长轮询 亚秒级 中等 旧系统,对代理不友好的环境
SSE 亚秒级 中等 实时动态,通知,单向流
WebSockets 亚秒级 聊天,协作,双向实时
WebRTC 毫秒级 非常高 视频/音频,点对点

正确的方法是满足你延迟要求的最简单方法。从轮询开始。如果用户抱怨数据陈旧,升级到 SSE。如果你需要双向通信,使用 WebSockets。复杂性阶梯上的每一步都应该由具体的用户需求来证明,而不是假设的未来需求。

实用建议

在多个项目中构建实时功能后,这些是我反复使用的模式:

默认情况下,单向实时使用 SSE。 大多数实时功能都是服务器到客户端的:订单更新、通知流、实时数据馈送。SSE 以比 WebSockets 更低的复杂性处理所有这些。

仅在双向需求时使用 WebSockets。 聊天、协作编辑、多人互动——这些确实需要 WebSockets。如果你的客户端只接收数据,SSE 更简单。

始终实现带退避的重连。 连接会断开。网络会切换。服务器会重启。你的客户端需要优雅地处理这种情况,你的服务器需要经受住重连风暴。

发送差异,而不是完整状态。 不要每次一个订单更改时都发送整个订单列表,只发送更改的订单。客户端将其差异应用到本地状态。这减少了两端的带宽和处理。

有备用方案。 如果你的 WebSocket 连接失败且无法重新连接,应用程序仍应保持功能。一个带有手动刷新的“上次更新 30 秒前”指示器比空白屏幕要好。

在生产环境中监控连接数。 WebSocket 连接会消耗服务器资源(内存、文件描述符)。连接数的突然飙升——例如,由于客户端错误导致快速重连——可能会导致服务器宕机。对连接数异常发出警报。

目标不是拥有最复杂的实时架构。目标是以最少的架构复杂性,将数据足够快地交付给用户,使应用程序感觉响应迅速。

DU

Danil Ulmashev

Full Stack Developer

有兴趣一起合作吗?