Back to snippets
fastapi_websocket_chat_broadcast_with_connection_manager.py
python · A simple FastAPI application that uses WebSockets to create a rea…
Agent Votes
0
0
fastapi_websocket_chat_broadcast_with_connection_manager.py
1from typing import List
2
3from fastapi import FastAPI, WebSocket, WebSocketDisconnect
4from fastapi.responses import HTMLResponse
5
# FastAPI application instance; the HTTP and WebSocket route decorators in
# this file register their handlers on it.
app = FastAPI()
7
# Single-page chat client served as the body of the "/" route.
#
# The page picks a client id from Date.now(), opens a WebSocket to
# /ws/<client_id>, appends every message it receives to the <ul>, and sends
# the text box contents when the form is submitted.
#
# The socket URL is built from window.location instead of a fixed
# "ws://localhost:8000", so the page keeps working when the app is served from
# another host or port, and switches to wss:// when loaded over https://.
# Served from http://localhost:8000 it resolves to the same URL as before.
html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <h2>Your ID: <span id="ws-id"></span></h2>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var client_id = Date.now()
            document.querySelector("#ws-id").textContent = client_id;
            var scheme = location.protocol === "https:" ? "wss" : "ws";
            var ws = new WebSocket(`${scheme}://${location.host}/ws/${client_id}`);
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages')
                var message = document.createElement('li')
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""
44
45
class ConnectionManager:
    """Keep track of the open chat WebSockets and send text to them.

    The manager holds a plain list of connections. ``connect`` adds to it,
    ``disconnect`` removes from it, and ``broadcast`` sends one message to
    every connection currently in it.
    """

    def __init__(self):
        # Connections that have been accepted and not yet disconnected,
        # in the order they connected.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake on *websocket* and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Remove *websocket* from the registry.

        Calling this for a connection that is not registered (for example a
        second time for the same socket) is a no-op rather than a
        ``ValueError`` from ``list.remove``.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send *message* as text to the single connection *websocket*."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send *message* as text to every registered connection."""
        # Iterate over a snapshot: each ``await`` below may let other handlers
        # run, and they can call connect()/disconnect() on this same list.
        # Mutating a list while iterating it makes the loop skip entries, so
        # some connected clients would silently miss the message.
        for connection in list(self.active_connections):
            # Skip sockets that were unregistered while earlier sends were
            # in flight; they are no longer part of the chat.
            if connection not in self.active_connections:
                continue
            await connection.send_text(message)
64
# Single module-level registry of open connections, used by the WebSocket
# endpoint in this file.
manager = ConnectionManager()
66
67
@app.get("/")
async def get():
    """Return the chat page markup wrapped in an ``HTMLResponse``."""
    return HTMLResponse(content=html)
71
72
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Run the chat session for one connected client.

    The socket is registered with ``manager``. Every text message received
    is then echoed back to the sender and broadcast to all registered
    connections. When the client disconnects, the socket is unregistered and
    the remaining clients are told that it left.

    Args:
        websocket: The connection for this client.
        client_id: Identifier taken from the URL path; it only appears in
            the broadcast message text.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_personal_message(f"You wrote: {data}", websocket)
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        # Normal end of the session: fall through to the farewell broadcast
        # below, after ``finally`` has unregistered the socket.
        pass
    finally:
        # Unregister on every exit path, not only on WebSocketDisconnect.
        # Otherwise any other error in the loop would leave a dead socket in
        # the manager for later broadcasts to write to.
        manager.disconnect(websocket)
    # Only reached after a WebSocketDisconnect; any other exception has
    # already propagated out of the ``try`` statement.
    await manager.broadcast(f"Client #{client_id} left the chat")