Skip to content

Node.js WebSocket 实时通信指南

更新: 8/8/2025 字数: 0 字 时长: 0 分钟

本章将介绍如何在 Node.js 中实现 WebSocket 实时通信,包括 Socket.IO、原生 WebSocket、实时聊天、推送通知、性能优化等核心内容。

WebSocket 基础

1. WebSocket 概述

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它使得客户端和服务器之间的数据交换变得更加简单。

优势:

  • 全双工通信
  • 低延迟
  • 减少服务器负载
  • 实时性强
  • 支持二进制数据

应用场景:

  • 实时聊天
  • 在线游戏
  • 实时协作
  • 股票行情
  • 系统监控
  • 推送通知

2. 技术选型

javascript
// 原生 WebSocket
const WebSocket = require('ws');

// Socket.IO (推荐)
const io = require('socket.io');

// uWebSockets.js (高性能)
const uWS = require('uWebSockets.js');

Socket.IO 实现

1. 基础服务器设置

javascript
// server.js - Socket.IO 服务器
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const cors = require('cors');
const jwt = require('jsonwebtoken');
const Redis = require('ioredis');

/**
 * Socket.IO server that wires together Express, JWT handshake authentication,
 * Redis-backed presence, private chat, chat rooms and topic notifications.
 *
 * Room names used by this class:
 *   - `user:<userId>`         personal room, joined when a socket connects
 *   - `notification:<topic>`  joined through `notification:subscribe`
 *   - any other name          chat room joined through `room:join`
 *
 * Every event payload comes from an untrusted client, so handlers validate the
 * payload before using it and never let a malformed payload throw.
 */
class WebSocketServer {
  /**
   * Builds the Express app, HTTP server, Redis client and Socket.IO instance,
   * then registers middleware, authentication, event handlers and the adapter.
   */
  constructor() {
    this.app = express();
    this.server = http.createServer(this.app);
    this.redis = new Redis({
      host: process.env.REDIS_HOST || 'localhost',
      port: process.env.REDIS_PORT || 6379
    });
    
    // Socket.IO configuration
    this.io = socketIo(this.server, {
      cors: {
        origin: process.env.CLIENT_URL || "http://localhost:3000",
        methods: ["GET", "POST"],
        credentials: true
      },
      transports: ['websocket', 'polling'],
      pingTimeout: 60000,
      pingInterval: 25000,
      maxHttpBufferSize: 1e6, // 1MB
      allowEIO3: true
    });
    
    this.setupMiddleware();
    this.setupAuthentication();
    this.setupEventHandlers();
    this.setupRedisAdapter();
  }
  
  /** Registers the Express middleware and the `/health` endpoint. */
  setupMiddleware() {
    this.app.use(cors());
    this.app.use(express.json());
    this.app.use(express.static('public'));
    
    // Health check
    this.app.get('/health', (req, res) => {
      res.json({ 
        status: 'ok', 
        connections: this.io.engine.clientsCount,
        timestamp: new Date().toISOString()
      });
    });
  }
  
  /**
   * Installs the handshake middleware: the JWT is read from `handshake.auth.token`
   * or the `Authorization` header, verified, and the resolved user is attached
   * to the socket. Any failure is reported to the client as a generic
   * 'Authentication failed' error.
   */
  setupAuthentication() {
    this.io.use(async (socket, next) => {
      try {
        const token = socket.handshake.auth.token || 
                     socket.handshake.headers.authorization?.replace('Bearer ', '');
        
        if (!token) {
          throw new Error('No token provided');
        }
        
        const decoded = jwt.verify(token, process.env.JWT_SECRET);
        const user = await this.getUserById(decoded.id);
        
        if (!user) {
          throw new Error('User not found');
        }
        
        socket.userId = user.id;
        socket.username = user.username;
        socket.user = user;
        
        next();
      } catch (error) {
        console.error('Socket authentication error:', error.message);
        next(new Error('Authentication failed'));
      }
    });
  }
  
  /** Attaches the Redis adapter used to scale across several instances. */
  setupRedisAdapter() {
    const redisAdapter = require('socket.io-redis');
    this.io.adapter(redisAdapter({
      host: process.env.REDIS_HOST || 'localhost',
      port: process.env.REDIS_PORT || 6379
    }));
  }
  
  /** Registers the per-connection handlers (presence, chat, rooms, notifications). */
  setupEventHandlers() {
    this.io.on('connection', (socket) => {
      console.log(`User ${socket.username} connected: ${socket.id}`);
      
      // Presence bookkeeping is async; catch here so a Redis/database failure
      // does not become an unhandled rejection.
      this.handleUserOnline(socket).catch((error) => {
        console.error(`Failed to mark user ${socket.username} online:`, error);
      });
      
      this.setupChatHandlers(socket);
      this.setupRoomHandlers(socket);
      this.setupNotificationHandlers(socket);
      
      socket.on('disconnect', (reason) => {
        console.log(`User ${socket.username} disconnected: ${reason}`);
        this.handleUserOffline(socket).catch((error) => {
          console.error(`Failed to mark user ${socket.username} offline:`, error);
        });
      });
      
      socket.on('error', (error) => {
        console.error(`Socket error for user ${socket.username}:`, error);
      });
    });
  }
  
  /**
   * Reports whether `roomId` may be used as a chat room name. The `user:` and
   * `notification:` prefixes are reserved for rooms this class manages itself;
   * letting a client join them would expose other users' private traffic.
   *
   * @param {*} roomId - Value received from the client.
   * @returns {boolean} True for a non-empty, non-reserved string or a finite number.
   */
  isChatRoomId(roomId) {
    if (typeof roomId === 'number') {
      return Number.isFinite(roomId);
    }
    return typeof roomId === 'string' &&
      roomId.length > 0 &&
      !roomId.startsWith('user:') &&
      !roomId.startsWith('notification:');
  }
  
  /**
   * Records the user as online, joins the personal room, notifies friends and
   * delivers unread messages.
   *
   * @param {object} socket - Authenticated Socket.IO socket.
   */
  async handleUserOnline(socket) {
    // Update the online-status hash
    await this.redis.hset('users:online', socket.userId, JSON.stringify({
      socketId: socket.id,
      username: socket.username,
      connectedAt: new Date().toISOString()
    }));
    
    // Join the user's personal room
    socket.join(`user:${socket.userId}`);
    
    // Tell friends the user came online
    const friends = await this.getUserFriends(socket.userId);
    friends.forEach(friendId => {
      socket.to(`user:${friendId}`).emit('user:online', {
        userId: socket.userId,
        username: socket.username
      });
    });
    
    // Deliver unread messages
    const unreadMessages = await this.getUnreadMessages(socket.userId);
    if (unreadMessages.length > 0) {
      socket.emit('messages:unread', unreadMessages);
    }
  }
  
  /**
   * Clears the user's online status and notifies friends, unless another socket
   * of the same user is still connected to this instance.
   *
   * @param {object} socket - The socket that disconnected.
   */
  async handleUserOffline(socket) {
    // The same user may be connected from several tabs/devices. Only sockets
    // known to this process are visible here.
    // NOTE(review): sockets on other instances are not considered - confirm
    // whether cross-instance presence is required.
    const personalRoom = this.io.sockets.adapter.rooms.get(`user:${socket.userId}`);
    if (personalRoom) {
      const otherSockets = [...personalRoom].filter(id => id !== socket.id);
      if (otherSockets.length > 0) {
        return;
      }
    }
    
    await this.redis.hdel('users:online', socket.userId);
    
    // Tell friends the user went offline
    const friends = await this.getUserFriends(socket.userId);
    friends.forEach(friendId => {
      socket.to(`user:${friendId}`).emit('user:offline', {
        userId: socket.userId,
        username: socket.username
      });
    });
  }
  
  /**
   * Registers private-chat events: `message:send`, `message:read`,
   * `typing:start` and `typing:stop`.
   *
   * @param {object} socket - Authenticated Socket.IO socket.
   */
  setupChatHandlers(socket) {
    // Send a private message
    socket.on('message:send', async (data) => {
      try {
        const { recipientId, content, type = 'text' } = data ?? {};
        
        if (!recipientId || !content) {
          socket.emit('error', { message: 'Missing required fields' });
          return;
        }
        
        // Only friends may exchange private messages
        const isFriend = await this.checkFriendship(socket.userId, recipientId);
        if (!isFriend) {
          socket.emit('error', { message: 'Can only send messages to friends' });
          return;
        }
        
        const message = await this.saveMessage({
          senderId: socket.userId,
          recipientId,
          content,
          type,
          timestamp: new Date()
        });
        
        socket.to(`user:${recipientId}`).emit('message:received', {
          id: message.id,
          senderId: socket.userId,
          senderUsername: socket.username,
          content,
          type,
          timestamp: message.timestamp
        });
        
        // Acknowledge to the sender
        socket.emit('message:sent', {
          id: message.id,
          recipientId,
          timestamp: message.timestamp
        });
        
        // Push notification when the recipient is offline
        const isRecipientOnline = await this.redis.hexists('users:online', recipientId);
        if (!isRecipientOnline) {
          // Non-string content (e.g. a structured attachment) has no text
          // preview; calling substring on it would throw after the message
          // was already delivered.
          const preview = typeof content === 'string' ? content.substring(0, 100) : '';
          await this.sendPushNotification(recipientId, {
            title: `New message from ${socket.username}`,
            body: preview,
            type: 'message'
          });
        }
        
      } catch (error) {
        console.error('Message send error:', error);
        socket.emit('error', { message: 'Failed to send message' });
      }
    });
    
    // Mark messages as read
    socket.on('message:read', async (data) => {
      try {
        const { messageIds } = data ?? {};
        if (!Array.isArray(messageIds) || messageIds.length === 0) {
          return;
        }
        
        await this.markMessagesAsRead(messageIds, socket.userId);
        
        // Look the messages up in parallel, then notify each sender in order
        const messages = await Promise.all(
          messageIds.map(messageId => this.getMessage(messageId))
        );
        messages.forEach((message, index) => {
          if (message && message.senderId !== socket.userId) {
            socket.to(`user:${message.senderId}`).emit('message:read', {
              messageId: messageIds[index],
              readBy: socket.userId,
              readAt: new Date()
            });
          }
        });
        
      } catch (error) {
        console.error('Mark message read error:', error);
      }
    });
    
    // Typing indicators
    socket.on('typing:start', (data) => {
      const { recipientId } = data ?? {};
      if (!recipientId) {
        return;
      }
      socket.to(`user:${recipientId}`).emit('typing:start', {
        userId: socket.userId,
        username: socket.username
      });
    });
    
    socket.on('typing:stop', (data) => {
      const { recipientId } = data ?? {};
      if (!recipientId) {
        return;
      }
      socket.to(`user:${recipientId}`).emit('typing:stop', {
        userId: socket.userId
      });
    });
  }
  
  /**
   * Registers chat-room events: `room:join`, `room:leave` and `room:message`.
   *
   * @param {object} socket - Authenticated Socket.IO socket.
   */
  setupRoomHandlers(socket) {
    // Join a room
    socket.on('room:join', async (data) => {
      try {
        const { roomId } = data ?? {};
        
        if (!this.isChatRoomId(roomId)) {
          socket.emit('error', { message: 'Invalid room id' });
          return;
        }
        
        const hasPermission = await this.checkRoomPermission(socket.userId, roomId);
        if (!hasPermission) {
          socket.emit('error', { message: 'No permission to join room' });
          return;
        }
        
        socket.join(roomId);
        
        // Tell the other members
        socket.to(roomId).emit('room:user_joined', {
          userId: socket.userId,
          username: socket.username
        });
        
        // Send the room history to the joining socket
        const roomMessages = await this.getRoomMessages(roomId, 50);
        socket.emit('room:messages', roomMessages);
        
        console.log(`User ${socket.username} joined room ${roomId}`);
        
      } catch (error) {
        console.error('Room join error:', error);
        socket.emit('error', { message: 'Failed to join room' });
      }
    });
    
    // Leave a room
    socket.on('room:leave', (data) => {
      const { roomId } = data ?? {};
      
      // Ignore rooms the socket is not in, otherwise a client could announce
      // a fake departure to any room.
      if (!this.isChatRoomId(roomId) || !socket.rooms.has(roomId)) {
        return;
      }
      
      socket.leave(roomId);
      
      socket.to(roomId).emit('room:user_left', {
        userId: socket.userId,
        username: socket.username
      });
      
      console.log(`User ${socket.username} left room ${roomId}`);
    });
    
    // Room message
    socket.on('room:message', async (data) => {
      try {
        const { roomId, content, type = 'text' } = data ?? {};
        
        // `socket.rooms` also contains the personal and notification rooms,
        // so membership alone is not enough: the name must be a chat room.
        if (!this.isChatRoomId(roomId) || !socket.rooms.has(roomId)) {
          socket.emit('error', { message: 'Not in room' });
          return;
        }
        
        if (content === undefined || content === null) {
          socket.emit('error', { message: 'Missing required fields' });
          return;
        }
        
        const message = await this.saveRoomMessage({
          roomId,
          senderId: socket.userId,
          content,
          type,
          timestamp: new Date()
        });
        
        // Broadcast to everyone in the room, sender included
        this.io.to(roomId).emit('room:message', {
          id: message.id,
          roomId,
          senderId: socket.userId,
          senderUsername: socket.username,
          content,
          type,
          timestamp: message.timestamp
        });
        
      } catch (error) {
        console.error('Room message error:', error);
        socket.emit('error', { message: 'Failed to send room message' });
      }
    });
  }
  
  /**
   * Registers `notification:subscribe` / `notification:unsubscribe`.
   * Payloads whose `topics` is not an array are ignored; non-string topics
   * are dropped.
   *
   * @param {object} socket - Authenticated Socket.IO socket.
   */
  setupNotificationHandlers(socket) {
    const readTopics = (data) => {
      const { topics } = data ?? {};
      if (!Array.isArray(topics)) {
        return [];
      }
      return topics.filter(topic => typeof topic === 'string' && topic.length > 0);
    };
    
    // Subscribe
    socket.on('notification:subscribe', (data) => {
      const topics = readTopics(data);
      if (topics.length === 0) {
        return;
      }
      
      topics.forEach(topic => {
        socket.join(`notification:${topic}`);
      });
      
      console.log(`User ${socket.username} subscribed to notifications:`, topics);
    });
    
    // Unsubscribe
    socket.on('notification:unsubscribe', (data) => {
      const topics = readTopics(data);
      if (topics.length === 0) {
        return;
      }
      
      topics.forEach(topic => {
        socket.leave(`notification:${topic}`);
      });
      
      console.log(`User ${socket.username} unsubscribed from notifications:`, topics);
    });
  }
  
  /**
   * Emits `notification` to every socket subscribed to `topic`.
   *
   * @param {string} topic - Topic name (without the `notification:` prefix).
   * @param {object} notification - Payload; a `timestamp` is added.
   */
  async broadcastNotification(topic, notification) {
    this.io.to(`notification:${topic}`).emit('notification', {
      ...notification,
      timestamp: new Date()
    });
  }
  
  /**
   * Emits `notification` to the personal room of one user.
   *
   * @param {*} userId - Target user id.
   * @param {object} notification - Payload; a `timestamp` is added.
   */
  async sendPersonalNotification(userId, notification) {
    this.io.to(`user:${userId}`).emit('notification', {
      ...notification,
      timestamp: new Date()
    });
  }
  
  /**
   * @returns {number} The engine's client count, i.e. open connections; a
   *   user with two tabs is counted twice.
   */
  getOnlineUsersCount() {
    return this.io.engine.clientsCount;
  }
  
  /**
   * @param {string} roomId - Room name.
   * @returns {number} Number of sockets in the room according to the adapter's
   *   room map, or 0 when the room is unknown.
   */
  getRoomUsersCount(roomId) {
    const room = this.io.sockets.adapter.rooms.get(roomId);
    return room ? room.size : 0;
  }
  
  // Placeholder data-access helpers: replace with real database logic.
  
  /** Placeholder: returns a synthetic user for any id. */
  async getUserById(id) {
    return { id, username: 'user' + id };
  }
  
  /** Placeholder: returns an empty friend list. */
  async getUserFriends(userId) {
    return [];
  }
  
  /** Placeholder: treats every pair of users as friends. */
  async checkFriendship(userId1, userId2) {
    return true;
  }
  
  /** Placeholder: echoes the message back with a time-based id. */
  async saveMessage(messageData) {
    return { id: Date.now(), ...messageData };
  }
  
  /** Placeholder: no message is ever found. */
  async getMessage(messageId) {
    return null;
  }
  
  /** Placeholder: returns no unread messages. */
  async getUnreadMessages(userId) {
    return [];
  }
  
  /** Placeholder: does nothing. */
  async markMessagesAsRead(messageIds, userId) {
  }
  
  /** Placeholder: grants access to every room. */
  async checkRoomPermission(userId, roomId) {
    return true;
  }
  
  /** Placeholder: returns an empty history. */
  async getRoomMessages(roomId, limit) {
    return [];
  }
  
  /** Placeholder: echoes the message back with a time-based id. */
  async saveRoomMessage(messageData) {
    return { id: Date.now(), ...messageData };
  }
  
  /** Placeholder: does nothing. */
  async sendPushNotification(userId, notification) {
  }
  
  /**
   * Starts listening.
   *
   * @param {number|string} [port=3000] - Port to bind.
   */
  start(port = 3000) {
    this.server.listen(port, () => {
      console.log(`WebSocket server running on port ${port}`);
    });
  }
}

// Start the server.
// NOTE(review): this runs whenever the module is loaded, so requiring this
// file just for its export also constructs a server and starts listening.
const wsServer = new WebSocketServer();
wsServer.start(process.env.PORT || 3000);

module.exports = WebSocketServer;

2. 客户端实现

javascript
// client/websocket-client.js
/**
 * Browser-side wrapper around a Socket.IO client socket.
 *
 * Server events are re-emitted through a small local emitter (`on` / `off` /
 * `emit`) with underscore-separated names, e.g. `message:received` becomes
 * `message_received`.
 *
 * Reconnection after transport failures is delegated to Socket.IO itself,
 * configured from `options.reconnection`, `options.reconnectionAttempts` and
 * `options.reconnectionDelay`. To retry with a different token call
 * `connect(token)` again; the previous socket is torn down first.
 */
class WebSocketClient {
  /**
   * @param {string} serverUrl - Socket.IO server URL.
   * @param {object} [options] - Overrides for the defaults below.
   */
  constructor(serverUrl, options = {}) {
    this.serverUrl = serverUrl;
    this.options = {
      autoConnect: true,
      reconnection: true,
      reconnectionAttempts: 5,
      reconnectionDelay: 1000,
      timeout: 20000,
      ...options
    };
    
    this.socket = null;
    this.isConnected = false;
    this.eventHandlers = new Map();
    this.messageQueue = [];
    // Number of connection errors since the last successful connect
    this.reconnectAttempts = 0;
    
    if (this.options.autoConnect) {
      this.connect();
    }
  }
  
  /**
   * Opens the connection. Does nothing when already connected.
   *
   * @param {string} [token] - Auth token; defaults to the stored token.
   */
  connect(token) {
    if (this.socket && this.isConnected) {
      console.warn('Already connected');
      return;
    }
    
    // Drop a previous socket that never connected (or lost its connection)
    // so its listeners and its own retry loop do not run alongside the new one.
    if (this.socket) {
      this.socket.removeAllListeners();
      this.socket.disconnect();
    }
    
    this.socket = io(this.serverUrl, {
      auth: {
        token: token || this.getStoredToken()
      },
      transports: ['websocket', 'polling'],
      timeout: this.options.timeout,
      reconnection: this.options.reconnection,
      reconnectionAttempts: this.options.reconnectionAttempts,
      reconnectionDelay: this.options.reconnectionDelay
    });
    
    this.setupEventHandlers();
  }
  
  /** Subscribes to the socket's lifecycle events and relays server events. */
  setupEventHandlers() {
    // Connected
    this.socket.on('connect', () => {
      console.log('Connected to WebSocket server');
      this.isConnected = true;
      this.reconnectAttempts = 0;
      
      // Send whatever was queued while offline
      this.flushMessageQueue();
      
      this.emit('connected');
    });
    
    // Connection failed. No new socket is created here: creating one per
    // error would leave the failed sockets and their listeners alive.
    this.socket.on('connect_error', (error) => {
      console.error('Connection error:', error.message);
      this.isConnected = false;
      this.reconnectAttempts++;
      this.emit('connection_error', error);
    });
    
    // Disconnected
    this.socket.on('disconnect', (reason) => {
      console.log('Disconnected:', reason);
      this.isConnected = false;
      this.emit('disconnected', reason);
    });
    
    // Errors
    this.socket.on('error', (error) => {
      console.error('Socket error:', error);
      this.emit('error', error);
    });
    
    // Server event name -> local event name
    const relayedEvents = [
      ['message:received', 'message_received'],
      ['message:sent', 'message_sent'],
      ['message:read', 'message_read'],
      ['user:online', 'user_online'],
      ['user:offline', 'user_offline'],
      ['typing:start', 'typing_start'],
      ['typing:stop', 'typing_stop'],
      ['room:user_joined', 'room_user_joined'],
      ['room:user_left', 'room_user_left'],
      ['room:message', 'room_message'],
      ['room:messages', 'room_messages'],
      ['notification', 'notification']
    ];
    
    relayedEvents.forEach(([serverEvent, localEvent]) => {
      this.socket.on(serverEvent, (data) => {
        this.emit(localEvent, data);
      });
    });
  }
  
  /**
   * Sends a private message, or queues it until the next successful connect.
   *
   * @param {*} recipientId - Target user id.
   * @param {*} content - Message content.
   * @param {string} [type='text'] - Message type.
   */
  sendMessage(recipientId, content, type = 'text') {
    const messageData = {
      recipientId,
      content,
      type,
      timestamp: new Date()
    };
    
    if (this.isConnected) {
      this.socket.emit('message:send', messageData);
    } else {
      this.messageQueue.push({
        event: 'message:send',
        data: messageData
      });
    }
  }
  
  /** Marks messages as read. Dropped when disconnected. */
  markMessagesRead(messageIds) {
    if (this.isConnected) {
      this.socket.emit('message:read', { messageIds });
    }
  }
  
  /** Signals that the user started typing. Dropped when disconnected. */
  startTyping(recipientId) {
    if (this.isConnected) {
      this.socket.emit('typing:start', { recipientId });
    }
  }
  
  /** Signals that the user stopped typing. Dropped when disconnected. */
  stopTyping(recipientId) {
    if (this.isConnected) {
      this.socket.emit('typing:stop', { recipientId });
    }
  }
  
  /** Requests to join a room. Dropped when disconnected. */
  joinRoom(roomId) {
    if (this.isConnected) {
      this.socket.emit('room:join', { roomId });
    }
  }
  
  /** Requests to leave a room. Dropped when disconnected. */
  leaveRoom(roomId) {
    if (this.isConnected) {
      this.socket.emit('room:leave', { roomId });
    }
  }
  
  /** Sends a message to a room. Dropped when disconnected. */
  sendRoomMessage(roomId, content, type = 'text') {
    if (this.isConnected) {
      this.socket.emit('room:message', {
        roomId,
        content,
        type
      });
    }
  }
  
  /** Subscribes to notification topics. Dropped when disconnected. */
  subscribeNotifications(topics) {
    if (this.isConnected) {
      this.socket.emit('notification:subscribe', { topics });
    }
  }
  
  /** Unsubscribes from notification topics. Dropped when disconnected. */
  unsubscribeNotifications(topics) {
    if (this.isConnected) {
      this.socket.emit('notification:unsubscribe', { topics });
    }
  }
  
  /**
   * Registers a local event handler.
   *
   * @param {string} event - Local event name.
   * @param {Function} handler - Called with the event data.
   */
  on(event, handler) {
    if (!this.eventHandlers.has(event)) {
      this.eventHandlers.set(event, []);
    }
    this.eventHandlers.get(event).push(handler);
  }
  
  /** Removes a previously registered handler; unknown handlers are ignored. */
  off(event, handler) {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }
  
  /**
   * Calls every handler registered for `event`. A throwing handler is logged
   * and does not stop the others.
   */
  emit(event, data) {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    // Iterate over a snapshot: a handler may call off() on itself, which
    // would otherwise shift the array and skip the next handler.
    [...handlers].forEach(handler => {
      try {
        handler(data);
      } catch (error) {
        console.error(`Error in event handler for ${event}:`, error);
      }
    });
  }
  
  /** Sends every queued message in FIFO order. */
  flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const { event, data } = this.messageQueue.shift();
      this.socket.emit(event, data);
    }
  }
  
  /**
   * @returns {string|null} The token stored under `auth_token`, or null when
   *   none is stored or `localStorage` does not exist in this environment.
   */
  getStoredToken() {
    if (typeof localStorage === 'undefined') {
      return null;
    }
    return localStorage.getItem('auth_token');
  }
  
  /** Closes the connection if a socket exists. */
  disconnect() {
    if (this.socket) {
      this.socket.disconnect();
      this.isConnected = false;
    }
  }
  
  /** @returns {{connected: boolean, socketId: (string|undefined), transport: (string|undefined)}} */
  getConnectionStatus() {
    return {
      connected: this.isConnected,
      socketId: this.socket?.id,
      transport: this.socket?.io?.engine?.transport?.name
    };
  }
}

// Usage example
const wsClient = new WebSocketClient('http://localhost:3000');

// Listen for incoming messages
wsClient.on('message_received', (message) => {
  console.log('New message:', message);
  // Update the UI with the new message
});

// Listen for presence changes
wsClient.on('user_online', (user) => {
  console.log('User online:', user.username);
  // Update the user's online status
});

// Send a message (queued by sendMessage if the socket is not connected yet)
wsClient.sendMessage('user123', 'Hello, how are you?');

module.exports = WebSocketClient;

原生 WebSocket 实现

1. 原生 WebSocket 服务器

javascript
// native-websocket-server.js
const WebSocket = require('ws');
const http = require('http');
const url = require('url');
const jwt = require('jsonwebtoken');

/**
 * WebSocket server built on the `ws` package with JWT verification at the
 * upgrade step, in-memory rooms and a ping/pong heartbeat.
 *
 * State:
 *   - `clients`: Map of clientId -> { id, ws, user, rooms, lastPing, isAlive }
 *   - `rooms`:   Map of roomId -> Set of clientIds
 *
 * Messages are JSON objects with a `type` of `chat`, `join_room`,
 * `leave_room` or `ping`.
 */
class NativeWebSocketServer {
  /**
   * @param {object} [options]
   * @param {number} [options.port=8080] - Port used by `start()`.
   * @param {number} [options.maxConnections=1000] - Upgrade requests beyond this are refused.
   * @param {number} [options.heartbeatInterval=30000] - Milliseconds between ping rounds.
   */
  constructor(options = {}) {
    this.options = {
      port: 8080,
      maxConnections: 1000,
      heartbeatInterval: 30000,
      ...options
    };
    
    this.clients = new Map();
    this.rooms = new Map();
    this.server = null;
    this.wss = null;
    this.heartbeatTimer = null;
    
    this.setupServer();
    this.startHeartbeat();
  }
  
  /** Creates the HTTP server and the WebSocket server attached to it. */
  setupServer() {
    this.server = http.createServer();
    
    this.wss = new WebSocket.Server({
      server: this.server,
      verifyClient: this.verifyClient.bind(this),
      maxPayload: 1024 * 1024 // 1MB
    });
    
    this.wss.on('connection', this.handleConnection.bind(this));
    
    this.server.on('upgrade', (request, socket, head) => {
      console.log('WebSocket upgrade request');
    });
  }
  
  /**
   * Decides whether an upgrade request is accepted. The token is read from the
   * `token` query parameter or the `Authorization` header; the decoded payload
   * is stored on `info.req.user`.
   *
   * @param {object} info - `ws` verifyClient info; `info.req` is the HTTP request.
   * @returns {boolean} False when the token is missing/invalid or the
   *   connection limit is reached.
   */
  verifyClient(info) {
    try {
      // `req.url` is only a path, so a base is required for parsing; the base
      // itself is never used.
      const { searchParams } = new URL(info.req.url, 'http://localhost');
      const token = searchParams.get('token') ||
        info.req.headers.authorization?.replace('Bearer ', '');
      
      if (!token) {
        console.log('No token provided');
        return false;
      }
      
      const decoded = jwt.verify(token, process.env.JWT_SECRET);
      info.req.user = decoded;
      
      // Connection limit
      if (this.clients.size >= this.options.maxConnections) {
        console.log('Max connections reached');
        return false;
      }
      
      return true;
    } catch (error) {
      console.error('Token verification failed:', error.message);
      return false;
    }
  }
  
  /**
   * Registers a newly accepted socket and sends the `connection` confirmation.
   *
   * @param {object} ws - The accepted WebSocket.
   * @param {object} request - The upgrade request; `request.user` was set by `verifyClient`.
   */
  handleConnection(ws, request) {
    const user = request.user;
    const clientId = this.generateClientId();
    
    const client = {
      id: clientId,
      ws,
      user,
      rooms: new Set(),
      lastPing: Date.now(),
      isAlive: true
    };
    
    this.clients.set(clientId, client);
    
    console.log(`Client connected: ${user.username} (${clientId})`);
    
    ws.on('message', (data) => this.handleMessage(client, data));
    ws.on('close', (code, reason) => this.handleDisconnect(client, code, reason));
    ws.on('error', (error) => this.handleError(client, error));
    ws.on('pong', () => this.handlePong(client));
    
    this.sendToClient(client, {
      type: 'connection',
      status: 'connected',
      clientId
    });
  }
  
  /**
   * Parses one incoming frame and dispatches on `message.type`. Anything that
   * throws while parsing or handling is answered with an `error` message.
   */
  handleMessage(client, data) {
    try {
      const message = JSON.parse(data.toString());
      
      switch (message.type) {
        case 'chat':
          this.handleChatMessage(client, message);
          break;
        case 'join_room':
          this.handleJoinRoom(client, message);
          break;
        case 'leave_room':
          this.handleLeaveRoom(client, message);
          break;
        case 'ping':
          this.handlePing(client);
          break;
        default:
          console.log('Unknown message type:', message.type);
      }
    } catch (error) {
      console.error('Message parsing error:', error);
      this.sendError(client, 'Invalid message format');
    }
  }
  
  /**
   * Reports whether `roomId` is usable as a room key.
   *
   * @param {*} roomId - Value taken from a client message.
   * @returns {boolean} True for a non-empty string or a finite number.
   */
  isValidRoomId(roomId) {
    if (typeof roomId === 'number') {
      return Number.isFinite(roomId);
    }
    return typeof roomId === 'string' && roomId.length > 0;
  }
  
  /**
   * Relays a chat message to a room (when `roomId` is set) or to one user
   * (when `recipientId` is set). Room messages are only accepted from members
   * and are not echoed back to the sender.
   */
  handleChatMessage(client, message) {
    const { recipientId, content, roomId } = message;
    
    if (content === undefined || content === null) {
      this.sendError(client, 'Missing required fields');
      return;
    }
    
    const outgoing = {
      type: 'chat',
      senderId: client.user.id,
      senderName: client.user.username,
      content,
      timestamp: new Date().toISOString()
    };
    
    if (roomId) {
      // Only members may post, otherwise any authenticated client could
      // broadcast into any room.
      if (!client.rooms.has(roomId)) {
        this.sendError(client, 'Not in room');
        return;
      }
      this.broadcastToRoom(roomId, outgoing, client.id);
    } else if (recipientId) {
      const recipient = this.findClientByUserId(recipientId);
      if (recipient) {
        this.sendToClient(recipient, outgoing);
      }
    }
  }
  
  /**
   * Adds the client to a room, creating the room on first use, announces the
   * join to the other members and confirms with `room_joined`. Joining a room
   * twice is confirmed again but not re-announced.
   */
  handleJoinRoom(client, message) {
    const { roomId } = message;
    
    if (!this.isValidRoomId(roomId)) {
      this.sendError(client, 'Invalid room id');
      return;
    }
    
    const alreadyMember = client.rooms.has(roomId);
    
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }
    
    this.rooms.get(roomId).add(client.id);
    client.rooms.add(roomId);
    
    if (!alreadyMember) {
      this.broadcastToRoom(roomId, {
        type: 'user_joined',
        userId: client.user.id,
        username: client.user.username
      }, client.id);
    }
    
    this.sendToClient(client, {
      type: 'room_joined',
      roomId
    });
    
    console.log(`User ${client.user.username} joined room ${roomId}`);
  }
  
  /**
   * Removes the client from a room and tells the remaining members. Requests
   * for rooms the client is not in are ignored, so a departure cannot be faked.
   */
  handleLeaveRoom(client, message) {
    const { roomId } = message;
    
    if (!client.rooms.has(roomId)) {
      return;
    }
    
    client.rooms.delete(roomId);
    this.removeClientFromRoom(client, roomId);
    
    console.log(`User ${client.user.username} left room ${roomId}`);
  }
  
  /**
   * Removes the client id from the room's member set, notifies the remaining
   * members with `user_left` and deletes the room once it is empty. Does not
   * touch `client.rooms`.
   */
  removeClientFromRoom(client, roomId) {
    const members = this.rooms.get(roomId);
    if (!members) {
      return;
    }
    
    members.delete(client.id);
    
    this.broadcastToRoom(roomId, {
      type: 'user_left',
      userId: client.user.id,
      username: client.user.username
    });
    
    if (members.size === 0) {
      this.rooms.delete(roomId);
    }
  }
  
  /** Answers an application-level `ping` message with `pong`. */
  handlePing(client) {
    client.lastPing = Date.now();
    this.sendToClient(client, { type: 'pong' });
  }
  
  /** Marks the client alive when a protocol-level pong arrives. */
  handlePong(client) {
    client.isAlive = true;
    client.lastPing = Date.now();
  }
  
  /** Removes a closed client from all of its rooms and from the client map. */
  handleDisconnect(client, code, reason) {
    console.log(`Client disconnected: ${client.user.username} (${code}: ${reason})`);
    
    client.rooms.forEach(roomId => {
      this.removeClientFromRoom(client, roomId);
    });
    client.rooms.clear();
    
    this.clients.delete(client.id);
  }
  
  /** Logs a socket-level error. */
  handleError(client, error) {
    console.error(`Client error for ${client.user.username}:`, error);
  }
  
  /** Serializes `data` as JSON and sends it when the socket is open. */
  sendToClient(client, data) {
    if (client.ws.readyState === WebSocket.OPEN) {
      client.ws.send(JSON.stringify(data));
    }
  }
  
  /** Sends an `error` message carrying `message`. */
  sendError(client, message) {
    this.sendToClient(client, {
      type: 'error',
      message
    });
  }
  
  /**
   * Sends `data` to every member of a room.
   *
   * @param {*} roomId - Room key.
   * @param {object} data - Payload.
   * @param {?string} [excludeClientId=null] - Member to skip (typically the sender).
   */
  broadcastToRoom(roomId, data, excludeClientId = null) {
    const members = this.rooms.get(roomId);
    if (!members) return;
    
    members.forEach(clientId => {
      if (clientId === excludeClientId) {
        return;
      }
      const client = this.clients.get(clientId);
      if (client) {
        this.sendToClient(client, data);
      }
    });
  }
  
  /** Sends `data` to every connected client except `excludeClientId`. */
  broadcast(data, excludeClientId = null) {
    this.clients.forEach((client, clientId) => {
      if (clientId !== excludeClientId) {
        this.sendToClient(client, data);
      }
    });
  }
  
  /**
   * @param {*} userId - Compared with `client.user.id` using strict equality.
   * @returns {?object} The first matching client, or null.
   */
  findClientByUserId(userId) {
    for (const client of this.clients.values()) {
      if (client.user.id === userId) {
        return client;
      }
    }
    return null;
  }
  
  /** @returns {string} A random base-36 string followed by the base-36 timestamp. */
  generateClientId() {
    return Math.random().toString(36).substring(2) + Date.now().toString(36);
  }
  
  /**
   * Starts the heartbeat. Each round terminates clients that did not answer
   * the previous ping and pings the rest. Calling it again while a heartbeat
   * is running has no effect.
   */
  startHeartbeat() {
    if (this.heartbeatTimer) {
      return;
    }
    
    this.heartbeatTimer = setInterval(() => {
      this.clients.forEach((client, clientId) => {
        if (!client.isAlive) {
          console.log(`Terminating inactive client: ${client.user.username}`);
          client.ws.terminate();
          this.clients.delete(clientId);
          return;
        }
        
        // Cleared here and set again by handlePong
        client.isAlive = false;
        if (client.ws.readyState === WebSocket.OPEN) {
          client.ws.ping();
        }
      });
    }, this.options.heartbeatInterval);
  }
  
  /** Stops the heartbeat interval so it no longer keeps the process alive. */
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }
  
  /** @returns {{totalClients: number, totalRooms: number, roomStats: Array<{roomId: *, clientCount: number}>}} */
  getStats() {
    return {
      totalClients: this.clients.size,
      totalRooms: this.rooms.size,
      roomStats: Array.from(this.rooms.entries()).map(([roomId, clients]) => ({
        roomId,
        clientCount: clients.size
      }))
    };
  }
  
  /** Starts listening on `options.port`. */
  start() {
    this.server.listen(this.options.port, () => {
      console.log(`Native WebSocket server running on port ${this.options.port}`);
    });
  }
}

// Start the server.
// NOTE(review): this runs whenever the module is loaded, so requiring this
// file just for its export also constructs a server and starts listening.
const wsServer = new NativeWebSocketServer({ port: 8080 });
wsServer.start();

module.exports = NativeWebSocketServer;

2. 原生 WebSocket 客户端

javascript
// native-websocket-client.js
/**
 * Browser client for the JSON message protocol (`chat`, `join_room`,
 * `leave_room`, `ping`) on top of the native `WebSocket` API.
 *
 * Features: offline message queue, periodic application-level ping, and
 * automatic reconnection after an unexpected close. Calling `disconnect()`
 * closes the socket for good: no reconnect is scheduled until `connect()` is
 * called again.
 */
class NativeWebSocketClient {
  /**
   * Connects immediately.
   *
   * @param {string} url - WebSocket URL (ws:// or wss://).
   * @param {object} [options]
   * @param {number} [options.reconnectInterval=5000] - Delay before each reconnect, in ms.
   * @param {number} [options.maxReconnectAttempts=5] - Consecutive attempts before giving up.
   * @param {number} [options.pingInterval=30000] - Milliseconds between `ping` messages.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = {
      reconnectInterval: 5000,
      maxReconnectAttempts: 5,
      pingInterval: 30000,
      ...options
    };
    
    this.ws = null;
    this.isConnected = false;
    this.reconnectAttempts = 0;
    this.eventHandlers = {};
    this.messageQueue = [];
    this.pingTimer = null;
    this.reconnectTimer = null;
    // Cleared by disconnect() so an intentional close is not "repaired"
    this.shouldReconnect = true;
    
    this.connect();
  }
  
  /** Opens a new socket, passing the token as the `token` query parameter. */
  connect() {
    this.shouldReconnect = true;
    
    try {
      // Detach a previous socket so a late close/error event from it cannot
      // change the state of the new connection.
      if (this.ws) {
        this.ws.onopen = null;
        this.ws.onmessage = null;
        this.ws.onclose = null;
        this.ws.onerror = null;
      }
      
      const token = this.getToken();
      // The URL may already carry a query string
      const separator = this.url.includes('?') ? '&' : '?';
      const wsUrl = `${this.url}${separator}token=${encodeURIComponent(token)}`;
      
      this.ws = new WebSocket(wsUrl);
      
      this.ws.onopen = this.handleOpen.bind(this);
      this.ws.onmessage = this.handleMessage.bind(this);
      this.ws.onclose = this.handleClose.bind(this);
      this.ws.onerror = this.handleError.bind(this);
      
    } catch (error) {
      console.error('WebSocket connection error:', error);
      this.scheduleReconnect();
    }
  }
  
  /** Marks the client connected, flushes the queue and starts pinging. */
  handleOpen() {
    console.log('WebSocket connected');
    this.isConnected = true;
    this.reconnectAttempts = 0;
    
    this.flushMessageQueue();
    
    // Never run two ping intervals at once
    this.stopPing();
    this.startPing();
    
    this.emit('connected');
  }
  
  /** Parses a server frame and emits the matching local event. */
  handleMessage(event) {
    try {
      const data = JSON.parse(event.data);
      
      switch (data.type) {
        case 'connection':
          this.emit('connection', data);
          break;
        case 'chat':
          this.emit('message', data);
          break;
        case 'user_joined':
          this.emit('user_joined', data);
          break;
        case 'user_left':
          this.emit('user_left', data);
          break;
        case 'room_joined':
          this.emit('room_joined', data);
          break;
        case 'pong':
          // Heartbeat reply, nothing to do
          break;
        case 'error':
          this.emit('error', data);
          break;
        default:
          console.log('Unknown message type:', data.type);
      }
    } catch (error) {
      console.error('Message parsing error:', error);
    }
  }
  
  /**
   * Handles a closed socket: emits `disconnected` and, unless the close was
   * requested through `disconnect()`, schedules a reconnect.
   */
  handleClose(event) {
    console.log('WebSocket disconnected:', event.code, event.reason);
    this.isConnected = false;
    this.stopPing();
    
    this.emit('disconnected', { code: event.code, reason: event.reason });
    
    if (this.shouldReconnect) {
      this.scheduleReconnect();
    }
  }
  
  /** Logs a socket error and emits `error`. */
  handleError(error) {
    console.error('WebSocket error:', error);
    this.emit('error', error);
  }
  
  /**
   * Schedules one reconnect after `options.reconnectInterval`, replacing any
   * reconnect that is already pending. Gives up after
   * `options.maxReconnectAttempts` consecutive attempts.
   */
  scheduleReconnect() {
    if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
      console.log('Max reconnect attempts reached');
      return;
    }
    
    this.reconnectAttempts++;
    console.log(`Reconnecting in ${this.options.reconnectInterval}ms (attempt ${this.reconnectAttempts})`);
    
    this.cancelReconnect();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, this.options.reconnectInterval);
  }
  
  /** Cancels a pending reconnect, if any. */
  cancelReconnect() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }
  
  /** Sends `data` as JSON, or queues it while the socket is not open. */
  send(data) {
    const message = JSON.stringify(data);
    
    if (this.isConnected && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(message);
    } else {
      this.messageQueue.push(message);
    }
  }
  
  /** Sends a private chat message. */
  sendMessage(recipientId, content) {
    this.send({
      type: 'chat',
      recipientId,
      content,
      timestamp: new Date().toISOString()
    });
  }
  
  /** Sends a chat message to a room. */
  sendRoomMessage(roomId, content) {
    this.send({
      type: 'chat',
      roomId,
      content,
      timestamp: new Date().toISOString()
    });
  }
  
  /** Asks the server to add this client to a room. */
  joinRoom(roomId) {
    this.send({
      type: 'join_room',
      roomId
    });
  }
  
  /** Asks the server to remove this client from a room. */
  leaveRoom(roomId) {
    this.send({
      type: 'leave_room',
      roomId
    });
  }
  
  /** Starts sending `ping` every `options.pingInterval` while connected. */
  startPing() {
    this.pingTimer = setInterval(() => {
      if (this.isConnected) {
        this.send({ type: 'ping' });
      }
    }, this.options.pingInterval);
  }
  
  /** Stops the ping interval. */
  stopPing() {
    if (this.pingTimer) {
      clearInterval(this.pingTimer);
      this.pingTimer = null;
    }
  }
  
  /** Sends every queued (already serialized) message in FIFO order. */
  flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const message = this.messageQueue.shift();
      this.ws.send(message);
    }
  }
  
  /** Registers a handler for a local event. */
  on(event, handler) {
    if (!this.eventHandlers[event]) {
      this.eventHandlers[event] = [];
    }
    this.eventHandlers[event].push(handler);
  }
  
  /** Removes a previously registered handler; unknown handlers are ignored. */
  off(event, handler) {
    const handlers = this.eventHandlers[event];
    if (!handlers) {
      return;
    }
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }
  
  /**
   * Calls every handler registered for `event`. A throwing handler is logged
   * and does not stop the others.
   */
  emit(event, data) {
    const handlers = this.eventHandlers[event];
    if (!handlers) {
      return;
    }
    // Snapshot, so a handler that calls off() does not make the next one be skipped
    [...handlers].forEach(handler => {
      try {
        handler(data);
      } catch (error) {
        console.error(`Error in event handler for ${event}:`, error);
      }
    });
  }
  
  /**
   * @returns {string} The token stored under `auth_token`, or '' when none is
   *   stored or `localStorage` does not exist in this environment.
   */
  getToken() {
    if (typeof localStorage === 'undefined') {
      return '';
    }
    return localStorage.getItem('auth_token') || '';
  }
  
  /** Closes the connection and cancels pinging and any pending reconnect. */
  disconnect() {
    this.shouldReconnect = false;
    this.cancelReconnect();
    
    if (this.ws) {
      this.ws.close();
    }
    this.stopPing();
  }
  
  /** @returns {{connected: boolean, readyState: (number|undefined), reconnectAttempts: number}} */
  getConnectionState() {
    return {
      connected: this.isConnected,
      readyState: this.ws?.readyState,
      reconnectAttempts: this.reconnectAttempts
    };
  }
}

// Usage example
const client = new NativeWebSocketClient('ws://localhost:8080');

// Join the "general" room once the client reports that it is connected.
client.on('connected', function handleConnected() {
  console.log('Connected to server');
  client.joinRoom('general');
});

// Log every incoming message payload.
client.on('message', function handleMessage(payload) {
  console.log('Received message:', payload);
});

// Log the name of each user that joins.
client.on('user_joined', function handleUserJoined({ username }) {
  console.log('User joined:', username);
});

// Send a message to the room
client.sendRoomMessage('general', 'Hello everyone!');

// CommonJS export: the module's value is the NativeWebSocketClient class itself.
module.exports = NativeWebSocketClient;

实时聊天应用

1. 聊天室管理器

javascript
// chat-room-manager.js
const EventEmitter = require('events');
const Redis = require('ioredis');

/**
 * Manages chat rooms, membership, messages and reactions.
 *
 * State is persisted in Redis and mirrored in memory:
 *  - hash `chat:rooms`                     roomId    -> serialized room
 *  - hash `chat:messages`                  messageId -> serialized message
 *  - sorted set `chat:room:<id>:messages`  message ids scored by timestamp
 *
 * Every change is published on the Redis channel `chat:room:<roomId>`.
 * Events received on those channels are re-emitted locally as `room_event`.
 */
class ChatRoomManager extends EventEmitter {
  /**
   * @param {Object} [options]
   * @param {string} [options.redisHost='localhost'] - Redis host.
   * @param {number} [options.redisPort=6379] - Redis port.
   */
  constructor(options = {}) {
    super();

    this.redis = new Redis({
      host: options.redisHost || 'localhost',
      port: options.redisPort || 6379
    });

    this.rooms = new Map();
    this.userRooms = new Map(); // userId -> Set of room ids joined via this instance
    this.messageHistory = new Map(); // roomId -> array of recently sent messages

    this.setupRedisSubscription();
  }

  /**
   * Subscribe to all room channels so that events published by any instance
   * are re-emitted here as `room_event` with the room id attached.
   */
  setupRedisSubscription() {
    const channelPrefix = 'chat:room:';

    // A connection in subscriber mode cannot run regular commands, so a
    // dedicated duplicate connection is used for listening.
    const subscriber = this.redis.duplicate();
    this.subscriber = subscriber;

    // Glob patterns require PSUBSCRIBE / 'pmessage'; plain SUBSCRIBE would
    // only match a channel literally named "chat:room:*".
    subscriber.on('pmessage', (pattern, channel, message) => {
      let data;
      try {
        data = JSON.parse(message);
      } catch (error) {
        console.error(`Ignoring malformed room event on ${channel}:`, error);
        return;
      }

      this.emit('room_event', {
        // Everything after the prefix is the room id (it may contain ':').
        roomId: channel.slice(channelPrefix.length),
        ...data
      });
    });

    subscriber.psubscribe(`${channelPrefix}*`).catch((error) => {
      console.error('Failed to subscribe to room events:', error);
    });
  }

  /**
   * Create a room, cache it and persist it to Redis.
   *
   * @param {Object} roomData
   * @param {string} roomData.id - Room identifier.
   * @param {string} roomData.name - Display name.
   * @param {string} [roomData.description]
   * @param {string} [roomData.type='public'] - public, private or direct.
   * @param {number} [roomData.maxMembers=100]
   * @param {string} roomData.createdBy - Creator; becomes the first admin.
   * @returns {Promise<Object>} The in-memory room (members/admins/banned are Sets).
   */
  async createRoom(roomData) {
    const {
      id,
      name,
      description,
      type = 'public', // public, private, direct
      maxMembers = 100,
      createdBy
    } = roomData;

    const room = {
      id,
      name,
      description,
      type,
      maxMembers,
      createdBy,
      createdAt: new Date(),
      members: new Set(),
      admins: new Set([createdBy]),
      banned: new Set(),
      settings: {
        allowFileUpload: true,
        allowVoiceMessage: true,
        messageRetention: 30 // days
      }
    };

    this.rooms.set(id, room);
    await this.saveRoom(room);

    return room;
  }

  /**
   * Add a user to a room and announce it with a `user_joined` event.
   *
   * @param {string} roomId
   * @param {string} userId
   * @param {*} userInfo - Forwarded unchanged in the published event.
   * @returns {Promise<{room: Object, recentMessages: Object[]}>}
   * @throws {Error} When the room is missing or full, the user is banned, or
   *   the room is private and the user is neither a member nor an admin.
   */
  async joinRoom(roomId, userId, userInfo) {
    const room = await this.getRoom(roomId);
    if (!room) {
      throw new Error('Room not found');
    }

    if (room.banned.has(userId)) {
      throw new Error('User is banned from this room');
    }

    // The capacity limit only applies to newcomers; an existing member
    // re-joining (e.g. after a reconnect) does not take an extra seat.
    if (!room.members.has(userId) && room.members.size >= room.maxMembers) {
      throw new Error('Room is full');
    }

    if (room.type === 'private' && !room.members.has(userId) && !room.admins.has(userId)) {
      throw new Error('No permission to join private room');
    }

    room.members.add(userId);

    if (!this.userRooms.has(userId)) {
      this.userRooms.set(userId, new Set());
    }
    this.userRooms.get(userId).add(roomId);

    await this.saveRoom(room);

    await this.publishRoomEvent(roomId, {
      type: 'user_joined',
      userId,
      userInfo,
      timestamp: new Date()
    });

    return {
      room: this.serializeRoom(room),
      recentMessages: await this.getRecentMessages(roomId, 50)
    };
  }

  /**
   * Remove a user from a room and announce it with a `user_left` event.
   * A direct room that becomes empty is deleted. Unknown rooms are ignored.
   *
   * @param {string} roomId
   * @param {string} userId
   * @returns {Promise<void>}
   */
  async leaveRoom(roomId, userId) {
    const room = await this.getRoom(roomId);
    if (!room) return;

    room.members.delete(userId);

    const joinedRooms = this.userRooms.get(userId);
    if (joinedRooms) {
      joinedRooms.delete(roomId);
      if (joinedRooms.size === 0) {
        // Do not keep empty sets around for users that left every room.
        this.userRooms.delete(userId);
      }
    }

    if (room.type === 'direct' && room.members.size === 0) {
      await this.deleteRoom(roomId);
    } else {
      await this.saveRoom(room);
    }

    await this.publishRoomEvent(roomId, {
      type: 'user_left',
      userId,
      timestamp: new Date()
    });
  }

  /**
   * Store a new message in a room and publish a `message` event.
   *
   * @param {string} roomId
   * @param {string} senderId - Must be a member of the room.
   * @param {Object} messageData
   * @param {*} messageData.content
   * @param {string} [messageData.type='text']
   * @param {string|null} [messageData.replyTo]
   * @param {Array} [messageData.attachments]
   * @returns {Promise<Object>} The stored message.
   * @throws {Error} When the room is missing or the sender is not a member.
   */
  async sendMessage(roomId, senderId, messageData) {
    const room = await this.getRoom(roomId);
    if (!room) {
      throw new Error('Room not found');
    }

    if (!room.members.has(senderId)) {
      throw new Error('User not in room');
    }

    const message = {
      id: this.generateMessageId(),
      roomId,
      senderId,
      content: messageData.content,
      type: messageData.type || 'text',
      timestamp: new Date(),
      edited: false,
      reactions: {},
      replyTo: messageData.replyTo || null,
      attachments: messageData.attachments || []
    };

    await this.saveMessage(message);

    if (!this.messageHistory.has(roomId)) {
      this.messageHistory.set(roomId, []);
    }
    const history = this.messageHistory.get(roomId);
    history.push(message);

    // Keep at most 100 cached messages per room.
    if (history.length > 100) {
      history.shift();
    }

    await this.publishRoomEvent(roomId, {
      type: 'message',
      message
    });

    return message;
  }

  /**
   * Replace the content of a message and publish `message_edited`.
   *
   * @param {string} messageId
   * @param {string} userId - Must be the original sender.
   * @param {*} newContent
   * @returns {Promise<Object>} The updated message.
   * @throws {Error} When the message is missing, owned by someone else, or
   *   older than five minutes.
   */
  async editMessage(messageId, userId, newContent) {
    const message = await this.getMessage(messageId);
    if (!message) {
      throw new Error('Message not found');
    }

    if (message.senderId !== userId) {
      throw new Error('Can only edit own messages');
    }

    const editTimeLimit = 5 * 60 * 1000; // 5 minutes
    if (Date.now() - new Date(message.timestamp).getTime() > editTimeLimit) {
      throw new Error('Message edit time limit exceeded');
    }

    message.content = newContent;
    message.edited = true;
    message.editedAt = new Date();

    await this.saveMessage(message);

    await this.publishRoomEvent(message.roomId, {
      type: 'message_edited',
      message
    });

    return message;
  }

  /**
   * Delete a message (by its sender or a room admin) and publish
   * `message_deleted`.
   *
   * @param {string} messageId
   * @param {string} userId
   * @returns {Promise<void>}
   * @throws {Error} When the message is missing or the user may not delete it.
   */
  async deleteMessage(messageId, userId) {
    const message = await this.getMessage(messageId);
    if (!message) {
      throw new Error('Message not found');
    }

    // The room may already be gone; then only the sender may delete.
    const room = await this.getRoom(message.roomId);
    const isAdmin = room ? room.admins.has(userId) : false;

    if (message.senderId !== userId && !isAdmin) {
      throw new Error('No permission to delete message');
    }

    await this.redis.hdel('chat:messages', messageId);
    // Also drop the id from the room index so it does not dangle there.
    await this.redis.zrem(`chat:room:${message.roomId}:messages`, messageId);

    if (this.messageHistory.has(message.roomId)) {
      const history = this.messageHistory.get(message.roomId);
      const index = history.findIndex(msg => msg.id === messageId);
      if (index > -1) {
        history.splice(index, 1);
      }
    }

    await this.publishRoomEvent(message.roomId, {
      type: 'message_deleted',
      messageId,
      deletedBy: userId
    });
  }

  /**
   * Add a user's reaction to a message; publishes `reaction_added` when the
   * reaction was not already present.
   *
   * @param {string} messageId
   * @param {string} userId
   * @param {string} emoji
   * @returns {Promise<void>}
   * @throws {Error} When the message does not exist.
   */
  async addReaction(messageId, userId, emoji) {
    const message = await this.getMessage(messageId);
    if (!message) {
      throw new Error('Message not found');
    }

    if (!message.reactions[emoji]) {
      message.reactions[emoji] = [];
    }

    if (!message.reactions[emoji].includes(userId)) {
      message.reactions[emoji].push(userId);

      await this.saveMessage(message);

      await this.publishRoomEvent(message.roomId, {
        type: 'reaction_added',
        messageId,
        userId,
        emoji
      });
    }
  }

  /**
   * Remove a user's reaction from a message; publishes `reaction_removed`
   * when a reaction was actually removed.
   *
   * @param {string} messageId
   * @param {string} userId
   * @param {string} emoji
   * @returns {Promise<void>}
   * @throws {Error} When the message does not exist.
   */
  async removeReaction(messageId, userId, emoji) {
    const message = await this.getMessage(messageId);
    if (!message) {
      throw new Error('Message not found');
    }

    const reactors = message.reactions[emoji];
    if (!reactors) return;

    const index = reactors.indexOf(userId);
    if (index === -1) return;

    reactors.splice(index, 1);
    if (reactors.length === 0) {
      delete message.reactions[emoji];
    }

    await this.saveMessage(message);

    await this.publishRoomEvent(message.roomId, {
      type: 'reaction_removed',
      messageId,
      userId,
      emoji
    });
  }

  /**
   * Look up a room in the in-memory cache, falling back to Redis.
   *
   * @param {string} roomId
   * @returns {Promise<Object|null>} The room with Set-valued
   *   members/admins/banned, or null when it does not exist.
   */
  async getRoom(roomId) {
    if (this.rooms.has(roomId)) {
      return this.rooms.get(roomId);
    }

    const roomData = await this.redis.hget('chat:rooms', roomId);
    if (roomData) {
      const room = JSON.parse(roomData);
      room.members = new Set(room.members);
      room.admins = new Set(room.admins);
      room.banned = new Set(room.banned);

      this.rooms.set(roomId, room);
      return room;
    }

    return null;
  }

  /**
   * Persist a room to the `chat:rooms` hash.
   *
   * @param {Object} room
   * @returns {Promise<void>}
   */
  async saveRoom(room) {
    await this.redis.hset(
      'chat:rooms',
      room.id,
      JSON.stringify(this.serializeRoom(room))
    );
  }

  /**
   * Delete a room together with every message stored for it.
   *
   * @param {string} roomId
   * @returns {Promise<void>}
   */
  async deleteRoom(roomId) {
    this.rooms.delete(roomId);
    this.messageHistory.delete(roomId);
    await this.redis.hdel('chat:rooms', roomId);

    // Messages live in the shared `chat:messages` hash and are indexed per
    // room in a sorted set (see saveMessage), so both must be cleaned up.
    const indexKey = `chat:room:${roomId}:messages`;
    const messageIds = await this.redis.zrange(indexKey, 0, -1);
    if (messageIds.length > 0) {
      await this.redis.hdel('chat:messages', ...messageIds);
    }
    await this.redis.del(indexKey);
  }

  /**
   * Persist a message, index it in its room's sorted set, and refresh the
   * cached copy (if any) so readers of the cache see edits and reactions.
   *
   * @param {Object} message
   * @returns {Promise<void>}
   */
  async saveMessage(message) {
    await this.redis.hset('chat:messages', message.id, JSON.stringify(message));

    await this.redis.zadd(
      `chat:room:${message.roomId}:messages`,
      new Date(message.timestamp).getTime(),
      message.id
    );

    this.refreshCachedMessage(message);
  }

  /**
   * Replace the cached entry with the same id, if the room has a cache and
   * the message is in it. New messages are not inserted here.
   *
   * @param {Object} message
   */
  refreshCachedMessage(message) {
    const history = this.messageHistory.get(message.roomId);
    if (!history) return;

    const index = history.findIndex(cached => cached.id === message.id);
    if (index > -1) {
      history[index] = message;
    }
  }

  /**
   * @param {string} messageId
   * @returns {Promise<Object|null>} The stored message or null.
   */
  async getMessage(messageId) {
    const messageData = await this.redis.hget('chat:messages', messageId);
    return messageData ? JSON.parse(messageData) : null;
  }

  /**
   * Return the newest messages of a room in chronological order.
   *
   * @param {string} roomId
   * @param {number} [limit=50] - Maximum number of messages.
   * @returns {Promise<Object[]>}
   */
  async getRecentMessages(roomId, limit = 50) {
    // slice(-0) and ZREVRANGE 0 -1 would both return everything.
    if (limit <= 0) {
      return [];
    }

    if (this.messageHistory.has(roomId)) {
      const cached = this.messageHistory.get(roomId);
      if (cached.length >= limit) {
        return cached.slice(-limit);
      }
    }

    const messageIds = await this.redis.zrevrange(
      `chat:room:${roomId}:messages`,
      0,
      limit - 1
    );
    if (messageIds.length === 0) {
      return [];
    }

    // One HMGET instead of one HGET round trip per message.
    const rawMessages = await this.redis.hmget('chat:messages', ...messageIds);

    return rawMessages
      .filter(raw => raw !== null && raw !== undefined)
      .map(raw => JSON.parse(raw))
      .reverse();
  }

  /**
   * Publish an event on the room's Redis channel.
   *
   * @param {string} roomId
   * @param {Object} eventData - JSON-serializable event payload.
   * @returns {Promise<void>}
   */
  async publishRoomEvent(roomId, eventData) {
    await this.redis.publish(
      `chat:room:${roomId}`,
      JSON.stringify(eventData)
    );
  }

  /**
   * Convert a room to a JSON-friendly object (Sets become arrays).
   *
   * @param {Object} room
   * @returns {Object}
   */
  serializeRoom(room) {
    return {
      ...room,
      members: Array.from(room.members),
      admins: Array.from(room.admins),
      banned: Array.from(room.banned)
    };
  }

  /**
   * @returns {string} Identifier of the form `msg_<epoch ms>_<random base36>`.
   */
  generateMessageId() {
    return `msg_${Date.now()}_${Math.random().toString(36).substring(2)}`;
  }

  /**
   * List the rooms a user joined through this instance.
   *
   * @param {string} userId
   * @returns {Promise<Object[]>} Serialized rooms.
   */
  async getUserRooms(userId) {
    const rooms = [];

    if (this.userRooms.has(userId)) {
      for (const roomId of this.userRooms.get(userId)) {
        const room = await this.getRoom(roomId);
        if (room) {
          rooms.push(this.serializeRoom(room));
        }
      }
    }

    return rooms;
  }

  /**
   * Case-insensitive substring search over the name and description of
   * public rooms.
   *
   * @param {string} query
   * @param {string} userId - Currently unused.
   * @param {number} [limit=20] - Maximum number of results.
   * @returns {Promise<Object[]>} Serialized matching rooms.
   */
  async searchRooms(query, userId, limit = 20) {
    const needle = query.toLowerCase();
    const allRoomIds = await this.redis.hkeys('chat:rooms');
    const results = [];

    for (const roomId of allRoomIds) {
      if (results.length >= limit) break;

      const room = await this.getRoom(roomId);
      if (!room || room.type !== 'public') continue;

      // Rooms stored without a name or description simply do not match.
      const matches =
        room.name?.toLowerCase().includes(needle) ||
        room.description?.toLowerCase().includes(needle);

      if (matches) {
        results.push(this.serializeRoom(room));
      }
    }

    return results;
  }
}

// CommonJS export: the module's value is the ChatRoomManager class itself.
module.exports = ChatRoomManager;

2. 文件上传处理

javascript
// file-upload-handler.js
const multer = require('multer');
const path = require('path');
const fs = require('fs').promises;
const sharp = require('sharp');
const ffmpeg = require('fluent-ffmpeg');

/**
 * Handles chat file uploads: configures multer disk storage, sorts files
 * into per-type sub-directories, and post-processes images (thumbnail,
 * metadata, optional compressed copy), videos (thumbnail, metadata) and
 * audio (metadata).
 */
class FileUploadHandler {
  /**
   * @param {Object} [options]
   * @param {string} [options.uploadDir='./uploads'] - Root upload directory.
   * @param {number} [options.maxFileSize=10485760] - Size limit in bytes.
   * @param {string[]} [options.allowedTypes] - Accepted MIME types.
   */
  constructor(options = {}) {
    this.uploadDir = options.uploadDir || './uploads';
    this.maxFileSize = options.maxFileSize || 10 * 1024 * 1024; // 10MB
    this.allowedTypes = options.allowedTypes || [
      'image/jpeg', 'image/png', 'image/gif', 'image/webp',
      'video/mp4', 'video/webm',
      // 'audio/mpeg' is the registered type for MP3; 'audio/mp3' is kept for
      // clients that send the non-standard name.
      'audio/mpeg', 'audio/mp3', 'audio/wav', 'audio/ogg',
      'application/pdf', 'text/plain'
    ];

    // Directory creation is asynchronous and a constructor cannot await it.
    // Expose the promise so callers can `await handler.ready`, and attach a
    // handler so a failure is logged instead of surfacing as an unhandled
    // rejection.
    this.ready = this.setupUploadDir();
    this.ready.catch((error) => {
      console.error('Failed to prepare upload directories:', error);
    });

    this.setupMulter();
  }

  /**
   * Create the upload root and its `images`, `videos`, `audio` and
   * `documents` sub-directories if they do not exist yet.
   *
   * @returns {Promise<void>}
   */
  async setupUploadDir() {
    // With `recursive: true` mkdir succeeds when the directory already
    // exists, so no prior existence check is needed.
    await fs.mkdir(this.uploadDir, { recursive: true });

    const subdirs = ['images', 'videos', 'audio', 'documents'];
    await Promise.all(
      subdirs.map(subdir =>
        fs.mkdir(path.join(this.uploadDir, subdir), { recursive: true })
      )
    );
  }

  /**
   * Configure `this.upload`: disk storage routed by MIME type, unique
   * generated file names, the size limit and the MIME-type allow-list.
   */
  setupMulter() {
    const storage = multer.diskStorage({
      destination: (req, file, cb) => {
        let subdir = 'documents';

        if (file.mimetype.startsWith('image/')) {
          subdir = 'images';
        } else if (file.mimetype.startsWith('video/')) {
          subdir = 'videos';
        } else if (file.mimetype.startsWith('audio/')) {
          subdir = 'audio';
        }

        cb(null, path.join(this.uploadDir, subdir));
      },
      filename: (req, file, cb) => {
        const uniqueSuffix = Date.now() + '-' + Math.round(Math.random() * 1E9);
        const ext = path.extname(file.originalname);
        cb(null, `${file.fieldname}-${uniqueSuffix}${ext}`);
      }
    });

    this.upload = multer({
      storage,
      limits: {
        fileSize: this.maxFileSize
      },
      fileFilter: (req, file, cb) => {
        if (this.allowedTypes.includes(file.mimetype)) {
          cb(null, true);
        } else {
          cb(new Error(`File type ${file.mimetype} not allowed`));
        }
      }
    });
  }

  /**
   * Build the file record for an uploaded file and run the type-specific
   * post-processing. Processing failures are logged by the individual
   * processors and do not reject.
   *
   * @param {Object} file - The file object produced by the upload middleware
   *   (reads originalname, filename, path, mimetype, size).
   * @param {string} userId - Uploader, stored as `uploadedBy`.
   * @returns {Promise<Object>} The file record with `processed: true`.
   */
  async processUpload(file, userId) {
    const fileInfo = {
      id: this.generateFileId(),
      originalName: file.originalname,
      filename: file.filename,
      path: file.path,
      mimetype: file.mimetype,
      size: file.size,
      uploadedBy: userId,
      uploadedAt: new Date(),
      processed: false
    };

    if (file.mimetype.startsWith('image/')) {
      await this.processImage(fileInfo);
    } else if (file.mimetype.startsWith('video/')) {
      await this.processVideo(fileInfo);
    } else if (file.mimetype.startsWith('audio/')) {
      await this.processAudio(fileInfo);
    }

    fileInfo.processed = true;
    return fileInfo;
  }

  /**
   * Generate a 200x200 JPEG thumbnail, record width/height/format, and for
   * files larger than 2 MB write a quality-70 JPEG copy. Results are stored
   * on `fileInfo` (`thumbnail`, `metadata`, `compressed`).
   *
   * @param {Object} fileInfo - Record created by processUpload; mutated.
   * @returns {Promise<void>}
   */
  async processImage(fileInfo) {
    const inputPath = fileInfo.path;
    const dir = path.dirname(inputPath);
    const name = path.parse(fileInfo.filename).name;

    try {
      const thumbnailPath = path.join(dir, `${name}_thumb.jpg`);
      await sharp(inputPath)
        .resize(200, 200, { fit: 'cover' })
        .jpeg({ quality: 80 })
        .toFile(thumbnailPath);

      fileInfo.thumbnail = thumbnailPath;

      const metadata = await sharp(inputPath).metadata();
      fileInfo.metadata = {
        width: metadata.width,
        height: metadata.height,
        format: metadata.format
      };

      if (fileInfo.size > 2 * 1024 * 1024) { // 2MB
        const compressedPath = path.join(dir, `${name}_compressed.jpg`);
        await sharp(inputPath)
          .jpeg({ quality: 70 })
          .toFile(compressedPath);

        fileInfo.compressed = compressedPath;
      }
    } catch (error) {
      // Best effort: the upload itself stays valid without derived files.
      console.error('Image processing error:', error);
    }
  }

  /**
   * Capture a thumbnail at the one-second mark and record duration,
   * dimensions and bitrate on `fileInfo`. Always resolves; errors are logged.
   *
   * @param {Object} fileInfo - Record created by processUpload; mutated.
   * @returns {Promise<void>}
   */
  async processVideo(fileInfo) {
    const inputPath = fileInfo.path;
    const dir = path.dirname(inputPath);
    const name = path.parse(fileInfo.filename).name;

    return new Promise((resolve) => {
      const thumbnailPath = path.join(dir, `${name}_thumb.jpg`);

      ffmpeg(inputPath)
        .screenshots({
          timestamps: ['00:00:01'],
          filename: `${name}_thumb.jpg`,
          folder: dir,
          size: '200x200'
        })
        .on('end', () => {
          fileInfo.thumbnail = thumbnailPath;

          ffmpeg.ffprobe(inputPath, (err, metadata) => {
            if (!err && metadata) {
              const videoStream = metadata.streams.find(s => s.codec_type === 'video');
              if (videoStream) {
                fileInfo.metadata = {
                  duration: metadata.format.duration,
                  width: videoStream.width,
                  height: videoStream.height,
                  bitrate: metadata.format.bit_rate
                };
              }
            }
            resolve();
          });
        })
        .on('error', (error) => {
          console.error('Video processing error:', error);
          resolve(); // do not block the upload
        });
    });
  }

  /**
   * Record duration, bitrate and sample rate on `fileInfo` when ffprobe
   * finds an audio stream. Always resolves.
   *
   * @param {Object} fileInfo - Record created by processUpload; mutated.
   * @returns {Promise<void>}
   */
  async processAudio(fileInfo) {
    const inputPath = fileInfo.path;

    return new Promise((resolve) => {
      ffmpeg.ffprobe(inputPath, (err, metadata) => {
        if (!err && metadata) {
          const audioStream = metadata.streams.find(s => s.codec_type === 'audio');
          if (audioStream) {
            fileInfo.metadata = {
              duration: metadata.format.duration,
              bitrate: metadata.format.bit_rate,
              sampleRate: audioStream.sample_rate
            };
          }
        }
        resolve();
      });
    });
  }

  /**
   * @returns {string} Identifier of the form `file_<epoch ms>_<random base36>`.
   */
  generateFileId() {
    return `file_${Date.now()}_${Math.random().toString(36).substring(2)}`;
  }

  /**
   * @returns {Function} Middleware accepting a single file in field `file`.
   */
  getUploadMiddleware() {
    return this.upload.single('file');
  }

  /**
   * Delete a file and its derived thumbnail / compressed copy.
   *
   * Each path is removed independently, so a missing main file does not
   * leave the derived files behind. Failures are logged, never thrown.
   *
   * @param {Object} fileInfo - Record with `path` and optional `thumbnail`
   *   and `compressed` paths.
   * @returns {Promise<void>}
   */
  async deleteFile(fileInfo) {
    const targets = [fileInfo.path, fileInfo.thumbnail, fileInfo.compressed]
      .filter(Boolean);

    const outcomes = await Promise.allSettled(
      targets.map(target => fs.unlink(target))
    );

    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        console.error('File deletion error:', outcome.reason);
      }
    }
  }
}

// CommonJS export: the module's value is the FileUploadHandler class itself.
module.exports = FileUploadHandler;

推送通知系统

1. 通知管理器

javascript
// notification-manager.js
const webpush = require('web-push');
const apn = require('apn');
const admin = require('firebase-admin');
const Redis = require('ioredis');

/**
 * Sends push notifications through Web Push, APNs and FCM.
 *
 * Per-user subscriptions are kept in memory and persisted in the Redis hash
 * `push:subscriptions`; delivery results are recorded per user in the sorted
 * set `notifications:history:<userId>` (newest 100 entries are kept).
 */
class NotificationManager {
  /**
   * @param {Object} [options]
   * @param {string} [options.redisHost='localhost']
   * @param {number} [options.redisPort=6379]
   * @param {Object} [options.webPush] - `{ subject, publicKey, privateKey }`.
   * @param {Object} [options.apns] - `{ keyPath, keyId, teamId, production }`.
   * @param {Object} [options.fcm] - `{ serviceAccount }`.
   */
  constructor(options = {}) {
    this.redis = new Redis({
      host: options.redisHost || 'localhost',
      port: options.redisPort || 6379
    });

    this.setupWebPush(options.webPush);
    this.setupAPNS(options.apns);
    this.setupFCM(options.fcm);

    this.subscriptions = new Map();
    // Fire-and-forget: loadSubscriptions handles its own errors.
    this.loadSubscriptions();
  }

  /**
   * Enable Web Push when VAPID details are supplied.
   * @param {Object} [config]
   */
  setupWebPush(config) {
    if (config) {
      webpush.setVapidDetails(
        config.subject,
        config.publicKey,
        config.privateKey
      );
      this.webPushEnabled = true;
    }
  }

  /**
   * Enable APNs with token-based authentication when configured.
   * @param {Object} [config]
   */
  setupAPNS(config) {
    if (config) {
      this.apnProvider = new apn.Provider({
        token: {
          key: config.keyPath,
          keyId: config.keyId,
          teamId: config.teamId
        },
        production: config.production || false
      });
      this.apnsEnabled = true;
    }
  }

  /**
   * Enable FCM by initializing the Firebase admin app when configured.
   * @param {Object} [config]
   */
  setupFCM(config) {
    if (config) {
      admin.initializeApp({
        credential: admin.credential.cert(config.serviceAccount)
      });
      this.fcmEnabled = true;
    }
  }

  /**
   * Load all persisted subscriptions into memory. Entries that cannot be
   * parsed are logged and skipped so one corrupt record does not prevent the
   * remaining subscriptions from loading.
   *
   * @returns {Promise<void>}
   */
  async loadSubscriptions() {
    try {
      const subscriptionData = await this.redis.hgetall('push:subscriptions');

      for (const [userId, data] of Object.entries(subscriptionData)) {
        try {
          this.subscriptions.set(userId, JSON.parse(data));
        } catch (error) {
          console.error(`Skipping invalid subscription for user ${userId}:`, error);
        }
      }

      console.log(`Loaded ${this.subscriptions.size} push subscriptions`);
    } catch (error) {
      console.error('Failed to load subscriptions:', error);
    }
  }

  /**
   * Store a user's subscription record in memory and in Redis.
   *
   * @param {string} userId
   * @param {Object} subscription - May hold `webPush`, `apns` and `fcm` keys.
   * @returns {Promise<void>}
   */
  async saveSubscription(userId, subscription) {
    this.subscriptions.set(userId, subscription);

    await this.redis.hset(
      'push:subscriptions',
      userId,
      JSON.stringify(subscription)
    );
  }

  /**
   * Remove every subscription of a user.
   *
   * @param {string} userId
   * @returns {Promise<void>}
   */
  async removeSubscription(userId) {
    this.subscriptions.delete(userId);
    await this.redis.hdel('push:subscriptions', userId);
  }

  /**
   * @param {string} userId
   * @param {Object} subscription - Browser PushSubscription data.
   * @returns {Promise<void>}
   * @throws {Error} When Web Push is not configured.
   */
  async subscribeWebPush(userId, subscription) {
    if (!this.webPushEnabled) {
      throw new Error('Web Push not configured');
    }

    const userSubscription = this.subscriptions.get(userId) || {};
    userSubscription.webPush = subscription;

    await this.saveSubscription(userId, userSubscription);

    console.log(`Web Push subscription saved for user ${userId}`);
  }

  /**
   * @param {string} userId
   * @param {string} deviceToken
   * @returns {Promise<void>}
   * @throws {Error} When APNS is not configured.
   */
  async subscribeAPNS(userId, deviceToken) {
    if (!this.apnsEnabled) {
      throw new Error('APNS not configured');
    }

    const userSubscription = this.subscriptions.get(userId) || {};
    userSubscription.apns = { deviceToken };

    await this.saveSubscription(userId, userSubscription);

    console.log(`APNS subscription saved for user ${userId}`);
  }

  /**
   * @param {string} userId
   * @param {string} registrationToken
   * @returns {Promise<void>}
   * @throws {Error} When FCM is not configured.
   */
  async subscribeFCM(userId, registrationToken) {
    if (!this.fcmEnabled) {
      throw new Error('FCM not configured');
    }

    const userSubscription = this.subscriptions.get(userId) || {};
    userSubscription.fcm = { registrationToken };

    await this.saveSubscription(userId, userSubscription);

    console.log(`FCM subscription saved for user ${userId}`);
  }

  /**
   * Deliver a notification over every channel the user is subscribed to.
   * Channel failures are collected in the result list instead of thrown.
   *
   * @param {string} userId
   * @param {Object} notification - Reads title, body and channel-specific
   *   fields (icon, badge, data, ttl, ...).
   * @returns {Promise<Array<Object>|undefined>} One `{ type, success, ... }`
   *   entry per attempted channel; undefined when the user has no
   *   subscription.
   */
  async sendNotification(userId, notification) {
    const subscription = this.subscriptions.get(userId);
    if (!subscription) {
      console.log(`No subscription found for user ${userId}`);
      return;
    }

    const results = [];

    // Web Push
    if (subscription.webPush && this.webPushEnabled) {
      try {
        const result = await this.sendWebPush(subscription.webPush, notification);
        results.push({ type: 'webpush', success: true, result });
      } catch (error) {
        console.error('Web Push error:', error);
        results.push({ type: 'webpush', success: false, error: error.message });

        // 404 (not found / expired) and 410 (gone) both mean the push
        // service no longer knows this subscription, so stop using it.
        if (error.statusCode === 404 || error.statusCode === 410) {
          delete subscription.webPush;
          await this.saveSubscription(userId, subscription);
        }
      }
    }

    // APNS
    if (subscription.apns && this.apnsEnabled) {
      try {
        const result = await this.sendAPNS(subscription.apns.deviceToken, notification);
        results.push({ type: 'apns', success: true, result });
      } catch (error) {
        console.error('APNS error:', error);
        results.push({ type: 'apns', success: false, error: error.message });
      }
    }

    // FCM
    if (subscription.fcm && this.fcmEnabled) {
      try {
        const result = await this.sendFCM(subscription.fcm.registrationToken, notification);
        results.push({ type: 'fcm', success: true, result });
      } catch (error) {
        console.error('FCM error:', error);
        results.push({ type: 'fcm', success: false, error: error.message });
      }
    }

    await this.saveNotificationHistory(userId, notification, results);

    return results;
  }

  /**
   * Send one Web Push message.
   *
   * @param {Object} subscription - Browser PushSubscription data.
   * @param {Object} notification
   * @returns {Promise<*>} Result of `webpush.sendNotification`.
   */
  async sendWebPush(subscription, notification) {
    const payload = JSON.stringify({
      title: notification.title,
      body: notification.body,
      icon: notification.icon || '/icon-192x192.png',
      badge: notification.badge || '/badge-72x72.png',
      data: notification.data || {},
      actions: notification.actions || [],
      tag: notification.tag,
      requireInteraction: notification.requireInteraction || false
    });

    const options = {
      TTL: notification.ttl || 24 * 60 * 60, // 24 hours
      urgency: notification.urgency || 'normal'
    };

    return await webpush.sendNotification(subscription, payload, options);
  }

  /**
   * Send one APNs notification.
   *
   * @param {string} deviceToken
   * @param {Object} notification - `bundleId` is used as the APNs topic.
   * @returns {Promise<Object>} The provider's send result.
   * @throws {Error} `APNS failed: <reason>` when delivery failed.
   */
  async sendAPNS(deviceToken, notification) {
    const note = new apn.Notification();

    note.expiry = Math.floor(Date.now() / 1000) + (notification.ttl || 24 * 60 * 60);
    note.badge = notification.badge;
    note.sound = notification.sound || 'ping.aiff';
    note.alert = {
      title: notification.title,
      body: notification.body
    };
    note.payload = notification.data || {};
    note.topic = notification.bundleId;

    const result = await this.apnProvider.send(note, deviceToken);

    if (result.failed.length > 0) {
      const failure = result.failed[0];
      // A failure carries `response` when APNs rejected the notification and
      // `error` when the request itself failed (e.g. a connection problem),
      // so `response` cannot be dereferenced unconditionally.
      const reason =
        failure.response?.reason ??
        failure.error?.message ??
        (failure.status !== undefined ? `status ${failure.status}` : 'unknown error');
      throw new Error(`APNS failed: ${reason}`);
    }

    return result;
  }

  /**
   * Convert a data object into the string-to-string map FCM requires for the
   * `data` field. Strings pass through, other values are JSON-encoded, and
   * undefined values are dropped.
   *
   * @param {Object} [data]
   * @returns {Object<string, string>}
   */
  toFcmData(data = {}) {
    const entries = Object.entries(data)
      .filter(([, value]) => value !== undefined)
      .map(([key, value]) => [
        key,
        typeof value === 'string' ? value : JSON.stringify(value)
      ]);

    return Object.fromEntries(entries);
  }

  /**
   * Send one FCM message.
   *
   * @param {string} registrationToken
   * @param {Object} notification
   * @returns {Promise<*>} Result of `admin.messaging().send`.
   */
  async sendFCM(registrationToken, notification) {
    const message = {
      notification: {
        title: notification.title,
        body: notification.body,
        imageUrl: notification.image
      },
      data: this.toFcmData(notification.data || {}),
      token: registrationToken,
      android: {
        notification: {
          icon: notification.icon,
          color: notification.color,
          sound: notification.sound,
          tag: notification.tag
        },
        ttl: (notification.ttl || 24 * 60 * 60) * 1000
      },
      apns: {
        payload: {
          aps: {
            badge: notification.badge,
            sound: notification.sound || 'default'
          }
        }
      }
    };

    return await admin.messaging().send(message);
  }

  /**
   * Send the same notification to many users in batches of 100, pausing
   * 100 ms between batches.
   *
   * @param {string[]} userIds
   * @param {Object} notification
   * @returns {Promise<Array>} One entry per user: the sendNotification result
   *   or `{ userId, error }` when sending threw.
   */
  async sendBulkNotification(userIds, notification) {
    const results = [];

    const batchSize = 100;
    for (let i = 0; i < userIds.length; i += batchSize) {
      const batch = userIds.slice(i, i + batchSize);

      const batchPromises = batch.map(userId =>
        this.sendNotification(userId, notification)
          .catch(error => ({ userId, error: error.message }))
      );

      const batchResults = await Promise.all(batchPromises);
      results.push(...batchResults);

      if (i + batchSize < userIds.length) {
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    }

    return results;
  }

  /**
   * Append a delivery record to the user's history and trim it to the 100
   * newest entries.
   *
   * @param {string} userId
   * @param {Object} notification
   * @param {Array<Object>} results
   * @returns {Promise<void>}
   */
  async saveNotificationHistory(userId, notification, results) {
    const historyKey = `notifications:history:${userId}`;
    const historyEntry = {
      userId,
      notification,
      results,
      timestamp: new Date(),
      success: results.some(r => r.success)
    };

    await this.redis.zadd(historyKey, Date.now(), JSON.stringify(historyEntry));

    // Ranks 0..-101 are everything except the 100 highest-scored entries.
    await this.redis.zremrangebyrank(historyKey, 0, -101);
  }

  /**
   * @param {string} userId
   * @param {number} [limit=50]
   * @returns {Promise<Object[]>} History entries, newest first.
   */
  async getNotificationHistory(userId, limit = 50) {
    const history = await this.redis.zrevrange(
      `notifications:history:${userId}`,
      0,
      limit - 1
    );

    return history.map(entry => JSON.parse(entry));
  }

  /**
   * @returns {Promise<{total: number, webPush: number, apns: number, fcm: number}>}
   *   Counts of users with any subscription and per channel.
   */
  async getSubscriptionStats() {
    const stats = {
      total: this.subscriptions.size,
      webPush: 0,
      apns: 0,
      fcm: 0
    };

    for (const subscription of this.subscriptions.values()) {
      if (subscription.webPush) stats.webPush++;
      if (subscription.apns) stats.apns++;
      if (subscription.fcm) stats.fcm++;
    }

    return stats;
  }
}

// CommonJS export: the module's value is the NotificationManager class itself.
module.exports = NotificationManager;

2. 推送通知客户端

javascript
// push-notification-client.js
/**
 * Browser-side helper for Web Push: registers the service worker, manages
 * the push subscription, syncs it with the server endpoints
 * `/api/push/subscribe` and `/api/push/unsubscribe`, and can show local
 * notifications.
 */
class PushNotificationClient {
  /**
   * @param {Object} [options]
   * @param {string} [options.vapidPublicKey] - URL-safe base64 VAPID public key.
   * @param {string} [options.serviceWorkerPath='/sw.js']
   */
  constructor(options = {}) {
    this.vapidPublicKey = options.vapidPublicKey;
    this.serviceWorkerPath = options.serviceWorkerPath || '/sw.js';
    this.isSupported = 'serviceWorker' in navigator && 'PushManager' in window;

    this.subscription = null;
    this.registration = null;

    // init() is asynchronous; keep its promise so that subscribe() can wait
    // for the registration instead of racing it. init() catches its own
    // errors, so this promise never rejects.
    this.ready = this.isSupported ? this.init() : Promise.resolve();
  }

  /**
   * Register the service worker and pick up an existing push subscription,
   * re-sending it to the server when found. Errors are logged.
   *
   * @returns {Promise<void>}
   */
  async init() {
    try {
      this.registration = await navigator.serviceWorker.register(this.serviceWorkerPath);
      console.log('Service Worker registered');

      this.subscription = await this.registration.pushManager.getSubscription();

      if (this.subscription) {
        console.log('Existing push subscription found');
        await this.sendSubscriptionToServer(this.subscription);
      }
    } catch (error) {
      console.error('Service Worker registration failed:', error);
    }
  }

  /**
   * Ask the user for notification permission.
   *
   * @returns {Promise<string>} `'granted'`.
   * @throws {Error} When push is unsupported or permission is not granted.
   */
  async requestPermission() {
    if (!this.isSupported) {
      throw new Error('Push notifications not supported');
    }

    const permission = await Notification.requestPermission();

    if (permission !== 'granted') {
      throw new Error('Push notification permission denied');
    }

    return permission;
  }

  /**
   * Request permission, create a push subscription and send it to the server.
   *
   * @returns {Promise<Object>} The created subscription.
   * @throws {Error} When push is unsupported, the service worker could not
   *   be registered, or permission is denied.
   */
  async subscribe() {
    if (!this.isSupported) {
      throw new Error('Push notifications not supported');
    }

    // Wait for the registration started by the constructor; without this a
    // subscribe() call right after construction always failed.
    await this.ready;

    if (!this.registration) {
      throw new Error('Service Worker not registered');
    }

    await this.requestPermission();

    this.subscription = await this.registration.pushManager.subscribe({
      userVisibleOnly: true,
      applicationServerKey: this.urlBase64ToUint8Array(this.vapidPublicKey)
    });

    console.log('Push subscription created');

    await this.sendSubscriptionToServer(this.subscription);

    return this.subscription;
  }

  /**
   * Cancel the current push subscription, if any, locally and on the server.
   *
   * @returns {Promise<void>}
   */
  async unsubscribe() {
    if (this.subscription) {
      await this.subscription.unsubscribe();
      await this.removeSubscriptionFromServer();
      this.subscription = null;
      console.log('Push subscription removed');
    }
  }

  /**
   * POST the subscription to `/api/push/subscribe`. Failures are logged and
   * not propagated.
   *
   * @param {Object} subscription
   * @returns {Promise<void>}
   */
  async sendSubscriptionToServer(subscription) {
    try {
      const response = await fetch('/api/push/subscribe', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${this.getAuthToken()}`
        },
        body: JSON.stringify(subscription)
      });

      if (!response.ok) {
        throw new Error('Failed to send subscription to server');
      }

      console.log('Subscription sent to server');
    } catch (error) {
      console.error('Error sending subscription to server:', error);
    }
  }

  /**
   * POST to `/api/push/unsubscribe`. Failures are logged and not propagated.
   *
   * @returns {Promise<void>}
   */
  async removeSubscriptionFromServer() {
    try {
      const response = await fetch('/api/push/unsubscribe', {
        method: 'POST',
        headers: {
          'Authorization': `Bearer ${this.getAuthToken()}`
        }
      });

      if (!response.ok) {
        throw new Error('Failed to remove subscription from server');
      }

      console.log('Subscription removed from server');
    } catch (error) {
      console.error('Error removing subscription from server:', error);
    }
  }

  /**
   * Decode a URL-safe base64 string (padding optional) into bytes.
   *
   * @param {string} base64String
   * @returns {Uint8Array}
   */
  urlBase64ToUint8Array(base64String) {
    const padding = '='.repeat((4 - base64String.length % 4) % 4);
    const base64 = (base64String + padding)
      .replace(/-/g, '+')
      .replace(/_/g, '/');

    const rawData = window.atob(base64);
    const outputArray = new Uint8Array(rawData.length);

    for (let i = 0; i < rawData.length; ++i) {
      outputArray[i] = rawData.charCodeAt(i);
    }

    return outputArray;
  }

  /**
   * @returns {string} Token stored under `auth_token`, or an empty string.
   */
  getAuthToken() {
    return localStorage.getItem('auth_token') || '';
  }

  /**
   * @returns {boolean} Whether a push subscription is currently held.
   */
  isSubscribed() {
    return !!this.subscription;
  }

  /**
   * @returns {Object|null} The current push subscription, if any.
   */
  getSubscription() {
    return this.subscription;
  }

  /**
   * Show a local (non-persistent) notification when permission is granted.
   *
   * @param {string} title
   * @param {Object} [options] - icon, badge, body, data, tag,
   *   requireInteraction, onClick (function), url (opened on click when no
   *   onClick is given), autoClose (milliseconds).
   * @returns {Notification|undefined} The notification, or undefined when
   *   notifications are unavailable or not permitted.
   */
  showLocalNotification(title, options = {}) {
    // Local notifications only need the Notification API, not push support.
    if (typeof Notification === 'undefined') {
      console.warn('Notifications not supported');
      return;
    }

    if (Notification.permission !== 'granted') {
      return;
    }

    // Action buttons are only available on notifications shown through a
    // service worker registration; the Notification constructor throws a
    // TypeError for a non-empty `actions` list, so they are left out here.
    if (options.actions && options.actions.length > 0) {
      console.warn('Notification actions are not supported for local notifications and were ignored');
    }

    const notification = new Notification(title, {
      icon: options.icon || '/icon-192x192.png',
      badge: options.badge || '/badge-72x72.png',
      body: options.body,
      data: options.data,
      tag: options.tag,
      requireInteraction: options.requireInteraction || false
    });

    notification.onclick = (event) => {
      event.preventDefault();

      if (options.onClick) {
        options.onClick(event);
      } else if (options.url) {
        window.open(options.url, '_blank');
      }

      notification.close();
    };

    if (options.autoClose) {
      setTimeout(() => {
        notification.close();
      }, options.autoClose);
    }

    return notification;
  }
}

// Service worker source (sw.js), kept as a string.
// The script registers three listeners:
//   - 'push': parses the event's JSON payload and shows a notification built
//     from its title/body/icon/badge/data/actions/tag/requireInteraction.
//   - 'notificationclick': closes the notification; logs action-button
//     clicks, otherwise opens `data.url` in a new window when present.
//   - 'notificationclose': logs the closed notification's tag.
const SW_CODE = `
self.addEventListener('push', function(event) {
  if (event.data) {
    const data = event.data.json();
    
    const options = {
      body: data.body,
      icon: data.icon || '/icon-192x192.png',
      badge: data.badge || '/badge-72x72.png',
      data: data.data || {},
      actions: data.actions || [],
      tag: data.tag,
      requireInteraction: data.requireInteraction || false
    };
    
    event.waitUntil(
      self.registration.showNotification(data.title, options)
    );
  }
});

self.addEventListener('notificationclick', function(event) {
  event.notification.close();
  
  const data = event.notification.data;
  
  if (event.action) {
    // 处理操作按钮点击
    console.log('Action clicked:', event.action);
  } else {
    // 处理通知点击
    if (data.url) {
      event.waitUntil(
        clients.openWindow(data.url)
      );
    }
  }
});

self.addEventListener('notificationclose', function(event) {
  console.log('Notification closed:', event.notification.tag);
});
`;

module.exports = { PushNotificationClient, SW_CODE };

性能优化

1. 连接池管理

javascript
// connection-pool-manager.js
/**
 * In-memory registry of socket connections with per-user lookup, activity
 * tracking, periodic idle cleanup and aggregate statistics.
 *
 * Sockets handed to this class are expected to expose `id`, `userId`,
 * `connected`, `emit(event, data)` and `disconnect(close)`, which are the
 * only members this class touches.
 */
class ConnectionPoolManager {
  /**
   * @param {object} [options]
   * @param {number} [options.maxConnections=10000] - Capacity limit.
   * @param {number} [options.connectionTimeout=30000] - Inactivity (ms) after
   *   which the cleanup sweep disconnects a connection.
   * @param {number} [options.heartbeatInterval=25000] - Stored on the
   *   instance; not used by this class itself.
   * @param {number} [options.cleanupInterval=60000] - Period (ms) of the
   *   cleanup sweep.
   * @param {number} [options.statsInterval=10000] - Period (ms) of the
   *   active/idle recount.
   * @param {number} [options.idleThreshold=30000] - Inactivity (ms) after
   *   which a connection is counted as idle in the statistics.
   */
  constructor(options = {}) {
    this.maxConnections = options.maxConnections || 10000;
    this.connectionTimeout = options.connectionTimeout || 30000;
    this.heartbeatInterval = options.heartbeatInterval || 25000;
    this.cleanupInterval = options.cleanupInterval || 60000;
    this.statsInterval = options.statsInterval || 10000;
    this.idleThreshold = options.idleThreshold || 30000;

    this.connections = new Map();
    this.connectionsByUser = new Map();
    this.connectionStats = {
      total: 0,
      active: 0,
      idle: 0,
      peak: 0
    };

    // Interval handles are kept so stopTimers() can release them.
    this.cleanupTimer = null;
    this.statsTimer = null;

    this.startCleanupTimer();
    this.startStatsTimer();
  }

  /**
   * Register a socket. When the pool is full the socket is sent an 'error'
   * event and disconnected instead.
   *
   * @param {object} socket
   * @returns {boolean} true when registered, false when rejected.
   */
  addConnection(socket) {
    const connectionId = socket.id;
    const userId = socket.userId;

    // Enforce the capacity limit.
    if (this.connections.size >= this.maxConnections) {
      socket.emit('error', { message: 'Server at capacity' });
      socket.disconnect(true);
      return false;
    }

    const now = Date.now();
    const connection = {
      id: connectionId,
      socket,
      userId,
      connectedAt: now,
      lastActivity: now,
      isActive: true,
      rooms: new Set(),
      messageCount: 0,
      bytesReceived: 0,
      bytesSent: 0
    };

    this.connections.set(connectionId, connection);

    // Maintain the user -> connection ids index.
    if (!this.connectionsByUser.has(userId)) {
      this.connectionsByUser.set(userId, new Set());
    }
    this.connectionsByUser.get(userId).add(connectionId);

    this.connectionStats.total++;
    this.connectionStats.active++;

    if (this.connectionStats.active > this.connectionStats.peak) {
      this.connectionStats.peak = this.connectionStats.active;
    }

    console.log(`Connection added: ${connectionId} (User: ${userId})`);
    return true;
  }

  /**
   * Forget a connection. Unknown ids are ignored.
   *
   * @param {string} connectionId
   */
  removeConnection(connectionId) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    const userId = connection.userId;

    const userConnectionIds = this.connectionsByUser.get(userId);
    if (userConnectionIds) {
      userConnectionIds.delete(connectionId);

      if (userConnectionIds.size === 0) {
        this.connectionsByUser.delete(userId);
      }
    }

    this.connections.delete(connectionId);

    // updateStats() periodically re-splits the population into active/idle,
    // so `active` may already be 0 while connections remain. Take the
    // removal from whichever bucket still has entries; this keeps
    // active + idle equal to the number of tracked connections and never
    // lets a counter go negative.
    if (this.connectionStats.active > 0) {
      this.connectionStats.active--;
    } else if (this.connectionStats.idle > 0) {
      this.connectionStats.idle--;
    }

    console.log(`Connection removed: ${connectionId} (User: ${userId})`);
  }

  /**
   * Record traffic on a connection and refresh its last-activity time.
   * Unknown ids are ignored.
   *
   * @param {string} connectionId
   * @param {number} [bytesReceived=0]
   * @param {number} [bytesSent=0]
   */
  updateActivity(connectionId, bytesReceived = 0, bytesSent = 0) {
    const connection = this.connections.get(connectionId);
    if (!connection) return;

    connection.lastActivity = Date.now();
    connection.bytesReceived += bytesReceived;
    connection.bytesSent += bytesSent;
    connection.messageCount++;
  }

  /**
   * @param {*} userId
   * @returns {object[]} Connection records of the user whose `isActive`
   *   flag is set; empty when the user has none.
   */
  getUserConnections(userId) {
    const connectionIds = this.connectionsByUser.get(userId);
    if (!connectionIds) return [];

    return Array.from(connectionIds)
      .map(id => this.connections.get(id))
      .filter(conn => conn && conn.isActive);
  }

  /**
   * Emit an event on every connected socket of a user and account the
   * payload size as bytes sent.
   *
   * @param {*} userId
   * @param {string} event
   * @param {*} [data]
   */
  broadcastToUser(userId, event, data) {
    const connections = this.getUserConnections(userId);
    if (connections.length === 0) return;

    // Serialise once for all recipients rather than once per socket.
    const payloadBytes = this.estimatePayloadBytes(data);

    connections.forEach(connection => {
      if (connection.socket.connected) {
        connection.socket.emit(event, data);
        this.updateActivity(connection.id, 0, payloadBytes);
      }
    });
  }

  /**
   * Size in bytes (UTF-8) of the JSON form of a payload, used only for the
   * bytesSent statistic.
   *
   * @param {*} data
   * @returns {number} 0 when the payload has no JSON form (e.g. undefined)
   *   or cannot be serialised.
   */
  estimatePayloadBytes(data) {
    try {
      const serialized = JSON.stringify(data);
      // JSON.stringify returns undefined for undefined/functions/symbols.
      return typeof serialized === 'string' ? Buffer.byteLength(serialized) : 0;
    } catch (error) {
      // Statistics must never prevent delivery (cycles, BigInt, ...).
      return 0;
    }
  }

  /** Start (or restart) the periodic cleanup sweep. */
  startCleanupTimer() {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
    }
    this.cleanupTimer = setInterval(() => {
      this.cleanupInactiveConnections();
    }, this.cleanupInterval);
  }

  /**
   * Handle every connection that has been inactive for longer than
   * `connectionTimeout`: sockets that are still connected are disconnected,
   * and records whose socket is already disconnected are removed so they
   * cannot accumulate.
   */
  cleanupInactiveConnections() {
    const now = Date.now();
    const inactiveConnections = [];

    for (const [connectionId, connection] of this.connections) {
      if (now - connection.lastActivity > this.connectionTimeout) {
        inactiveConnections.push(connectionId);
      }
    }

    inactiveConnections.forEach(connectionId => {
      const connection = this.connections.get(connectionId);
      if (!connection) return;

      if (connection.socket.connected) {
        console.log(`Disconnecting inactive connection: ${connectionId}`);
        connection.socket.disconnect(true);
      } else {
        // Socket is gone but its record was never removed.
        this.removeConnection(connectionId);
      }
    });
  }

  /** Start (or restart) the periodic active/idle recount. */
  startStatsTimer() {
    if (this.statsTimer) {
      clearInterval(this.statsTimer);
    }
    this.statsTimer = setInterval(() => {
      this.updateStats();
    }, this.statsInterval);
  }

  /**
   * Stop both background timers. Call this when the pool is no longer
   * needed; otherwise the intervals keep running (and keep the process
   * alive).
   */
  stopTimers() {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }
    if (this.statsTimer) {
      clearInterval(this.statsTimer);
      this.statsTimer = null;
    }
  }

  /**
   * Recount connections into active (activity within `idleThreshold`) and
   * idle (everything else).
   */
  updateStats() {
    const now = Date.now();
    let activeCount = 0;
    let idleCount = 0;

    for (const connection of this.connections.values()) {
      if (now - connection.lastActivity < this.idleThreshold) {
        activeCount++;
      } else {
        idleCount++;
      }
    }

    this.connectionStats.active = activeCount;
    this.connectionStats.idle = idleCount;
  }

  /**
   * @returns {object} The counters (total, active, idle, peak) plus the
   *   current connection count, distinct user count and average
   *   connections per user (0 when there are no users).
   */
  getStats() {
    const userCount = this.connectionsByUser.size;

    return {
      ...this.connectionStats,
      current: this.connections.size,
      users: userCount,
      avgConnectionsPerUser: userCount > 0
        ? this.connections.size / userCount
        : 0
    };
  }

  /**
   * @returns {object} getStats() plus a `connections` array with one plain
   *   summary object per connection (the socket itself is not included).
   */
  getDetailedStats() {
    const connectionDetails = Array.from(this.connections.values(), connection => ({
      id: connection.id,
      userId: connection.userId,
      connectedAt: connection.connectedAt,
      lastActivity: connection.lastActivity,
      messageCount: connection.messageCount,
      bytesReceived: connection.bytesReceived,
      bytesSent: connection.bytesSent,
      rooms: Array.from(connection.rooms)
    }));

    return {
      ...this.getStats(),
      connections: connectionDetails
    };
  }
}

module.exports = ConnectionPoolManager;

2. 消息队列优化

javascript
// message-queue-optimizer.js
const Redis = require('ioredis');
const { Worker } = require('worker_threads');

/**
 * Batches outgoing messages in memory and hands each batch to one of a
 * fixed set of worker threads, which write the messages to Redis.
 *
 * Message `type` decides what a worker does with it:
 *   - 'broadcast':    PUBLISH message.data (as JSON) on message.channel
 *   - 'notification': ZADD message.data (as JSON) to 'notifications:pending'
 *   - 'analytics':    HINCRBY 'analytics:events' message.data.event by 1
 */
class MessageQueueOptimizer {
  /**
   * @param {object} [options]
   * @param {string} [options.redisHost='localhost']
   * @param {number} [options.redisPort=6379]
   * @param {number} [options.batchSize=100] - Messages per batch; reaching
   *   this queue length triggers an immediate flush.
   * @param {number} [options.batchTimeout=1000] - Period (ms) of the
   *   background flush.
   * @param {number} [options.maxRetries=3] - Stored on the instance; not
   *   used by this class itself.
   * @param {number} [options.workerCount=4]
   */
  constructor(options = {}) {
    this.redis = new Redis({
      host: options.redisHost || 'localhost',
      port: options.redisPort || 6379
    });

    this.batchSize = options.batchSize || 100;
    this.batchTimeout = options.batchTimeout || 1000;
    this.maxRetries = options.maxRetries || 3;
    this.workerCount = options.workerCount || 4;

    this.messageQueue = [];
    this.batchTimer = null;
    this.workers = [];

    // Round-robin cursor, batches posted but not yet acknowledged, and
    // outstanding delayed-message timers (cleared on shutdown).
    this.nextWorkerIndex = 0;
    this.pendingBatches = 0;
    this.delayedTimers = new Set();

    this.setupWorkers();
    this.startBatchProcessor();
  }

  /**
   * Spawn `workerCount` worker threads. Connection settings are passed as
   * workerData rather than spliced into the evaluated source, so a host
   * name can never be interpreted as code.
   */
  setupWorkers() {
    const workerSource = `
      const { parentPort, workerData } = require('worker_threads');
      const Redis = require('ioredis');

      const redis = new Redis({
        host: workerData.redisHost,
        port: workerData.redisPort
      });

      parentPort.on('message', async (batch) => {
        try {
          const results = [];

          for (const message of batch) {
            const result = await processMessage(message);
            results.push(result);
          }

          parentPort.postMessage({ success: true, results });
        } catch (error) {
          parentPort.postMessage({ success: false, error: error.message });
        }
      });

      async function processMessage(message) {
        switch (message.type) {
          case 'broadcast':
            return await processBroadcast(message);
          case 'notification':
            return await processNotification(message);
          case 'analytics':
            return await processAnalytics(message);
          default:
            throw new Error('Unknown message type');
        }
      }

      async function processBroadcast(message) {
        await redis.publish(message.channel, JSON.stringify(message.data));
        return { messageId: message.id, status: 'broadcasted' };
      }

      async function processNotification(message) {
        await redis.zadd(
          'notifications:pending',
          Date.now(),
          JSON.stringify(message.data)
        );
        return { messageId: message.id, status: 'queued' };
      }

      async function processAnalytics(message) {
        await redis.hincrby('analytics:events', message.data.event, 1);
        return { messageId: message.id, status: 'recorded' };
      }
    `;

    const workerData = {
      redisHost: this.redis.options.host,
      redisPort: this.redis.options.port
    };

    for (let i = 0; i < this.workerCount; i++) {
      const worker = new Worker(workerSource, { eval: true, workerData });

      worker.on('message', (result) => {
        // Every reply (success or failure) acknowledges exactly one batch.
        this.pendingBatches = Math.max(0, this.pendingBatches - 1);

        if (result.success) {
          console.log('Batch processed successfully:', result.results.length);
        } else {
          console.error('Batch processing failed:', result.error);
        }
      });

      worker.on('error', (error) => {
        console.error('Worker error:', error);
      });

      this.workers.push(worker);
    }
  }

  /**
   * Append a message to the queue; flushes immediately once the queue holds
   * a full batch.
   *
   * @param {object} message - Fields are copied onto the queued record and
   *   may override the generated `id`.
   */
  queueMessage(message) {
    const queuedMessage = {
      id: this.generateMessageId(),
      ...message,
      queuedAt: Date.now(),
      retries: 0
    };

    this.messageQueue.push(queuedMessage);

    if (this.messageQueue.length >= this.batchSize) {
      this.processBatch();
    }
  }

  /** Start the periodic flush of whatever is queued. */
  startBatchProcessor() {
    this.batchTimer = setInterval(() => {
      if (this.messageQueue.length > 0) {
        this.processBatch();
      }
    }, this.batchTimeout);
  }

  /**
   * Hand up to `batchSize` messages from the head of the queue to the next
   * worker. When no worker exists the queue is left untouched.
   */
  processBatch() {
    if (this.messageQueue.length === 0) return;

    const worker = this.getAvailableWorker();
    if (!worker) return;

    const batch = this.messageQueue.splice(0, this.batchSize);
    worker.postMessage(batch);
    this.pendingBatches = (this.pendingBatches || 0) + 1;
  }

  /**
   * Pick the next worker in round-robin order.
   *
   * @returns {Worker|undefined} undefined when there are no workers.
   */
  getAvailableWorker() {
    const workerTotal = this.workers.length;
    if (workerTotal === 0) return undefined;

    const index = (this.nextWorkerIndex || 0) % workerTotal;
    this.nextWorkerIndex = (index + 1) % workerTotal;
    return this.workers[index];
  }

  /**
   * Rank of a priority name; lower ranks are served first. Unknown or
   * missing priorities rank as 'normal'.
   *
   * @param {string} [priority]
   * @returns {number}
   */
  getPriorityRank(priority) {
    const priorityOrder = { 'high': 0, 'normal': 1, 'low': 2 };
    // An own-property lookup is required here: `priorityOrder[p] || 1`
    // would turn the valid rank 0 ('high') into 1.
    return Object.prototype.hasOwnProperty.call(priorityOrder, priority)
      ? priorityOrder[priority]
      : 1;
  }

  /**
   * Insert a message ahead of all queued messages of lower priority while
   * keeping FIFO order among messages of the same priority. Flushes
   * immediately once the queue holds a full batch.
   *
   * @param {object} message
   * @param {'high'|'normal'|'low'} [priority='normal']
   */
  queuePriorityMessage(message, priority = 'normal') {
    const queuedMessage = {
      id: this.generateMessageId(),
      ...message,
      priority,
      queuedAt: Date.now(),
      retries: 0
    };

    const messageRank = this.getPriorityRank(priority);
    const firstLowerPriority = this.messageQueue.findIndex(
      queued => messageRank < this.getPriorityRank(queued.priority)
    );
    const insertIndex = firstLowerPriority === -1
      ? this.messageQueue.length
      : firstLowerPriority;

    this.messageQueue.splice(insertIndex, 0, queuedMessage);

    if (this.messageQueue.length >= this.batchSize) {
      this.processBatch();
    }
  }

  /**
   * Queue a message after a delay. Pending delayed messages are discarded
   * by shutdown().
   *
   * @param {object} message
   * @param {number} delay - Milliseconds.
   */
  queueDelayedMessage(message, delay) {
    // setTimeout treats delays above 2^31-1 ms as 1 ms, so long delays are
    // served in chunks.
    const MAX_TIMER_DELAY = 2147483647;
    const wait = Math.min(delay, MAX_TIMER_DELAY);

    const timer = setTimeout(() => {
      this.delayedTimers.delete(timer);

      if (delay > wait) {
        this.queueDelayedMessage(message, delay - wait);
      } else {
        this.queueMessage(message);
      }
    }, wait);

    this.delayedTimers.add(timer);
  }

  /**
   * Queue a message at an absolute time; times in the past queue it
   * immediately.
   *
   * @param {object} message
   * @param {number} scheduleTime - Epoch milliseconds.
   */
  queueScheduledMessage(message, scheduleTime) {
    const delay = scheduleTime - Date.now();
    if (delay > 0) {
      this.queueDelayedMessage(message, delay);
    } else {
      this.queueMessage(message);
    }
  }

  /** @returns {string} An id of the form `msg_<epoch ms>_<random base36>`. */
  generateMessageId() {
    return `msg_${Date.now()}_${Math.random().toString(36).substring(2)}`;
  }

  /** @returns {{queueLength: number, workerCount: number, batchSize: number, batchTimeout: number}} */
  getQueueStats() {
    return {
      queueLength: this.messageQueue.length,
      workerCount: this.workers.length,
      batchSize: this.batchSize,
      batchTimeout: this.batchTimeout
    };
  }

  /**
   * Stop timers, flush the queue to the workers, wait for the workers to
   * acknowledge outstanding batches, then terminate the workers and close
   * the Redis connection.
   *
   * @param {number} [drainTimeout=5000] - Maximum time (ms) to wait for
   *   outstanding batches before terminating the workers anyway.
   * @returns {Promise<void>}
   */
  async shutdown(drainTimeout = 5000) {
    if (this.batchTimer) {
      clearInterval(this.batchTimer);
      this.batchTimer = null;
    }

    // Delayed messages could only fire after the workers are gone.
    if (this.delayedTimers.size > 0) {
      console.warn(`Discarding ${this.delayedTimers.size} delayed messages on shutdown`);
      this.delayedTimers.forEach(timer => clearTimeout(timer));
      this.delayedTimers.clear();
    }

    if (this.messageQueue.length > 0) {
      console.log(`Processing remaining ${this.messageQueue.length} messages...`);
      while (this.messageQueue.length > 0) {
        const lengthBefore = this.messageQueue.length;
        this.processBatch();

        if (this.messageQueue.length === lengthBefore) {
          // No worker accepted the batch; looping again would never end.
          console.error(`No worker available, ${lengthBefore} messages left unprocessed`);
          break;
        }

        await new Promise(resolve => setTimeout(resolve, 100));
      }
    }

    // Give workers a bounded amount of time to finish what they were sent.
    const deadline = Date.now() + drainTimeout;
    while (this.pendingBatches > 0 && Date.now() < deadline) {
      await new Promise(resolve => setTimeout(resolve, 50));
    }

    await Promise.all(this.workers.map(worker => worker.terminate()));

    await this.redis.quit();
  }
}

module.exports = MessageQueueOptimizer;

3. 内存优化

javascript
// memory-optimizer.js
/**
 * Heap-usage watchdog plus a few memory-saving helpers: TTL/LRU-style
 * caches (plain Maps managed through setCache/getCache/cleanupCache),
 * object pools, a string pool and a Buffer pool.
 */
class MemoryOptimizer {
  /**
   * @param {object} [options]
   * @param {number} [options.maxMemoryUsage=1073741824] - Heap budget in
   *   bytes that `heapUsed` is compared against.
   * @param {number} [options.gcThreshold=0.8] - heapUsed / maxMemoryUsage
   *   ratio above which a cleanup is triggered.
   * @param {number} [options.cacheSize=10000] - Entry count above which a
   *   cache is trimmed.
   * @param {number} [options.monitorInterval=30000] - Period (ms) of the
   *   memory check.
   */
  constructor(options = {}) {
    this.maxMemoryUsage = options.maxMemoryUsage || 1024 * 1024 * 1024; // 1GB
    this.gcThreshold = options.gcThreshold || 0.8; // 80%
    this.cacheSize = options.cacheSize || 10000;
    this.monitorInterval = options.monitorInterval || 30000;

    this.messageCache = new Map();
    this.userCache = new Map();
    this.roomCache = new Map();

    // Kept so stopMemoryMonitoring() can release the interval.
    this.monitorTimer = null;

    this.startMemoryMonitoring();
  }

  /** Start (or restart) the periodic memory check. */
  startMemoryMonitoring() {
    if (this.monitorTimer) {
      clearInterval(this.monitorTimer);
    }
    this.monitorTimer = setInterval(() => {
      this.checkMemoryUsage();
    }, this.monitorInterval);
  }

  /** Stop the periodic memory check so the interval no longer runs. */
  stopMemoryMonitoring() {
    if (this.monitorTimer) {
      clearInterval(this.monitorTimer);
      this.monitorTimer = null;
    }
  }

  /**
   * Log current memory figures and run performCleanup() when heap usage
   * exceeds `gcThreshold` of the budget.
   */
  checkMemoryUsage() {
    const memUsage = process.memoryUsage();
    const usageRatio = memUsage.heapUsed / this.maxMemoryUsage;
    const toMB = (bytes) => Math.round(bytes / 1024 / 1024) + 'MB';

    console.log('Memory usage:', {
      heapUsed: toMB(memUsage.heapUsed),
      heapTotal: toMB(memUsage.heapTotal),
      external: toMB(memUsage.external),
      usageRatio: Math.round(usageRatio * 100) + '%'
    });

    if (usageRatio > this.gcThreshold) {
      console.log('Memory usage high, triggering cleanup...');
      this.performCleanup();
    }
  }

  /**
   * Trim the three built-in caches and, when the process exposes
   * `global.gc`, request a garbage collection.
   */
  performCleanup() {
    this.cleanupCache(this.messageCache, 'messages');
    this.cleanupCache(this.userCache, 'users');
    this.cleanupCache(this.roomCache, 'rooms');

    if (global.gc) {
      global.gc();
      console.log('Garbage collection triggered');
    }
  }

  /**
   * Whether a cache entry carries TTL metadata and has outlived it.
   * Entries without numeric `createdAt`/`ttl` never expire.
   *
   * @param {*} entry
   * @param {number} now - Epoch milliseconds.
   * @returns {boolean}
   */
  isExpired(entry, now) {
    return Boolean(entry)
      && typeof entry.createdAt === 'number'
      && typeof entry.ttl === 'number'
      && now - entry.createdAt > entry.ttl;
  }

  /**
   * Trim a cache in two steps:
   *   1. drop every entry whose TTL has elapsed (such entries would
   *      otherwise stay in memory until somebody reads them), then
   *   2. if the cache is still larger than `cacheSize`, evict the 30% of
   *      entries with the oldest `lastAccessed` time.
   *
   * @param {Map} cache
   * @param {string} type - Label used in the log line.
   */
  cleanupCache(cache, type) {
    const initialSize = cache.size;
    const now = Date.now();

    for (const [key, entry] of cache) {
      if (this.isExpired(entry, now)) {
        cache.delete(key);
      }
    }

    if (cache.size > this.cacheSize) {
      const lastAccessedOf = (entry) => (entry && entry.lastAccessed) || 0;
      const evictCount = Math.floor(cache.size * 0.3);

      Array.from(cache.entries())
        .sort((a, b) => lastAccessedOf(a[1]) - lastAccessedOf(b[1]))
        .slice(0, evictCount)
        .forEach(([key]) => cache.delete(key));
    }

    if (cache.size !== initialSize) {
      console.log(`Cleaned up ${type} cache: ${initialSize} -> ${cache.size}`);
    }
  }

  /**
   * Store a value with TTL metadata; trims the cache once it grows beyond
   * 120% of `cacheSize`.
   *
   * @param {Map} cache
   * @param {*} key
   * @param {*} value
   * @param {number} [ttl=300000] - Lifetime in ms (default 5 minutes).
   */
  setCache(cache, key, value, ttl = 300000) {
    const now = Date.now();

    cache.set(key, {
      value,
      createdAt: now,
      lastAccessed: now,
      ttl,
      accessCount: 0
    });

    if (cache.size > this.cacheSize * 1.2) {
      this.cleanupCache(cache, 'cache');
    }
  }

  /**
   * Read a value, refreshing its access metadata. Expired entries are
   * removed on read.
   *
   * @param {Map} cache
   * @param {*} key
   * @returns {*} The stored value, or null when missing or expired.
   */
  getCache(cache, key) {
    const entry = cache.get(key);

    if (!entry) {
      return null;
    }

    const now = Date.now();
    if (this.isExpired(entry, now)) {
      cache.delete(key);
      return null;
    }

    entry.lastAccessed = now;
    entry.accessCount++;

    return entry.value;
  }

  /**
   * Create a pool of reusable objects.
   *
   * @param {function(): *} createFn - Builds a new object.
   * @param {function(*): void} resetFn - Restores an object before it goes
   *   back into the pool.
   * @param {number} [initialSize=10] - Objects created up front; the pool
   *   retains at most twice this many.
   * @returns {{acquire: function(): *, release: function(*): void, size: function(): number}}
   */
  createObjectPool(createFn, resetFn, initialSize = 10) {
    const pool = Array.from({ length: initialSize }, () => createFn());
    const capacity = initialSize * 2;

    return {
      acquire() {
        return pool.length > 0 ? pool.pop() : createFn();
      },

      release(obj) {
        // Objects beyond capacity are simply dropped for the GC.
        if (pool.length < capacity) {
          resetFn(obj);
          pool.push(obj);
        }
      },

      size() {
        return pool.length;
      }
    };
  }

  /**
   * Create a string pool that returns one canonical instance per distinct
   * string value.
   *
   * @returns {{intern: function(string): string, size: function(): number, clear: function(): void}}
   */
  createStringPool() {
    const stringPool = new Map();

    return {
      intern(str) {
        if (!stringPool.has(str)) {
          stringPool.set(str, str);
        }
        return stringPool.get(str);
      },

      size() {
        return stringPool.size;
      },

      clear() {
        stringPool.clear();
      }
    };
  }

  /**
   * Create a pool of fixed-size Buffers. Every buffer handed out is
   * zero-filled: fresh ones are allocated zeroed and recycled ones are
   * cleared on release, so no stale memory content can leak to a caller.
   *
   * @param {number} [bufferSize=1024]
   * @param {number} [poolSize=100] - Buffers created up front and the
   *   maximum number retained.
   * @returns {{acquire: function(): Buffer, release: function(Buffer): void, size: function(): number}}
   */
  createBufferPool(bufferSize = 1024, poolSize = 100) {
    const buffers = Array.from({ length: poolSize }, () => Buffer.alloc(bufferSize));

    return {
      acquire() {
        return buffers.length > 0 ? buffers.pop() : Buffer.alloc(bufferSize);
      },

      release(buffer) {
        // Only buffers of the pool's size are accepted back.
        if (buffers.length < poolSize && buffer.length === bufferSize) {
          buffer.fill(0);
          buffers.push(buffer);
        }
      },

      size() {
        return buffers.length;
      }
    };
  }

  /**
   * @returns {object} Raw process memory figures (bytes), entry counts of
   *   the three built-in caches, and the current heap usage ratio together
   *   with the configured threshold.
   */
  getMemoryStats() {
    const { heapUsed, heapTotal, external, rss } = process.memoryUsage();

    return {
      memory: {
        heapUsed,
        heapTotal,
        external,
        rss
      },
      caches: {
        messages: this.messageCache.size,
        users: this.userCache.size,
        rooms: this.roomCache.size
      },
      usage: {
        ratio: heapUsed / this.maxMemoryUsage,
        threshold: this.gcThreshold
      }
    };
  }
}

module.exports = MemoryOptimizer;

最佳实践

1. 安全最佳实践

javascript
// security-best-practices.js
const rateLimit = require('express-rate-limit');
const helmet = require('helmet');
const validator = require('validator');
const xss = require('xss');

/**
 * Security helpers for a WebSocket server: input sanitising, in-memory
 * sliding-window rate limiting, temporary IP blocking, word filtering,
 * connection validation and HTTP security middleware.
 */
class WebSocketSecurity {
  constructor() {
    this.rateLimiters = new Map();
    this.blockedIPs = new Set();
    // ip -> pending unblock timer, so a renewed block replaces the old one.
    this.blockTimers = new Map();
    // Patterns deliberately carry no `g` flag: a global regex remembers
    // `lastIndex` between .test() calls and would miss every other match.
    this.suspiciousPatterns = [
      /<script[^>]*>[\s\S]*?<\/script>/i, // [\s\S] also spans line breaks
      /javascript:/i,
      /on\w+\s*=/i,
      /eval\s*\(/i
    ];
    // Words masked by filterMessage(); real deployments should load these
    // from configuration.
    this.sensitiveWords = ['spam', 'hack', 'exploit'];
  }

  /**
   * Validate a flat object of primitive fields and strip HTML from its
   * string values.
   *
   * A field is rejected (reported in `errors`, omitted from `sanitized`)
   * when it is an object/function/symbol, a string longer than 10000
   * characters, or a string matching one of `suspiciousPatterns`.
   * null/undefined values are passed through unchanged.
   *
   * @param {object} data
   * @returns {{sanitized: object, errors: string[]}}
   */
  validateAndSanitizeInput(data) {
    const errors = [];
    const sanitized = {};

    if (data === null || typeof data !== 'object') {
      errors.push('Input must be an object');
      return { sanitized, errors };
    }

    for (const [key, value] of Object.entries(data)) {
      try {
        const isPrimitive = typeof value === 'string'
          || typeof value === 'number'
          || typeof value === 'boolean';

        if (!isPrimitive && value !== null && value !== undefined) {
          errors.push(`Invalid type for field ${key}`);
          continue;
        }

        if (typeof value !== 'string') {
          sanitized[key] = value;
          continue;
        }

        if (value.length > 10000) {
          errors.push(`Field ${key} too long`);
          continue;
        }

        if (this.containsSuspiciousContent(value)) {
          errors.push(`Field ${key} contains suspicious content`);
          continue;
        }

        sanitized[key] = xss(value, {
          whiteList: {}, // no HTML tags allowed at all
          stripIgnoreTag: true,
          stripIgnoreTagBody: ['script']
        });
      } catch (error) {
        errors.push(`Validation error for field ${key}: ${error.message}`);
      }
    }

    return { sanitized, errors };
  }

  /**
   * @param {string} content
   * @returns {boolean} true when any of `suspiciousPatterns` matches.
   */
  containsSuspiciousContent(content) {
    return this.suspiciousPatterns.some((pattern) => {
      // Patterns added by callers may be global/sticky; always scan from
      // the start so results do not depend on previous calls.
      pattern.lastIndex = 0;
      return pattern.test(content);
    });
  }

  /**
   * Register a named rate limiter.
   *
   * @param {string} key - Name used later with checkRateLimit().
   * @param {object} [options]
   * @param {number} [options.windowMs=60000]
   * @param {number} [options.maxRequests=100]
   * @returns {{windowMs: number, maxRequests: number, requests: Map}}
   */
  createRateLimiter(key, options = {}) {
    const limiter = {
      windowMs: options.windowMs || 60000,
      maxRequests: options.maxRequests || 100,
      requests: new Map()
    };

    this.rateLimiters.set(key, limiter);
    return limiter;
  }

  /**
   * Record a request for `identifier` against the named limiter.
   *
   * @param {string} limitKey
   * @param {string} identifier - e.g. a user id or IP address.
   * @returns {boolean} true when the request is allowed (also when no
   *   limiter with that name exists), false when the window is full.
   */
  checkRateLimit(limitKey, identifier) {
    const limiter = this.rateLimiters.get(limitKey);
    if (!limiter) return true;

    const now = Date.now();
    const previous = limiter.requests.get(identifier) || [];

    // Only timestamps inside the current window count.
    const recent = previous.filter(
      timestamp => now - timestamp < limiter.windowMs
    );

    if (recent.length >= limiter.maxRequests) {
      return false;
    }

    recent.push(now);
    limiter.requests.set(identifier, recent);

    return true;
  }

  /**
   * Block an IP for a limited time. Blocking an already-blocked IP restarts
   * the clock with the new duration (the earlier unblock timer is
   * cancelled, so it cannot lift the new block prematurely).
   *
   * @param {string} ip
   * @param {number} [duration=3600000] - Milliseconds (default 1 hour).
   */
  blockIP(ip, duration = 3600000) {
    const pendingUnblock = this.blockTimers.get(ip);
    if (pendingUnblock) {
      clearTimeout(pendingUnblock);
    }

    this.blockedIPs.add(ip);

    const timer = setTimeout(() => {
      this.blockTimers.delete(ip);
      this.blockedIPs.delete(ip);
      console.log(`IP ${ip} unblocked`);
    }, duration);
    this.blockTimers.set(ip, timer);

    console.log(`IP ${ip} blocked for ${duration}ms`);
  }

  /**
   * Lift a block immediately and cancel its pending timer.
   *
   * @param {string} ip
   * @returns {boolean} true when the IP was blocked.
   */
  unblockIP(ip) {
    const pendingUnblock = this.blockTimers.get(ip);
    if (pendingUnblock) {
      clearTimeout(pendingUnblock);
      this.blockTimers.delete(ip);
    }
    return this.blockedIPs.delete(ip);
  }

  /**
   * @param {string} ip
   * @returns {boolean}
   */
  isIPBlocked(ip) {
    return this.blockedIPs.has(ip);
  }

  /**
   * Mask every occurrence (case-insensitive) of the words in
   * `sensitiveWords` with asterisks of the same length.
   *
   * @param {string} content
   * @returns {string}
   */
  filterMessage(content) {
    return this.sensitiveWords.reduce((filtered, word) => {
      // Escape regex metacharacters so words are matched literally.
      const literal = word.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
      return filtered.replace(new RegExp(literal, 'gi'), '*'.repeat(word.length));
    }, content);
  }

  /**
   * Run all connection checks for a socket handshake.
   *
   * @param {object} socket - Must expose `handshake.address`.
   * @param {string} token
   * @returns {true}
   * @throws {Error} 'IP blocked', 'Connection rate limit exceeded' or
   *   'Invalid token'.
   */
  validateConnection(socket, token) {
    const ip = socket.handshake.address;

    if (this.isIPBlocked(ip)) {
      throw new Error('IP blocked');
    }

    // Uses the limiter registered under the name 'connection', if any.
    if (!this.checkRateLimit('connection', ip)) {
      throw new Error('Connection rate limit exceeded');
    }

    if (!token || !this.validateToken(token)) {
      throw new Error('Invalid token');
    }

    return true;
  }

  /**
   * Verify a JWT against process.env.JWT_SECRET.
   *
   * @param {string} token
   * @returns {boolean} false on any verification error.
   */
  validateToken(token) {
    try {
      const jwt = require('jsonwebtoken');
      const decoded = jwt.verify(token, process.env.JWT_SECRET);
      return !!decoded;
    } catch (error) {
      return false;
    }
  }

  /**
   * @returns {Function[]} Express middleware: helmet with a content
   *   security policy, followed by a 1000-requests-per-15-minutes rate
   *   limit.
   */
  createSecurityMiddleware() {
    return [
      helmet({
        contentSecurityPolicy: {
          directives: {
            defaultSrc: ["'self'"],
            scriptSrc: ["'self'", "'unsafe-inline'"],
            styleSrc: ["'self'", "'unsafe-inline'"],
            imgSrc: ["'self'", "data:", "https:"],
            connectSrc: ["'self'", "ws:", "wss:"]
          }
        }
      }),

      rateLimit({
        windowMs: 15 * 60 * 1000, // 15 minutes
        max: 1000, // per IP within the window
        message: 'Too many requests from this IP'
      })
    ];
  }
}

module.exports = WebSocketSecurity;

2. 监控和日志

javascript
// monitoring-and-logging.js
const winston = require('winston');
const prometheus = require('prom-client');

/**
 * Logging (winston) and metrics (prom-client) facade for a WebSocket
 * server: structured event logs, Prometheus counters/gauges/histograms,
 * a timing wrapper, a health summary and an HTTP request-logging
 * middleware.
 */
class WebSocketMonitoring {
  constructor() {
    // Kept so stopMetricsCollection() can release the interval.
    this.metricsTimer = null;

    this.setupLogger();
    this.setupMetrics();
    this.startMetricsCollection();
  }

  /**
   * Create `this.logger`: JSON logs with timestamps and stack traces,
   * written to an error-only file, a combined file and the console.
   */
  setupLogger() {
    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.errors({ stack: true }),
        winston.format.json()
      ),
      defaultMeta: { service: 'websocket-server' },
      transports: [
        new winston.transports.File({
          filename: 'logs/websocket-error.log',
          level: 'error'
        }),
        new winston.transports.File({
          filename: 'logs/websocket-combined.log'
        }),
        new winston.transports.Console({
          format: winston.format.simple()
        })
      ]
    });
  }

  /** Create the Prometheus metric objects used by the logging methods. */
  setupMetrics() {
    // Connections, labelled by status.
    this.connectionGauge = new prometheus.Gauge({
      name: 'websocket_connections_total',
      help: 'Total number of WebSocket connections',
      labelNames: ['status']
    });

    // Messages, labelled by type and status.
    this.messageCounter = new prometheus.Counter({
      name: 'websocket_messages_total',
      help: 'Total number of WebSocket messages',
      labelNames: ['type', 'status']
    });

    // Operation durations in seconds.
    this.responseTimeHistogram = new prometheus.Histogram({
      name: 'websocket_response_time_seconds',
      help: 'WebSocket response time in seconds',
      labelNames: ['operation'],
      buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]
    });

    // Errors, labelled by error name and code.
    this.errorCounter = new prometheus.Counter({
      name: 'websocket_errors_total',
      help: 'Total number of WebSocket errors',
      labelNames: ['type', 'code']
    });

    // Process memory figures, labelled by kind.
    this.memoryGauge = new prometheus.Gauge({
      name: 'websocket_memory_usage_bytes',
      help: 'Memory usage in bytes',
      labelNames: ['type']
    });
  }

  /**
   * Enable prom-client's default metrics (prefixed 'websocket_') and
   * refresh the memory gauge every 10 seconds.
   */
  startMetricsCollection() {
    prometheus.collectDefaultMetrics({ prefix: 'websocket_' });

    if (this.metricsTimer) {
      clearInterval(this.metricsTimer);
    }
    this.metricsTimer = setInterval(() => {
      const memUsage = process.memoryUsage();
      this.memoryGauge.set({ type: 'heap_used' }, memUsage.heapUsed);
      this.memoryGauge.set({ type: 'heap_total' }, memUsage.heapTotal);
      this.memoryGauge.set({ type: 'external' }, memUsage.external);
      this.memoryGauge.set({ type: 'rss' }, memUsage.rss);
    }, 10000);
  }

  /** Stop the periodic memory-gauge refresh. */
  stopMetricsCollection() {
    if (this.metricsTimer) {
      clearInterval(this.metricsTimer);
      this.metricsTimer = null;
    }
  }

  /**
   * Log a connection event; 'connected' / 'disconnected' also move the
   * active-connections gauge up / down.
   *
   * @param {string} event
   * @param {object} [data] - Extra fields merged into the log record.
   */
  logConnection(event, data) {
    this.logger.info('Connection event', {
      event,
      ...data,
      timestamp: new Date().toISOString()
    });

    if (event === 'connected') {
      this.connectionGauge.inc({ status: 'active' });
    } else if (event === 'disconnected') {
      this.connectionGauge.dec({ status: 'active' });
    }
  }

  /**
   * Log a message event and count it.
   *
   * @param {string} type
   * @param {string} status
   * @param {object} [data={}] - Extra fields merged into the log record.
   */
  logMessage(type, status, data = {}) {
    this.logger.info('Message event', {
      type,
      status,
      ...data,
      timestamp: new Date().toISOString()
    });

    this.messageCounter.inc({ type, status });
  }

  /**
   * Log an error and count it by name and code.
   *
   * @param {Error|*} error - Non-Error values (strings, null, ...) are
   *   wrapped so logging them never throws.
   * @param {object} [context={}] - Extra fields merged into the log record.
   */
  logError(error, context = {}) {
    const err = error instanceof Error ? error : new Error(String(error));

    this.logger.error('WebSocket error', {
      error: err.message,
      stack: err.stack,
      ...context,
      timestamp: new Date().toISOString()
    });

    this.errorCounter.inc({
      type: err.name || 'UnknownError',
      code: err.code || 'unknown'
    });
  }

  /**
   * Run `fn`, record its duration in the response-time histogram and log
   * the outcome. Works for synchronous functions and for functions
   * returning a promise/thenable.
   *
   * The histogram timer is stopped exactly once per call, on every path.
   *
   * @param {string} operation - Histogram label and log field.
   * @param {function(): *} fn
   * @returns {*} The return value of `fn`; a Promise of its result when
   *   `fn` returned a thenable.
   * @throws Whatever `fn` throws (rejections are propagated likewise).
   */
  measurePerformance(operation, fn) {
    const startTime = Date.now();
    const stopTimer = this.responseTimeHistogram.startTimer({ operation });

    const finish = (success) => {
      stopTimer();
      this.logPerformance(operation, Date.now() - startTime, success);
    };

    let result;
    try {
      result = fn();
    } catch (error) {
      finish(false);
      throw error;
    }

    if (result && typeof result.then === 'function') {
      // Promise.resolve adopts any thenable, including ones that do not
      // implement .catch(). The two-argument then() ensures a failure
      // inside the success handler is not recorded a second time.
      return Promise.resolve(result).then(
        (value) => {
          finish(true);
          return value;
        },
        (error) => {
          finish(false);
          throw error;
        }
      );
    }

    finish(true);
    return result;
  }

  /**
   * @param {string} operation
   * @param {number} duration - Milliseconds.
   * @param {boolean} success
   */
  logPerformance(operation, duration, success) {
    this.logger.info('Performance metric', {
      operation,
      duration,
      success,
      timestamp: new Date().toISOString()
    });
  }

  /**
   * @returns {object} A health summary: fixed status 'healthy', process
   *   uptime in seconds, memory figures formatted in MB and a timestamp.
   */
  getHealthStatus() {
    const memUsage = process.memoryUsage();
    const toMB = (bytes) => Math.round(bytes / 1024 / 1024) + 'MB';

    return {
      status: 'healthy',
      uptime: process.uptime(),
      memory: {
        heapUsed: toMB(memUsage.heapUsed),
        heapTotal: toMB(memUsage.heapTotal),
        external: toMB(memUsage.external),
        rss: toMB(memUsage.rss)
      },
      timestamp: new Date().toISOString()
    };
  }

  /** @returns {Promise<string>} Metrics from the default prom-client registry. */
  async getMetrics() {
    return await prometheus.register.metrics();
  }

  /**
   * @returns {function(object, object, function): void} Express middleware
   *   that logs method, URL, status, duration, user agent and IP once the
   *   response has finished.
   */
  createMonitoringMiddleware() {
    return (req, res, next) => {
      const startTime = Date.now();

      res.on('finish', () => {
        this.logger.info('HTTP request', {
          method: req.method,
          url: req.url,
          statusCode: res.statusCode,
          duration: Date.now() - startTime,
          userAgent: req.get('User-Agent'),
          ip: req.ip,
          timestamp: new Date().toISOString()
        });
      });

      next();
    };
  }
}

module.exports = WebSocketMonitoring;

3. 部署配置

javascript
// deployment-config.js
const cluster = require('cluster');
const os = require('os');

/**
 * Cluster bootstrap for the WebSocket server plus generators for the
 * deployment artefacts (Dockerfile, docker-compose, nginx, Kubernetes).
 *
 * Constructing an instance immediately starts the cluster: in the primary
 * process it forks `numWorkers` workers and supervises them; in a worker
 * process it loads './websocket-server' and starts it.
 *
 * NOTE(review): Socket.IO's HTTP long-polling transport needs every request of
 * a session to reach the same worker. The cluster module's default connection
 * distribution does not guarantee that - confirm whether a sticky-session
 * layer is required for this deployment.
 */
class WebSocketDeployment {
  /**
   * @param {object} [options]
   * @param {number} [options.numWorkers] - Worker count; defaults to the
   *   number of CPUs reported by `os.cpus()`.
   * @param {number|string} [options.port] - Listening port; falls back to
   *   `process.env.PORT`, then 3000.
   * @param {string} [options.environment] - Falls back to
   *   `process.env.NODE_ENV`, then 'development'.
   */
  constructor(options = {}) {
    this.numWorkers = options.numWorkers || os.cpus().length;
    this.port = options.port || process.env.PORT || 3000;
    this.environment = options.environment || process.env.NODE_ENV || 'development';

    // Set once shutdown begins so that exiting workers are not respawned and
    // repeated signals do not restart the shutdown sequence.
    this.isShuttingDown = false;

    this.setupCluster();
  }

  /**
   * Dispatch to the primary or worker setup depending on the process role.
   */
  setupCluster() {
    // `isPrimary` replaces the deprecated `isMaster` (Node 16+); fall back to
    // `isMaster` on older runtimes where `isPrimary` is undefined.
    const isPrimary = typeof cluster.isPrimary === 'boolean'
      ? cluster.isPrimary
      : cluster.isMaster;

    if (isPrimary) {
      this.setupMaster();
    } else {
      this.setupWorker();
    }
  }

  /**
   * Primary-process setup: fork the workers, respawn any worker that dies
   * while the cluster is running, and shut down on SIGTERM / SIGINT.
   */
  setupMaster() {
    console.log(`Master ${process.pid} is running`);
    console.log(`Starting ${this.numWorkers} workers...`);

    // Fork the worker processes.
    for (let i = 0; i < this.numWorkers; i++) {
      this.forkWorker();
    }

    // Watch for worker exits.
    cluster.on('exit', (worker, code, signal) => {
      console.log(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`);

      // During shutdown the exit is expected; respawning here would keep the
      // cluster alive forever.
      if (this.isShuttingDown) {
        return;
      }

      console.log('Starting a new worker...');
      this.forkWorker();
    });

    // Graceful shutdown.
    process.on('SIGTERM', () => {
      console.log('Master received SIGTERM, shutting down gracefully...');
      this.gracefulShutdown();
    });

    process.on('SIGINT', () => {
      console.log('Master received SIGINT, shutting down gracefully...');
      this.gracefulShutdown();
    });
  }

  /**
   * Fork one worker and kill it if it is not listening within 30 seconds.
   *
   * @returns {object} The forked cluster worker.
   */
  forkWorker() {
    const worker = cluster.fork();

    // Startup watchdog (30 seconds).
    const timeout = setTimeout(() => {
      console.log(`Worker ${worker.process.pid} startup timeout, killing...`);
      worker.kill();
    }, 30000);

    worker.on('listening', () => {
      clearTimeout(timeout);
      console.log(`Worker ${worker.process.pid} is listening on port ${this.port}`);
    });

    // A worker that dies before listening must not leave the watchdog armed:
    // it would later fire against a dead process.
    worker.on('exit', () => {
      clearTimeout(timeout);
    });

    return worker;
  }

  /**
   * Worker-process setup: start the WebSocket server and shut it down on
   * SIGTERM or on the 'shutdown' IPC message sent by the primary.
   */
  setupWorker() {
    const WebSocketServer = require('./websocket-server');

    const server = new WebSocketServer({
      port: this.port,
      environment: this.environment,
      workerId: cluster.worker.id
    });

    server.start();

    console.log(`Worker ${process.pid} started`);

    // Run the server's shutdown at most once even if several triggers arrive.
    let closing = false;
    const shutdown = (trigger) => {
      if (closing) {
        return;
      }
      closing = true;
      console.log(`Worker ${process.pid} received ${trigger}, shutting down gracefully...`);
      server.gracefulShutdown();
    };

    // Graceful shutdown via signal.
    process.on('SIGTERM', () => shutdown('SIGTERM'));

    // Graceful shutdown via the message the primary sends in gracefulShutdown().
    process.on('message', (message) => {
      if (message === 'shutdown') {
        shutdown('shutdown message');
      }
    });
  }

  /**
   * Primary-side shutdown: ask every live worker to stop, force-kill any
   * worker still running after 10 seconds, and exit the primary with code 0
   * once all of them have exited. Calling it again while a shutdown is in
   * progress is a no-op.
   */
  gracefulShutdown() {
    if (this.isShuttingDown) {
      return;
    }
    this.isShuttingDown = true;

    const workers = Object.values(cluster.workers || {})
      .filter((worker) => worker && !worker.isDead());
    let remaining = workers.length;

    const exitMaster = () => {
      console.log('All workers shut down, exiting master...');
      process.exit(0);
    };

    // Nothing to wait for: without this the primary would hang forever.
    if (remaining === 0) {
      exitMaster();
      return;
    }

    workers.forEach((worker) => {
      // Force kill after 10 seconds.
      const timeout = setTimeout(() => {
        console.log(`Force killing worker ${worker.process.pid}`);
        worker.kill('SIGKILL');
      }, 10000);

      // Wait for the process to actually exit, so the force-kill stays armed
      // for a worker that closed its IPC channel but is still running.
      worker.once('exit', () => {
        clearTimeout(timeout);
        remaining -= 1;

        if (remaining === 0) {
          exitMaster();
        }
      });

      if (worker.isConnected()) {
        worker.send('shutdown');
      } else {
        // No IPC channel left to deliver the message; fall back to a signal.
        worker.kill();
      }
    });
  }

  /**
   * Dockerfile for the server image.
   *
   * The health check uses BusyBox `wget`, which is part of the Alpine base
   * image, whereas `curl` is not installed there. The trailing backslash is
   * escaped so that it survives the template literal as a Dockerfile line
   * continuation.
   *
   * @returns {string} Dockerfile contents.
   */
  static getDockerConfig() {
    return `
# Dockerfile
FROM node:18-alpine

# 设置工作目录
WORKDIR /app

# 复制 package.json 和 package-lock.json
COPY package*.json ./

# 安装依赖
RUN npm ci --only=production

# 复制应用代码
COPY . .

# 创建非 root 用户
RUN addgroup -g 1001 -S nodejs
RUN adduser -S websocket -u 1001 -G nodejs

# 更改文件所有权
RUN chown -R websocket:nodejs /app
USER websocket

# 暴露端口
EXPOSE 3000

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
  CMD wget -q --spider http://localhost:3000/health || exit 1

# 启动应用
CMD ["node", "server.js"]
`;
  }

  /**
   * docker-compose file wiring the server, Redis and an nginx front end.
   *
   * @returns {string} docker-compose.yml contents.
   */
  static getDockerComposeConfig() {
    return `
# docker-compose.yml
version: '3.8'

services:
  websocket-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    depends_on:
      - redis
    restart: unless-stopped
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 256M
  
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped
  
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - websocket-server
    restart: unless-stopped

volumes:
  redis_data:
`;
  }

  /**
   * nginx reverse-proxy configuration (HTTP -> HTTPS redirect, WebSocket
   * upgrade handling for /socket.io/, pass-through for everything else).
   *
   * The upstream uses `ip_hash` so that all requests from one client reach
   * the same backend; Socket.IO's polling transport fails when consecutive
   * requests of a session land on different servers.
   *
   * @returns {string} nginx.conf contents.
   */
  static getNginxConfig() {
    return `
# nginx.conf
events {
    worker_connections 1024;
}

http {
    upstream websocket_backend {
        ip_hash;
        server websocket-server:3000;
    }
    
    map $http_upgrade $connection_upgrade {
        default upgrade;
        '' close;
    }
    
    server {
        listen 80;
        server_name your-domain.com;
        
        # 重定向到 HTTPS
        return 301 https://$server_name$request_uri;
    }
    
    server {
        listen 443 ssl http2;
        server_name your-domain.com;
        
        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/key.pem;
        
        # SSL 配置
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384;
        ssl_prefer_server_ciphers off;
        
        # WebSocket 代理
        location /socket.io/ {
            proxy_pass http://websocket_backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            
            # WebSocket 特定配置
            proxy_cache_bypass $http_upgrade;
            proxy_read_timeout 86400;
            proxy_send_timeout 86400;
        }
        
        # 静态文件
        location / {
            proxy_pass http://websocket_backend;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }
        
        # 健康检查
        location /health {
            proxy_pass http://websocket_backend/health;
            access_log off;
        }
    }
}
`;
  }

  /**
   * Kubernetes manifests: a 3-replica Deployment with liveness/readiness
   * probes on /health and a LoadBalancer Service mapping port 80 to 3000.
   *
   * @returns {string} YAML with both documents separated by `---`.
   */
  static getKubernetesConfig() {
    return `
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: websocket-server
  labels:
    app: websocket-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: websocket-server
  template:
    metadata:
      labels:
        app: websocket-server
    spec:
      containers:
      - name: websocket-server
        image: your-registry/websocket-server:latest
        ports:
        - containerPort: 3000
        env:
        - name: NODE_ENV
          value: "production"
        - name: REDIS_HOST
          value: "redis-service"
        - name: REDIS_PORT
          value: "6379"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: websocket-service
spec:
  selector:
    app: websocket-server
  ports:
  - protocol: TCP
    port: 80
    targetPort: 3000
  type: LoadBalancer
`;
  }
}

// Expose the deployment class as this module's CommonJS export.
module.exports = WebSocketDeployment;

总结

本章详细介绍了 Node.js 中 WebSocket 实时通信的完整实现方案,包括:

核心功能

  • Socket.IO 实现:完整的服务器和客户端实现,支持认证、房间管理、消息处理
  • 原生 WebSocket:高性能的原生 WebSocket 服务器和客户端
  • 实时聊天应用:聊天室管理、文件上传、消息处理
  • 推送通知系统:支持 Web Push、APNS、FCM 多平台推送

性能优化

  • 连接池管理:智能连接管理和资源优化
  • 消息队列优化:批处理、工作线程、优先级队列
  • 内存优化:缓存管理、对象池、垃圾回收优化

最佳实践

  • 安全最佳实践:输入验证、速率限制、IP 封禁、XSS 防护
  • 监控和日志:完整的监控指标、日志记录、性能分析
  • 部署配置:集群部署、Docker 容器化、Kubernetes 编排

关键特性

  • 高并发连接支持
  • 实时双向通信
  • 多房间管理
  • 文件上传处理
  • 推送通知集成
  • 性能监控
  • 安全防护
  • 容器化部署