WebSocket 是一种全双工通信协议,允许客户端和服务器之间实时地进行数据交换。与 HTTP 协议不同,WebSocket 建立了一个持久的连接,客户端和服务器可以随时向对方发送消息,因此非常适用于实时应用,如在线聊天、通知推送、游戏应用等。
Python 中的 FastAPI 和 WebSockets 库可以轻松实现 WebSocket 服务。下面我们将深入探讨如何使用 WebSocket 构建实时应用。
14.1 安装 WebSocket 库
FastAPI 自带了对 WebSocket 的支持,但为了便于操作,我们可以单独安装 WebSocket 相关的库。安装 websockets 库:
pip install websockets
FastAPI 已经内建了 WebSocket 的支持,所以直接使用 FastAPI 就可以实现 WebSocket 服务。
14.2 使用 FastAPI 和 WebSocket 创建实时聊天应用
下面是使用 FastAPI 和 WebSocket 创建一个简单的实时聊天应用的示例。
14.2.1 创建 WebSocket 服务
在 FastAPI 中,WebSocket 被当作异步路由处理。下面是一个简单的聊天服务,客户端可以连接到服务器,并与其他客户端实时通信。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
# 用于保存当前连接的 WebSocket 客户端
clients: List[WebSocket] = []
@app.websocket("/ws/chat")
async def chat_websocket(websocket: WebSocket):
# 建立连接时,添加客户端到列表
await websocket.accept()
clients.append(websocket)
try:
while True:
# 接收客户端发送的消息
message = await websocket.receive_text()
# 广播消息给所有连接的客户端
for client in clients:
if client != websocket:
await client.send_text(message)
except WebSocketDisconnect:
# 客户端断开时,移除该客户端
clients.remove(websocket)
14.2.2 启动 WebSocket 服务
启动 FastAPI 服务:
uvicorn main:app --reload
14.2.3 客户端连接
要测试这个聊天应用,你可以使用 JavaScript 中的 WebSocket API 来连接到 WebSocket 服务。例如:
let socket = new WebSocket("ws://127.0.0.1:8000/ws/chat");
socket.onopen = () => {
console.log("WebSocket connection established.");
socket.send("Hello from client!");
};
socket.onmessage = (event) => {
console.log("Received message:", event.data);
};
socket.onclose = () => {
console.log("WebSocket connection closed.");
};
每当一个客户端向 WebSocket 服务器发送消息时,服务器会将该消息广播给所有其他连接的客户端。
14.3 处理 WebSocket 异常
在使用 WebSocket 时,可能会遇到一些常见的异常,如客户端断开连接。我们可以通过 try 和 except 块来处理这些异常,并确保服务器能够继续接受新的连接。
@app.websocket("/ws/chat")
async def chat_websocket(websocket: WebSocket):
await websocket.accept()
clients.append(websocket)
try:
while True:
message = await websocket.receive_text()
for client in clients:
if client != websocket:
await client.send_text(message)
except WebSocketDisconnect:
clients.remove(websocket)
print("A client disconnected")
14.3.1 超时控制
有时 WebSocket 连接可能会因为网络问题或者客户端关闭等原因被断开。可以设定超时时间,在连接超时后自动断开。
import asyncio
@app.websocket("/ws/chat")
async def chat_websocket(websocket: WebSocket):
await websocket.accept()
clients.append(websocket)
try:
while True:
message = await asyncio.wait_for(websocket.receive_text(), timeout=60)
for client in clients:
if client != websocket:
await client.send_text(message)
except asyncio.TimeoutError:
print("WebSocket timeout")
except WebSocketDisconnect:
clients.remove(websocket)
print("A client disconnected")
14.4 扩展 WebSocket 服务
WebSocket 也可以用于多种实时应用,除了聊天功能,还可以用于以下应用场景:
14.4.1 实时通知系统
可以将 WebSocket 用于推送实时通知。例如,创建一个用于推送通知的 WebSocket 服务,每当有新的通知时,服务器就会将通知消息发送给连接的客户端。
@app.websocket("/ws/notifications")
async def notification_websocket(websocket: WebSocket):
await websocket.accept()
while True:
# 模拟发送通知
await websocket.send_text("You have a new notification!")
await asyncio.sleep(10) # 每 10 秒发送一次通知
14.4.2 实时游戏或协作应用
WebSocket 可以用于多玩家实时游戏或协作应用,服务器可以保持连接并同步不同客户端之间的游戏状态或数据。每当某个玩家操作时,服务器会将变化发送给其他玩家,保持实时性。
14.5 使用 WebSocket 实现异步流媒体
WebSocket 也可以用于实时视频流、音频流或其他多媒体内容的传输。虽然 WebSocket 本身不直接支持流媒体传输,但可以利用其全双工特性来实现一个简单的视频聊天应用。例如:
- 客户端通过 WebSocket 将视频流的每一帧发送到服务器。
- 服务器将接收到的视频流转发给其他连接的客户端,达到视频聊天的目的。
@app.websocket("/ws/video_stream")
async def video_stream(websocket: WebSocket):
await websocket.accept()
try:
while True:
frame = await websocket.receive_bytes() # 接收视频帧
for client in clients:
if client != websocket:
await client.send_bytes(frame) # 发送视频帧到其他客户端
except WebSocketDisconnect:
clients.remove(websocket)
14.6 性能优化与高并发处理
WebSocket 服务可能会遇到高并发访问问题,因此需要进行一些性能优化。
14.6.1 连接池与负载均衡
为了处理大量 WebSocket 连接,可以使用连接池技术将连接分配到不同的服务器节点,同时利用负载均衡器(如 Nginx)来分配流量。
14.6.2 消息队列与后台任务
当消息量较大时,可以使用消息队列(如 RabbitMQ 或 Kafka)来缓解 WebSocket 服务器的压力。将消息推送到消息队列中,然后由后台工作进程异步处理,最后再将结果通过 WebSocket 推送到客户端。
总结:
- WebSocket 协议适合实时数据交换,支持全双工通信。
- 使用 FastAPI 和 WebSocket 可以轻松构建高效的实时聊天、通知推送等应用。
- 通过合理的异常处理、超时控制和优化措施,可以构建高性能的 WebSocket 服务。
- WebSocket 还可以扩展到实时游戏、视频流、多媒体应用等复杂场景。
下篇文章我们将探讨如何将 WebSocket 与数据库或其他后端服务结合,提升应用的功能和性能!