如何使Tornado中的SQLAlchemy成为asynchronous?

如何使Tornado SQLAlchemy成为async ? 我在MongoDB上find了asynchronousmongo例子的例子,但是我找不到像SQLAlchemy这样的motor 。 有谁知道如何使SQLAlchemy查询执行与tornado.gen (我正在使用MySQL下面的SQLAlchemy ,目前我的处理程序读取数据库和返回结果,我想使这个asynchronous)。

ORM不太适合显式asynchronous编程,也就是说,程序员必须在使用networking访问的任何时候产生明确的callback。 一个主要的原因是ORM大量使用延迟加载模式,它与或多或less与显式asynchronous不兼容。 代码如下所示:

 user = Session.query(User).first() print user.addresses 

实际上会发出两个单独的查询 – 一个是当你说first()加载一行时,另一个是当你说user.addresses时,如果user.addresses集合不存在或已经过期。 从本质上讲,几乎每一行处理ORM结构的代码都可能会阻塞IO,所以你会在几秒钟内大量callback意大利面 – 而且更糟糕的是,这些代码行绝大部分都不会实际上阻塞IO,所以将callback连接在一起的所有开销,否则将是简单的属性访问操作,也会使您的程序效率大大降低。

显式asynchronous模型的一个主要问题是它们为复杂的系统增加了巨大的Python函数调用开销 – 不仅仅是像延迟加载那样在面向用户的方面,而且在内部方面以及系统如何提供抽象Python数据库API(DBAPI)。 对于SQLAlchemy来说,即使有基本的asynchronous支持,也会对绝大多数不使用asynchronous模式的程序造成严重的性能损失,甚至是那些并不是高度并发的asynchronous程序。 考虑SQLAlchemy或任何其他ORM或抽象层可能具有如下代码:

 def execute(connection, statement): cursor = connection.cursor() cursor.execute(statement) results = cursor.fetchall() cursor.close() return results 

上面的代码执行似乎是一个简单的操作,在连接上执行SQL语句。 但是使用像psycopg2的asynchronous扩展完全asynchronousDBAPI,上面的代码在IO上至less阻塞三次。 所以要用明确的asynchronous风格编写上面的代码,即使没有使用asynchronous引擎,并且callback没有实际阻塞,也就意味着上面的外部函数调用变成了至less三个函数调用,而不是一个,不包括施加的开销由显式asynchronous系统或DBAPI自行调用。 因此,一个简单的应用程序会自动给出3倍的函数调用开销,围绕语句执行的简单抽象。 而在Python中, 函数调用开销就是一切 。

由于这些原因,对于围绕显式asynchronous系统的炒作,我至今还不甚了解,至less在某种程度上,一些人似乎希望对所有东西都进行asynchronous处理,比如提供网页(参见node.js)。 我build议使用隐式的asynchronous系统,最重要的是gevent ,在这里你可以获得asynchronous模型的所有非阻塞IO好处,而不需要显式callback的结构冗长/缺点。 我继续尝试理解这两种方法的用例,所以我对明确的asynchronous方法的吸引力感到困惑,因为它解决了所有问题,就像你用node.js看到的那样 – 我们正在使用脚本语言首先要减less冗长和代码复杂性,而对于像传递网页这样简单的东西,明确的asynchronous似乎什么都不做,只能添加样板,也可以通过gevent或类似的方法自动化,如果阻塞IO甚至是一个问题像这样的情况下(大量的高容量的网站做同步IO模型罚款)。 基于Gevent的系统已经过生产validation,并且它们的stream行度也在不断增长,所以如果您喜欢ORM提供的代码自动化,那么您可能还希望采用像gevent这样的系统提供的asynchronousIO调度自动化。

更新 :尼克·科格伦(Nick Coghlan)指出了他关于显式和隐式asynchronous这个主题的伟大文章,这也是必须阅读的。 而且我也被更新了一个事实,即pep-3156现在欢迎与gevent的互操作性 ,扭转了之前对gevent的不感兴趣,主要是由于Nick的文章。 所以,在将来我会推荐一个混合使用gevent的Tornado来处理数据库逻辑,一旦集成这些方法的系统可用。

我在过去有同样的问题,我找不到一个可靠的Async-MySQL库。 然而,使用Asyncio + Postgres有一个很酷的解决scheme。 你只需要使用aiopg库,它提供了开箱即用的SQLAlchemy支持:

 import asyncio from aiopg.sa import create_engine import sqlalchemy as sa metadata = sa.MetaData() tbl = sa.Table('tbl', metadata, sa.Column('id', sa.Integer, primary_key=True), sa.Column('val', sa.String(255))) @asyncio.coroutine def go(): engine = yield from create_engine(user='aiopg', database='aiopg', host='127.0.0.1', password='passwd') with (yield from engine) as conn: yield from conn.execute(tbl.insert().values(val='abc')) res = yield from conn.execute(tbl.select().where(tbl.c.val=='abc')) for row in res: print(row.id, row.val) loop = asyncio.get_event_loop() loop.run_until_complete(go()) 

我正在用sqlalchemy在下一个方式使用龙卷风:

from tornado_mysql import pools from sqlalchemy.sql import table, column, select, join from sqlalchemy.dialects import postgresql, mysql # from models import M, M2 t = table(...) t2 = table(...) xxx_id = 10 j = join(t, t2, tct_id == t2.c.id) s = select([t]).select_from(j).where(tcxxx == xxx_id) sql_str = s.compile(dialect=mysql.dialect(),compile_kwargs={"literal_binds": True}) pool = pools.Pool(conn_data...) cur = yield pool.execute(sql_str) data = cur.fetchone()
from tornado_mysql import pools from sqlalchemy.sql import table, column, select, join from sqlalchemy.dialects import postgresql, mysql # from models import M, M2 t = table(...) t2 = table(...) xxx_id = 10 j = join(t, t2, tct_id == t2.c.id) s = select([t]).select_from(j).where(tcxxx == xxx_id) sql_str = s.compile(dialect=mysql.dialect(),compile_kwargs={"literal_binds": True}) pool = pools.Pool(conn_data...) cur = yield pool.execute(sql_str) data = cur.fetchone() 

在这种情况下,我们可以使用sqlalchemy模型和sqlalchemy工具来构造查询。

不是龙卷风,但我们在GINO项目的 asyncio中做了一些SQLAlchemyasynchronous:

 import asyncio from gino import Gino, enable_task_local from sqlalchemy import Column, Integer, Unicode, cast db = Gino() class User(db.Model): __tablename__ = 'users' id = Column(Integer(), primary_key=True) nickname = Column(Unicode(), default='noname') async def main(): await db.create_pool('postgresql://localhost/gino') # Create object, `id` is assigned by database u1 = await User.create(nickname='fantix') print(u1.id, u1.nickname) # 1 fantix # Retrieve the same row, as a different object u2 = await User.get(u1.id) print(u2.nickname) # fantix # Update affects only database row and the operating object await u2.update(nickname='daisy') print(u2.nickname) # daisy print(u1.nickname) # fantix # Returns all user objects with "d" in their nicknames users = await User.query.where(User.nickname.contains('d')).gino.all() # Find one user object, None if not found user = await User.query.where(User.nickname == 'daisy').gino.first() # Execute complex statement and return command status status = await User.update.values( nickname='No.' + cast(User.id, Unicode), ).where( User.id > 10, ).gino.status() # Iterate over the results of a large query in a transaction as required async with db.transaction(): async for u in User.query.order_by(User.id).gino.iterate(): print(u.id, u.nickname) loop = asyncio.get_event_loop() enable_task_local(loop) loop.run_until_complete(main()) 

它看起来有点像,但实际上与SQLAlchemy ORM 完全不同 。 因为我们只使用了SQLAlchemy核心的一部分,并在其上构build了一个简单的ORM。 它使用下面的asyncpg ,所以它只适用于PostgreSQL

更新 :由于Vladimir Goncharov的贡献,GINO现在支持Tornado。 在这里看到文档