python import ssl import asyncio import websockets async def secure_websocket_handler(websocket, path): ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context.load_cert_chain(certfile='path_to_certificate.crt', keyfile='path_to_private_key.key') await websocket.send('Secure WebSocket connection established!') async for message in websocket: await websocket.send('Received message: ' + message) start_server = websockets.serve(secure_websocket_handler, 'localhost', 8765, ssl=ssl_context) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() python import asyncio import websockets user_database = { 'user1': 'password1', 'user2': 'password2' } async def authenticate(username, password): if username in user_database and user_database[username] == password: return True return False async def authorized_websocket_handler(websocket, path): authentication_successful = False async for message in websocket: if not authentication_successful: username, password = message.split(',') authentication_successful = await authenticate(username, password) if authentication_successful: await websocket.send('Successfully authenticated!') else: await websocket.send('Authentication failed!') else: await websocket.send('Received message: ' + message) start_server = websockets.serve(authorized_websocket_handler, 'localhost', 8765) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()


上一篇:
下一篇:
切换中文