# NOTE: The four examples below are independent Sanic programs shown back to
# back. Each one creates its own ``Sanic("myapp")`` instance and has its own
# ``__main__`` guard, so each is meant to be saved and run as a separate file.

# ---------------------------------------------------------------------------
# Example 1 (python): awaiting another coroutine inside a request handler
# ---------------------------------------------------------------------------
from sanic import Sanic
from sanic.response import json

app = Sanic("myapp")


@app.route("/")
async def test(request):
    """Return the payload produced by ``call_external_api`` as JSON."""
    # The result must be awaited and bound before it is serialized;
    # previously ``response`` was referenced without ever being assigned.
    response = await call_external_api()
    return json(response)


async def call_external_api():
    """Stand-in for an external call; returns a fixed message dict."""
    return {'message': 'Hello World'}


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)


# ---------------------------------------------------------------------------
# Example 2 (python): querying MySQL through aiomysql
# ---------------------------------------------------------------------------
from sanic import Sanic
from sanic.response import json
import aiomysql

app = Sanic("myapp")


@app.listener("before_server_start")
async def connect_db(app=None, loop=None):
    """Open the aiomysql connection and store it in the global ``db``.

    Registered as a ``before_server_start`` listener so the connection
    exists before the first request is handled. ``app`` and ``loop`` are
    supplied by Sanic and are unused; both default to ``None`` so the
    function can still be called with no arguments.
    """
    global db
    db = await aiomysql.connect(
        host="localhost",
        port=3306,
        user="root",
        password="password",
        db="mydb",
    )
    # NOTE(review): the connection is never closed; consider closing it in an
    # ``after_server_stop`` listener.


@app.route("/")
async def test(request):
    """Run ``SELECT * FROM users`` and return the fetched rows as JSON."""
    async with db.cursor() as cursor:
        await cursor.execute("SELECT * FROM users")
        result = await cursor.fetchall()
    return json(result)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)


# ---------------------------------------------------------------------------
# Example 3 (python): request middleware
# ---------------------------------------------------------------------------
from sanic import Sanic
from sanic.response import json

app = Sanic("myapp")


# Without this registration the function is defined but never invoked.
@app.middleware("request")
async def log_middleware(request):
    """Print the URL of the incoming request."""
    print('Request:', request.url)


@app.route("/")
async def test(request):
    """Return a fixed greeting as JSON."""
    return json({'message': 'Hello World'})


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)


# ---------------------------------------------------------------------------
# Example 4 (python): background worker fed by an asyncio queue
# ---------------------------------------------------------------------------
from sanic import Sanic
from sanic.response import json
import asyncio

app = Sanic("myapp")


async def worker():
    """Consume tasks from the global ``queue`` forever, one at a time."""
    while True:
        task = await queue.get()
        await do_some_work(task)
        queue.task_done()


async def do_some_work(task):
    """Process a single task; here it is only printed."""
    print('Task:', task)


@app.listener("before_server_start")
async def start_worker(app, loop=None):
    """Create the global ``queue`` and schedule ``worker`` on the app.

    Both are set up from a server listener rather than before ``app.run``:
    a task created on ``asyncio.get_event_loop()`` ahead of ``app.run`` is
    not attached to the loop the server runs, and ``queue`` was never
    created at all.
    """
    global queue
    queue = asyncio.Queue()
    app.add_task(worker())


@app.route("/")
async def test(request):
    """Enqueue ``'Task2'`` for the worker and acknowledge with JSON."""
    queue.put_nowait('Task2')
    return json({'message': 'Tasks added to queue'})


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)


上一篇:
下一篇:
切换中文