WebSocket¶
Django-Bolt provides WebSocket support for real-time bidirectional communication.
Basic WebSocket endpoint¶
from django_bolt import BoltAPI, WebSocket
api = BoltAPI()
@api.websocket("/ws/echo")
async def echo(websocket: WebSocket):
await websocket.accept()
async for message in websocket.iter_text():
await websocket.send_text(f"Echo: {message}")
WebSocket lifecycle¶
A WebSocket connection goes through these stages:
- Connection - Client initiates WebSocket handshake
- Accept - Server accepts the connection
- Communication - Exchange messages
- Close - Either party closes the connection
@api.websocket("/ws/chat")
async def chat(websocket: WebSocket):
# 1. Accept the connection
await websocket.accept()
try:
# 2. Communication loop
while True:
message = await websocket.receive_text()
await websocket.send_text(f"You said: {message}")
except WebSocketDisconnect:
# 3. Client disconnected
pass
Sending messages¶
Text messages¶
Binary messages¶
JSON messages¶
Receiving messages¶
Text messages¶
# Single message
message = await websocket.receive_text()
# Iterate over messages
async for message in websocket.iter_text():
print(f"Received: {message}")
Binary messages¶
data = await websocket.receive_bytes()
async for data in websocket.iter_bytes():
print(f"Received {len(data)} bytes")
JSON messages¶
data = await websocket.receive_json()
async for data in websocket.iter_json():
print(f"Received: {data}")
Path parameters¶
WebSocket routes support path parameters:
@api.websocket("/ws/room/{room_id}")
async def room(websocket: WebSocket, room_id: str):
await websocket.accept()
async for message in websocket.iter_text():
await websocket.send_text(f"[{room_id}] {message}")
Query parameters¶
Access query parameters from the connection:
@api.websocket("/ws/connect")
async def connect(websocket: WebSocket, token: str | None = None):
if token != "secret":
await websocket.close(code=4001, reason="Invalid token")
return
await websocket.accept()
# ...
Closing connections¶
From the server¶
await websocket.close()
# With custom close code and reason
await websocket.close(code=1000, reason="Normal closure")
Handling client disconnect¶
from django_bolt import WebSocketDisconnect
@api.websocket("/ws")
async def handler(websocket: WebSocket):
await websocket.accept()
try:
async for message in websocket.iter_text():
await websocket.send_text(message)
except WebSocketDisconnect:
print("Client disconnected")
Close codes¶
Common WebSocket close codes:
| Code | Name | Description |
|---|---|---|
| 1000 | Normal Closure | Normal closure |
| 1001 | Going Away | Server/client going away |
| 1002 | Protocol Error | Protocol error |
| 1003 | Unsupported Data | Unsupported data type |
| 1008 | Policy Violation | Policy violation |
| 1011 | Server Error | Server encountered error |
Access close codes:
Authentication¶
Apply authentication to WebSocket endpoints:
from django_bolt.auth import JWTAuthentication, IsAuthenticated
@api.websocket(
"/ws/protected",
auth=[JWTAuthentication()],
guards=[IsAuthenticated()]
)
async def protected(websocket: WebSocket):
user_id = websocket.context.get("user_id")
await websocket.accept()
await websocket.send_text(f"Welcome, user {user_id}")
The JWT token should be passed as a query parameter:
WebSocket state¶
Check the connection state:
from django_bolt import WebSocketState
if websocket.state == WebSocketState.CONNECTED:
await websocket.send_text("Still connected")
States:
WebSocketState.CONNECTING- Beforeaccept()WebSocketState.CONNECTED- Afteraccept()WebSocketState.DISCONNECTED- After close
Testing WebSockets¶
Use the WebSocketTestClient:
from django_bolt.testing import TestClient
with TestClient(api) as client:
with client.websocket_connect("/ws/echo") as ws:
ws.send_text("Hello")
response = ws.receive_text()
assert response == "Echo: Hello"
Real-time patterns¶
Broadcast to all clients¶
connected_clients = set()
@api.websocket("/ws/broadcast")
async def broadcast(websocket: WebSocket):
await websocket.accept()
connected_clients.add(websocket)
try:
async for message in websocket.iter_text():
# Broadcast to all clients
for client in connected_clients:
await client.send_text(message)
finally:
connected_clients.discard(websocket)
Room-based chat¶
rooms = {} # room_id -> set of websockets
@api.websocket("/ws/room/{room_id}")
async def room(websocket: WebSocket, room_id: str):
await websocket.accept()
if room_id not in rooms:
rooms[room_id] = set()
rooms[room_id].add(websocket)
try:
async for message in websocket.iter_text():
for client in rooms[room_id]:
await client.send_text(f"[{room_id}] {message}")
finally:
rooms[room_id].discard(websocket)