pip install flask flask-socketio
python
from flask import Flask, render_template
from flask_socketio import SocketIO
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
socketio = SocketIO(app)
@app.route('/')
def index():
return render_template('index.html')
@socketio.on('message')
def handle_message(data):
print('Received message: ' + data)
if __name__ == '__main__':
socketio.run(app)
html
<!DOCTYPE html>
<html>
<head>
<title>WebSocket with Flask</title>
</head>
<body>
<h1>WebSocket with Flask</h1>
<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.4.8/socket.io.min.js"></script>
<script type="text/javascript">
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {
socket.send('Client connected.');
});
socket.on('message', function(data) {
console.log('Received message: ' + data);
});
</script>
</body>
</html>
python app.py
pip install django channels
python
django-admin startproject myproject
python
cd myproject
python manage.py startapp myapp
python
INSTALLED_APPS = [
'channels',
'myapp',
]
ASGI_APPLICATION = 'myproject.routing.application'
python
from channels.routing import ProtocolTypeRouter
application = ProtocolTypeRouter({
'http': get_asgi_application(),
'websocket': AuthMiddlewareStack(
URLRouter(
myapp.routing.websocket_urlpatterns
)
),
})
python
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/$', consumers.MyConsumer.as_asgi()),
]
python
from channels.generic.websocket import AsyncWebsocketConsumer
class MyConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
async def disconnect(self, close_code):
pass
async def receive(self, text_data):
print('Received message: ' + text_data)
python manage.py runserver