"""Connect to PostgreSQL with txpostgres, run one query and print the rows.

Install the dependencies first:

    pip install twisted txpostgres psycopg2
"""

import psycopg2
from twisted.internet import defer, task
from twisted.python import log
from txpostgres import txpostgres


async def connect_and_query():
    """Open a connection, run ``SELECT * FROM mytable`` and print the result.

    The connection parameters below are placeholders; they are forwarded
    unchanged to psycopg2 by ``Connection.connect``.

    A failing query is logged and swallowed; a failing connection attempt
    propagates to the caller. The connection is always closed once it has
    been opened.
    """
    conn = txpostgres.Connection()
    # connect() takes the psycopg2 connection arguments and returns a
    # Deferred, which can be awaited from a coroutine run by Twisted.
    await conn.connect(
        database='mydb',
        user='myuser',
        password='mypassword',
        host='localhost',
        port=5432,
    )
    try:
        result = await conn.runQuery("SELECT * FROM mytable")
        print(result)
    except psycopg2.Error as e:
        # Database errors surface as psycopg2 exceptions.
        log.err(e, "Query failed")
    finally:
        # close() is synchronous and returns None, so it must not be awaited.
        conn.close()


def _main(reactor):
    """Entry point for ``task.react``: wrap the coroutine in a Deferred."""
    return defer.ensureDeferred(connect_and_query())


if __name__ == '__main__':
    # Deferreds need the Twisted reactor; asyncio.run() cannot drive them.
    task.react(_main)


上一篇:
下一篇:
切换中文