WebSocket cho phép giao tiếp hai chiều theo thời gian thực (two-way real-time communication) giữa client và server.
python
from fastapi import WebSocket, WebSocketDisconnect
class ConnectionManager:
    """Track accepted WebSocket connections and fan text messages out to them.

    Attributes:
        active: Connections that were accepted via ``connect`` and have not
            been removed yet.
    """

    def __init__(self) -> None:
        self.active: list[WebSocket] = []

    async def connect(self, ws: WebSocket) -> None:
        """Accept the handshake on *ws* and register it for broadcasts."""
        await ws.accept()
        self.active.append(ws)

    def disconnect(self, ws: WebSocket) -> None:
        """Stop broadcasting to *ws*.

        Calling this for a connection that is not (or no longer) registered
        is a no-op, so it is safe to call more than once.
        """
        try:
            self.active.remove(ws)
        except ValueError:
            # Already removed, e.g. dropped by broadcast() after a failed
            # send before the connection's own handler got to clean up.
            pass

    async def broadcast(self, msg: str) -> None:
        """Send *msg* as a text frame to every registered connection.

        A connection whose send fails is unregistered and delivery to the
        remaining connections continues.
        """
        # Iterate over a snapshot: every await hands control back to the
        # event loop, where other handlers may connect or disconnect and
        # mutate self.active while this loop is still running.
        for ws in list(self.active):
            try:
                await ws.send_text(msg)
            except (WebSocketDisconnect, RuntimeError):
                # The peer is gone or the socket was already closed; drop it
                # so one dead client cannot abort the broadcast for others.
                self.disconnect(ws)
# Module-level ConnectionManager instance; ws_endpoint registers its
# connection with it and broadcasts through it.
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def ws_endpoint(ws: WebSocket, client_id: str):
    """Relay each text frame received on *ws* to all registered connections.

    Args:
        ws: The incoming WebSocket connection for this client.
        client_id: Path parameter; prefixed to every message this client
            sends before it is broadcast.
    """
    await manager.connect(ws)
    try:
        while True:
            data = await ws.receive_text()
            await manager.broadcast(f"{client_id}: {data}")
    except WebSocketDisconnect:
        # Normal termination: the client closed the connection.
        pass
    finally:
        # Unregister on every exit path, not only on a clean disconnect;
        # otherwise an unexpected error would leave a dead socket in
        # manager.active for later broadcasts to trip over.
        manager.disconnect(ws)