Skip to main content
backend2025年10月26日6分で読めます

リアルタイムデータ:WebSockets、SSE、そして本当に必要な時

WebSocketsからServer-Sent Events、Firebase Realtime Databaseまで、リアルタイムデータパターンを理解し、ポーリングで十分な場合を知る。

websocketsrealtimesse
リアルタイムデータ:WebSockets、SSE、そして本当に必要な時

リアルタイムデータを必要と主張するほとんどのアプリケーションは、実際にはリアルタイムデータを必要としていません。彼らが必要としているのは、数ミリ秒ではなく、数秒以内に更新される「最新だと感じる」データです。この区別は重要です。なぜなら、真のリアルタイムアーキテクチャは、接続管理、状態同期、スケーリング、エラー処理において複雑さを導入しますが、ユーザーが5秒の遅延で完全に満足するなら、それは全く不要だからです。

私は、レストランプラットフォームのライブ注文追跡、共同編集インターフェース、ライブダッシュボードなど、真にリアルタイムな機能を構築してきました。また、当初WebSocketsで過剰に設計し、後に10秒ごとに更新するポーリングに置き換えた機能も構築しました。誰もその変更に気づきませんでした。最もシンプルなアプローチから始め、ユーザーが実際に問題を経験したときにのみ複雑さを追加することは、単に実用的であるだけでなく、責任あるエンジニアリングの選択です。

本当にリアルタイムが必要な時

情報の価値が遅延によって急速に低下する場合、真のリアルタイムが保証されます。

チャットとメッセージング。 ユーザーはメッセージが1秒以内に表示されることを期待します。5秒の遅延は会話が途切れているように感じさせます。ここではリアルタイムは必須です。

共同編集。 Google Docsのような同時編集は、競合する編集を避けるためにサブ秒の同期を必要とします。ここでの複雑さはトランスポートだけでなく、競合解決のためにオペレーショナル・トランスフォーメーションやCRDTsが必要です。

運用に影響を与えるライブダッシュボード。 デプロイ中にエンジニアが異常を監視しているモニタリングダッシュボードは、リアルタイム更新が必要です。1日に1回しか見られないビジネス分析ダッシュボードは必要ありません。

ゲームとインタラクティブな体験。 マルチプレイヤーゲーム、ライブオークション、リアルタイム投票など、複数のユーザーが共有状態と同時にインタラクトするあらゆるもの。

金融取引。 価格フィード、注文板の更新、ポジションの変更。ここではミリ秒が重要であり、アーキテクチャもそれを反映しています。

リアルタイムが必要ない時

ソーシャルメディアフィード。 TwitterやInstagramはリアルタイムに感じられますが、フォーカス時のポーリングとプッシュ通知の組み合わせを使用しています。見ている間にフィードが更新されることはなく、プルして更新します。

Eコマースの在庫。 「残り3点!」はリアルタイムで更新する必要はありません。ページロード時とチェックアウト時に確認すれば十分です。

通知数。 「新しい通知が5件」を示す赤いバッジは30秒ごとにポーリングできます。通知が作成されてから30秒後に表示されても、ユーザーは気づきません。

コンテンツ更新。 ブログ記事、商品リスト、ユーザープロフィールなど、頻繁に変更されないデータ。ページロード時にポーリングするか、再検証付きのHTTPキャッシュヘッダーを使用します。

ポーリング:過小評価されているデフォルト

ポーリング — 新しいデータをチェックするために定期的なHTTPリクエストを行うこと — は最もシンプルなアプローチであり、ほとんどの開発者が想定するよりも多くのユースケースで機能します。

シンプルなポーリング

function usePolledData<T>(url: string, intervalMs: number = 5000) {
  const [data, setData] = useState<T | null>(null);
  const [error, setError] = useState<Error | null>(null);

  useEffect(() => {
    let active = true;

    const fetchData = async () => {
      try {
        const response = await fetch(url);
        if (!response.ok) throw new Error(`HTTP ${response.status}`);
        const result = await response.json();
        if (active) setData(result);
      } catch (e) {
        if (active) setError(e as Error);
      }
    };

    fetchData(); // Initial fetch
    const interval = setInterval(fetchData, intervalMs);

    return () => {
      active = false;
      clearInterval(interval);
    };
  }, [url, intervalMs]);

  return { data, error };
}

条件付きリクエストによるスマートポーリング

HTTP条件付きリクエスト(ETagを使用したIf-None-MatchまたはIf-Modified-Since)を使用すると、効率的にポーリングできます。データが変更されていない場合、サーバーはボディなしで304 Not Modifiedを返し、変更されていないリソースの帯域幅をほぼゼロに削減します。

async function pollWithETag(url: string): Promise<{ data: any; changed: boolean }> {
  const cachedETag = etagCache.get(url);

  const headers: HeadersInit = {};
  if (cachedETag) {
    headers['If-None-Match'] = cachedETag;
  }

  const response = await fetch(url, { headers });

  if (response.status === 304) {
    return { data: dataCache.get(url), changed: false };
  }

  const etag = response.headers.get('ETag');
  if (etag) etagCache.set(url, etag);

  const data = await response.json();
  dataCache.set(url, data);

  return { data, changed: true };
}

ポーリングが破綻する時

ポーリングが機能しなくなるのは次のような場合です。

  • サブ秒のレイテンシが必要な場合。 500msごとにポーリングすることは、実質的に自身のAPIに対するDoS攻撃です。
  • データがめったに変わらないが、即座に配信する必要がある場合。 1時間に1回しか発生しないイベントのために5秒ごとにポーリングすると、1時間あたり719回のリクエストが無駄になります。
  • 数千のクライアントがいる場合。 各ポーリングクライアントは独立した接続です。大規模になると、接続の確立、ヘッダーの解析、認証といったHTTPのオーバーヘッドが積み重なります。

Server-Sent Events:よりシンプルなリアルタイム

Server-Sent Events (SSE) は、Web開発において最も活用されていないリアルタイム技術です。SSEは、標準のHTTP接続を介してサーバーからクライアントへの一方向ストリームを提供します。ブラウザが自動的に再接続を処理し、プロトコルは非常にシンプルです。

サーバーの実装

// Express.js SSE endpoint
app.get('/events/orders/:restaurantId', (req, res) => {
  const { restaurantId } = req.params;

  // SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // Disable nginx buffering
  });

  // Send initial data
  const initialOrders = getActiveOrders(restaurantId);
  res.write(`data: ${JSON.stringify(initialOrders)}\n\n`);

  // Subscribe to order changes
  const unsubscribe = orderEmitter.on(`orders:${restaurantId}`, (order) => {
    res.write(`event: orderUpdate\n`);
    res.write(`data: ${JSON.stringify(order)}\n`);
    res.write(`id: ${order.id}-${order.updatedAt}\n\n`);
  });

  // Heartbeat to detect dead connections
  const heartbeat = setInterval(() => {
    res.write(`: heartbeat\n\n`);
  }, 30000);

  // Cleanup on disconnect
  req.on('close', () => {
    clearInterval(heartbeat);
    unsubscribe();
  });
});

クライアントの実装

function useSSE(url: string) {
  const [data, setData] = useState(null);
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    const eventSource = new EventSource(url);

    eventSource.onopen = () => setConnected(true);

    eventSource.onmessage = (event) => {
      setData(JSON.parse(event.data));
    };

    eventSource.addEventListener('orderUpdate', (event) => {
      const order = JSON.parse(event.data);
      setData(prev => updateOrderInList(prev, order));
    });

    eventSource.onerror = () => {
      setConnected(false);
      // EventSource automatically reconnects
    };

    return () => eventSource.close();
  }, [url]);

  return { data, connected };
}

SSEがWebSocketsよりも優れている理由

自動再接続。 ブラウザのEventSource実装は、指数関数的バックオフで自動的に再接続を処理します。WebSocketsでは、これを自分で実装する必要があります。

プロキシとロードバランサーを介して動作する。 SSEは標準HTTPを使用するため、nginx、CloudFlare、およびほとんどのリバースプロキシを特別な設定なしで動作します。WebSocketsには明示的なプロキシサポートが必要です。

よりシンプルな認証。 SSE接続はCookieを運び、標準のHTTP認証ヘッダーを使用できます。WebSocket認証は、接続確立後に資格情報を送信する必要があり、これは実装すべき追加のプロトコルです。

組み込みのイベントタイプとID。 SSEプロトコルは名前付きイベントとメッセージIDをサポートしており、クライアントが再接続後に中断した場所から再開できるようにします。

SSEの制限事項

  • 一方向のみ。 サーバーからクライアントへ。双方向通信が必要な場合、SSEではできません。クライアントからサーバーへのメッセージには、SSEと通常のHTTP POSTリクエストを組み合わせることができます。これは多くのユースケースでうまく機能します。
  • ブラウザの接続制限。 ブラウザは単一ドメインへの同時HTTP接続数を制限しています(通常6つ)。各SSE接続はこの制限にカウントされます。HTTP/2の多重化はこれを軽減しますが、認識しておく価値はあります。
  • バイナリデータなし。 SSEはテキストのみです。バイナリデータ(オーディオ、ビデオ、ファイル)をストリーミングする必要がある場合は、WebSocketsまたは別のメカニズムを使用してください。

WebSockets:全二重通信

WebSocketsは全二重通信を提供します。クライアントとサーバーの両方が、HTTPリクエスト/レスポンスサイクルのオーバーヘッドなしに、いつでもメッセージを送信できます。これは、双方向で低レイテンシの通信が必要な場合に不可欠です。

wsを使用したサーバーの実装

import { WebSocketServer, WebSocket } from 'ws';
import { createServer } from 'http';

const server = createServer();
const wss = new WebSocketServer({ server });

// Connection management
const rooms = new Map<string, Set<WebSocket>>();

wss.on('connection', (ws, req) => {
  // Authenticate the connection
  const token = new URL(req.url!, `http://${req.headers.host}`).searchParams.get('token');
  const user = verifyToken(token);

  if (!user) {
    ws.close(4001, 'Unauthorized');
    return;
  }

  // Attach user data
  (ws as any).userId = user.id;

  ws.on('message', (raw) => {
    try {
      const message = JSON.parse(raw.toString());
      handleMessage(ws, user, message);
    } catch (e) {
      ws.send(JSON.stringify({ error: 'Invalid message format' }));
    }
  });

  ws.on('close', () => {
    // Remove from all rooms
    rooms.forEach((clients, roomId) => {
      clients.delete(ws);
      if (clients.size === 0) rooms.delete(roomId);
    });
  });

  // Heartbeat
  (ws as any).isAlive = true;
  ws.on('pong', () => { (ws as any).isAlive = true; });
});

// Dead connection detection
const heartbeatInterval = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (!(ws as any).isAlive) {
      ws.terminate();
      return;
    }
    (ws as any).isAlive = false;
    ws.ping();
  });
}, 30000);

function handleMessage(ws: WebSocket, user: any, message: any) {
  switch (message.type) {
    case 'join_room':
      joinRoom(ws, message.roomId);
      break;
    case 'leave_room':
      leaveRoom(ws, message.roomId);
      break;
    case 'broadcast':
      broadcastToRoom(message.roomId, {
        type: 'message',
        userId: user.id,
        content: message.content,
        timestamp: Date.now(),
      }, ws);
      break;
  }
}

function joinRoom(ws: WebSocket, roomId: string) {
  if (!rooms.has(roomId)) rooms.set(roomId, new Set());
  rooms.get(roomId)!.add(ws);
}

function broadcastToRoom(roomId: string, data: any, exclude?: WebSocket) {
  const clients = rooms.get(roomId);
  if (!clients) return;

  const payload = JSON.stringify(data);
  clients.forEach((client) => {
    if (client !== exclude && client.readyState === WebSocket.OPEN) {
      client.send(payload);
    }
  });
}

server.listen(8080);

再接続機能付きクライアントの実装

class WebSocketClient {
  private ws: WebSocket | null = null;
  private url: string;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;
  private handlers = new Map<string, Set<(data: any) => void>>();

  constructor(url: string) {
    this.url = url;
  }

  connect() {
    this.ws = new WebSocket(this.url);

    this.ws.onopen = () => {
      this.reconnectAttempts = 0;
      this.emit('connected', null);
    };

    this.ws.onmessage = (event) => {
      try {
        const message = JSON.parse(event.data);
        this.emit(message.type, message);
      } catch (e) {
        console.error('Failed to parse WebSocket message:', e);
      }
    };

    this.ws.onclose = (event) => {
      this.emit('disconnected', { code: event.code, reason: event.reason });

      if (event.code !== 4001 && this.reconnectAttempts < this.maxReconnectAttempts) {
        this.scheduleReconnect();
      }
    };

    this.ws.onerror = () => {
      // Error is followed by close event, handle reconnection there
    };
  }

  private scheduleReconnect() {
    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    this.reconnectAttempts++;

    setTimeout(() => {
      this.emit('reconnecting', { attempt: this.reconnectAttempts });
      this.connect();
    }, delay);
  }

  send(type: string, data: any) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ type, ...data }));
    }
  }

  on(event: string, handler: (data: any) => void) {
    if (!this.handlers.has(event)) this.handlers.set(event, new Set());
    this.handlers.get(event)!.add(handler);
    return () => this.handlers.get(event)?.delete(handler);
  }

  private emit(event: string, data: any) {
    this.handlers.get(event)?.forEach((handler) => handler(data));
  }

  disconnect() {
    this.maxReconnectAttempts = 0; // Prevent reconnection
    this.ws?.close();
  }
}

再接続戦略

再接続ロジックは、ユーザーエクスペリエンスとサーバー負荷に直接影響するため、注意が必要です。

指数関数的バックオフは、サンダリング・ハード問題を防止します。サーバーが再起動し、10,000のクライアントが同時に再接続しようとすると、サーバーは再びクラッシュします。ジッターと指数関数的な遅延を追加することで、接続は時間とともに分散されます。

function getReconnectDelay(attempt: number): number {
  const baseDelay = 1000;
  const maxDelay = 30000;
  const exponentialDelay = baseDelay * Math.pow(2, attempt);
  const jitter = Math.random() * 1000; // Random jitter up to 1 second
  return Math.min(exponentialDelay + jitter, maxDelay);
}

接続状態のUIは重要です。ユーザーは、切断されたときやアプリが再接続を試みているときにそれを知る必要があります。「再接続中...」という小さなバナーは、サイレントに古いデータを表示するよりも優れています。

Socket.IO:利便性 vs 制御

Socket.IOは最も人気のあるWebSocketライブラリであり、自動再接続、ルーム管理、確認応答、WebSocketsが利用できない場合のポーリングへのフォールバックなど、大きな利便性を提供します。また、かなりのオーバーヘッドを追加し、問題を隠蔽する抽象化レイヤーを導入することもあります。

Socket.IOを使用すべき時

  • 迅速なプロトタイピング。 Socket.IOのAPIは生のWebSocketsよりもシンプルで、組み込み機能により開発時間を節約できます。
  • ポーリングフォールバックが必要な場合。 一部の企業ネットワークではWebSocket接続がブロックされます。Socket.IOは自動的にHTTPロングポーリングにフォールバックします。
  • ルームと名前空間の管理。 リアルタイム機能がルーム(チャットルーム、ゲームロビー、共同ドキュメント)に自然にマッピングされる場合、Socket.IOのルーム抽象化はボイラープレートを節約します。

Socket.IOを避けるべき時

  • 高性能アプリケーション。 Socket.IOのプロトコルは、すべてのメッセージにオーバーヘッド(フレーミング、エンコーディング、メタデータ)を追加します。高スループットのアプリケーションでは、生のWebSocketsの方が明らかに高速です。
  • 相互運用性が必要な場合。 Socket.IOは標準のWebSocketではありません。通常のWebSocketクライアントはSocket.IOサーバーに接続できません。クライアントに非JavaScript環境(モバイルアプリ、IoTデバイス)が含まれる場合、Socket.IOは各プラットフォーム用のクライアントライブラリを必要とします。
  • 何が起こっているかを理解したい場合。 Socket.IOは接続管理を抽象化しており、問題が発生するまでは便利です。Socket.IOの接続問題のデバッグは、多くの場合、実際の問題の上に構築された抽象化レイヤーを理解することを意味します。

Firebase Realtime DatabaseとFirestore

Firebaseは、WebSocketインフラストラクチャを管理することなくリアルタイム機能を提供します。これがその主な価値提案です。制御と引き換えに利便性を得ます。

Firebase Realtime Database

import { getDatabase, ref, onValue, set } from 'firebase/database';

const db = getDatabase();

// Listen for real-time updates
const ordersRef = ref(db, `restaurants/${restaurantId}/orders`);
onValue(ordersRef, (snapshot) => {
  const orders = snapshot.val();
  updateUI(orders);
});

// Write data (triggers listeners on all connected clients)
await set(ref(db, `restaurants/${restaurantId}/orders/${orderId}`), {
  status: 'preparing',
  updatedAt: Date.now(),
});

Realtime DatabaseはJSONツリーです。任意のノードへの書き込みは、そのノードまたは親ノードをリッスンしているすべてのクライアントに即座に伝播します。これは強力ですが危険です。高レベルのノードをリッスンしているリスナーは、サブツリー全体のすべての変更に対する更新を受け取ります。

Firestoreリアルタイムリスナー

import { getFirestore, collection, onSnapshot, query, where } from 'firebase/firestore';

const db = getFirestore();

// Listen for active orders
const q = query(
  collection(db, 'orders'),
  where('restaurantId', '==', restaurantId),
  where('status', 'in', ['pending', 'preparing', 'ready'])
);

const unsubscribe = onSnapshot(q, (snapshot) => {
  snapshot.docChanges().forEach((change) => {
    if (change.type === 'added') addOrder(change.doc.data());
    if (change.type === 'modified') updateOrder(change.doc.data());
    if (change.type === 'removed') removeOrder(change.doc.id);
  });
});

Firestoreのリアルタイムリスナーは、Realtime Databaseよりも粒度が高く、パスだけでなく特定のクエリをリッスンできます。docChanges()メソッドは、何が変更されたかを正確に伝え、効率的なUI更新を可能にします。

Supabase Realtime

SupabaseはPostgreSQLの上にリアルタイム機能を提供し、SQLのクエリ能力とリアルタイム更新を両立させます。PostgreSQLのレプリケーション機能(論理レプリケーションとWAL)を使用して変更を検出し、WebSocketチャネルを通じてブロードキャストします。

import { createClient } from '@supabase/supabase-js';

const supabase = createClient(url, key);

// Listen for changes to the orders table
const channel = supabase
  .channel('orders')
  .on(
    'postgres_changes',
    {
      event: '*',
      schema: 'public',
      table: 'orders',
      filter: `restaurant_id=eq.${restaurantId}`,
    },
    (payload) => {
      console.log('Change type:', payload.eventType);
      console.log('New data:', payload.new);
      console.log('Old data:', payload.old);
    }
  )
  .subscribe();

Firebaseに対する利点は明確です。データはPostgreSQLに存在し、完全なSQLクエリ機能、ACIDトランザクション、標準ツールを利用できます。リアルタイムレイヤーはクエリエンジンの代替ではなく、追加機能です。トレードオフとして、Supabase RealtimeはFirebaseほど大規模での実績がなく、リアルタイムパフォーマンスはPostgreSQLのレプリケーションスループットに依存します。

WebSocket接続のスケーリング

単一のサーバーは数万のWebSocket接続を処理できますが、1つのサーバーを超えてスケーリングすると、調整の問題が発生します。サーバーAで公開されたメッセージは、サーバーBに接続されているクライアントに到達する必要があります。

Pub/Subパターン

Redis pub/subは標準的なソリューションです。

import { createClient } from 'redis';

const publisher = createClient();
const subscriber = createClient();

await publisher.connect();
await subscriber.connect();

// When a message comes in on any server, publish to Redis
function publishToRoom(roomId: string, message: any) {
  publisher.publish(`room:${roomId}`, JSON.stringify(message));
}

// Each server subscribes and forwards to its local connections
await subscriber.subscribe(`room:${roomId}`, (message) => {
  const data = JSON.parse(message);
  broadcastToLocalClients(roomId, data);
});

このパターンにより、ロードバランサーの背後で複数のWebSocketサーバーを実行できます。各サーバーは独自の接続を処理し、Redisがサーバー間の通信を調整します。

スティッキーセッション

WebSocket接続はステートフルです。一度確立されると、同じサーバーに留まる必要があります。ロードバランサーは、クライアントのWebSocket接続を、最初のHTTPアップグレードを処理したのと同じサーバーにルーティングするために、スティッキーセッション(セッションアフィニティとも呼ばれます)を必要とします。

nginxの場合:

upstream websocket_servers {
    ip_hash;  # Sticky sessions based on client IP
    server ws1.example.com:8080;
    server ws2.example.com:8080;
}

server {
    location /ws {
        proxy_pass http://websocket_servers;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_read_timeout 86400;  # 24 hours
    }
}

リアルタイム vs ニアリアルタイム

「ユーザーがページを更新したときに更新をチェックする」と「すべての変更をミリ秒単位で配信する」の間にはスペクトラムがあります。ほとんどの機能はその中間に位置します。

アプローチ レイテンシ 複雑さ 最適な用途
ページ更新 ユーザー主導 なし 静的コンテンツ、頻繁でない変更
ポーリング (30秒) 最大30秒 通知数、ダッシュボード更新
ポーリング (5秒) 最大5秒 アクティビティフィード、注文ステータス
ロングポーリング サブ秒 中程度 古いシステム、プロキシに不向きな環境
SSE サブ秒 中程度 ライブフィード、通知、一方向ストリーム
WebSockets サブ秒 チャット、コラボレーション、双方向リアルタイム
WebRTC ミリ秒 非常に高 ビデオ/オーディオ、ピアツーピア

適切なアプローチは、レイテンシ要件を満たす最もシンプルなものです。ポーリングから始めましょう。ユーザーがデータの古さに不満を言うなら、SSEにアップグレードします。双方向通信が必要なら、WebSocketsを使用します。複雑さの階段を一段上がるごとに、仮説的な将来の要件ではなく、具体的なユーザーのニーズによって正当化されるべきです。

実践的な推奨事項

いくつかのプロジェクトでリアルタイム機能を構築した後、私が常に立ち返るパターンは次のとおりです。

一方向リアルタイムにはSSEをデフォルトにする。 ほとんどのリアルタイム機能はサーバーからクライアントへのものです。注文更新、通知ストリーム、ライブデータフィードなど。SSEはWebSocketsよりも少ない複雑さでこれらすべてを処理します。

WebSocketsは双方向のニーズにのみ使用する。 チャット、共同編集、マルチプレイヤーインタラクション — これらは真にWebSocketsを必要とします。クライアントがデータを受信するだけなら、SSEの方がシンプルです。

常にバックオフ付きの再接続を実装する。 接続は切断されます。ネットワークは切り替わります。サーバーは再起動します。クライアントはこれを適切に処理する必要があり、サーバーは再接続の嵐を乗り切る必要があります。

完全な状態ではなく差分を送信する。 1つの注文が変更されるたびに注文リスト全体を送信するのではなく、変更された注文のみを送信します。クライアントはローカルの状態に差分を適用します。これにより、両側の帯域幅と処理が削減されます。

フォールバックを用意する。 WebSocket接続が失敗し、再接続できない場合でも、アプリケーションは機能し続けるべきです。「30秒前に最終更新」という手動更新インジケーターは、空白の画面よりも優れています。

本番環境で接続数を監視する。 WebSocket接続はサーバーリソース(メモリ、ファイルディスクリプタ)を消費します。例えば、クライアントのバグによる急速な再接続など、接続の急増はサーバーをダウンさせる可能性があります。接続数の異常をアラートで通知します。

目標は、最も洗練されたリアルタイムアーキテクチャを持つことではありません。目標は、アプリケーションが応答性があると感じるのに十分な速さでユーザーにデータを配信し、その感覚を実現するためのインフラストの複雑さを最小限に抑えることです。

DU

Danil Ulmashev

Full Stack Developer

一緒にお仕事しませんか?