使用 Python 实现一个简单的在线聊天室类(一文讲透)

使用 Python 实现一个简单的在线聊天室类

快速解决

import socket
import threading

class ChatServer:
    def __init__(self, host='127.0.0.1', port=5555):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 创建 TCP 套接字
        self.server.bind((host, port))  # 绑定地址和端口
        self.clients = []  # 存储客户端连接

    def start(self):
        self.server.listen()  # 开始监听
        print(f"服务器已启动,等待连接在 {self.server.getsockname()}")
        while True:
            client, addr = self.server.accept()  # 接受新连接
            self.clients.append(client)  # 将连接加入列表
            threading.Thread(target=self.handle_client, args=(client,)).start()  # 新线程处理客户端

    def handle_client(self, client):
        while True:
            try:
                message = client.recv(1024).decode('utf-8')  # 接收消息
                if not message:
                    break
                self.broadcast(message, client)  # 广播消息
            except:
                client.close()
                self.clients.remove(client)
                break

    def broadcast(self, message, sender):
        for client in self.clients:
            if client != sender:  # 不向发送者回发消息
                client.sendall(message.encode('utf-8'))  # 发送消息给所有客户端

常用方法

方法名 作用 使用频率 示例代码片段
start() 启动服务器并持续监听 ⭐⭐⭐⭐⭐ ChatServer().start()
handle_client() 处理单个客户端连接 ⭐⭐⭐⭐ Thread(target=self.handle_client, args=(client,))
broadcast() 向所有客户端广播消息 ⭐⭐⭐⭐ self.broadcast(message, client)
recv(1024) 接收客户端消息 ⭐⭐⭐ message = client.recv(1024)
sendall() 发送消息给客户端 ⭐⭐⭐ client.sendall(message.encode())

详细说明

服务器初始化

import socket
class ChatServer:
    def __init__(self, host='127.0.0.1', port=5555):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.bind((host, port))
        self.clients = []
  • socket.socket() 创建 TCP 套接字
  • bind() 绑定本地地址和端口
  • clients 列表用于存储所有客户端连接

多线程消息处理

def start(self):
    self.server.listen()
    while True:
        client, addr = self.server.accept()
        self.clients.append(client)
        threading.Thread(target=self.handle_client, args=(client,)).start()
  • listen() 开启监听队列
  • accept() 阻塞等待连接
  • 使用 Thread 为每个客户端创建独立线程

消息广播逻辑

def broadcast(self, message, sender):
    for client in self.clients:
        if client != sender:
            client.sendall(message.encode('utf-8'))
  • 遍历所有客户端连接
  • 使用 if client != sender 避免消息回传
  • sendall() 确保完整发送消息

高级技巧

实现消息格式标准化

def handle_client(self, client):
    while True:
        try:
            raw_message = client.recv(1024).decode('utf-8')
            if not raw_message:
                break
            # 解析 JSON 格式消息
            data = json.loads(raw_message)
            formatted_msg = f"{data['username']}: {data['content']}"
            self.broadcast(formatted_msg, client)
        except:
            client.close()
  • 使用 JSON 格式封装用户名和消息内容
  • 增强消息结构化处理能力
  • 适用于需要用户身份识别的场景

客户端连接断开处理

def handle_client(self, client):
    try:
        while True:
            message = client.recv(1024)
            # 处理消息...
    except ConnectionResetError:
        print("客户端强制断开连接")
    finally:
        client.close()
        self.clients.remove(client)
  • 捕获 ConnectionResetError 异常
  • 使用 finally 确保资源释放
  • 支持异常安全的连接管理

常见问题

客户端无法连接

解决方案:检查防火墙设置,确认端口未被占用。使用 netstat -ano 查看端口监听状态。

消息乱序或丢失

解决方案:在 recv() 方法后增加 time.sleep(0.01) 休眠,避免线程争抢资源。使用 1024 字节缓冲区。

服务器高负载问题

解决方案:将 while True 改为 while clients,在 clients 列表为空时主动关闭服务器。

总结

本文通过 Python socket 模块和多线程技术,完整实现了支持多用户并发的在线聊天室核心功能。