python
from sanic import Sanic
from sanic.response import json
app = Sanic("myapp")
@app.route("/")
async def test(request):
return json(response)
async def call_external_api():
return {'message': 'Hello World'}
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000)
python
from sanic import Sanic
from sanic.response import json
import aiomysql
app = Sanic("myapp")
async def connect_db():
global db
db = await aiomysql.connect(host="localhost", port=3306, user="root", password="password", db="mydb")
@app.route("/")
async def test(request):
async with db.cursor() as cursor:
await cursor.execute("SELECT * FROM users")
result = await cursor.fetchall()
return json(result)
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000)
python
from sanic import Sanic
from sanic.response import json
app = Sanic("myapp")
async def log_middleware(request):
print('Request:', request.url)
@app.route("/")
async def test(request):
return json({'message': 'Hello World'})
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8000)
python
from sanic import Sanic
from sanic.response import json
import asyncio
app = Sanic("myapp")
async def worker():
while True:
task = await queue.get()
await do_some_work(task)
queue.task_done()
async def do_some_work(task):
print('Task:', task)
@app.route("/")
async def test(request):
queue.put_nowait('Task2')
return json({'message': 'Tasks added to queue'})
if __name__ == '__main__':
asyncio.get_event_loop().create_task(worker())
app.run(host="0.0.0.0", port=8000)