2014-01-12 2 views
4

Я читал об использовании ORM SQLAlchemy в контексте приложения Twisted. Это много информации, чтобы переварить, так что у меня есть немного проблем с тем, чтобы собрать все части. До сих пор я собрал следующие абсолютные истины:Является ли это приемлемым способом выполнения запросов SQLAlchemy с резьбой от Twisted?

  1. Одна сессия подразумевает один поток. Всегда.
  2. scoped_session, по умолчанию, предоставляет нам способ ограничения сеансов для заданного потока. Другими словами, я уверен, что, используя scoped_session, я не буду передавать сеансы другим потокам (если только я не сделаю это явно, чего я не буду).

Я также собрал, что есть некоторые проблемы, связанные с ленивой/нетерпеливой загрузкой, и что один из возможных подходов состоит в том, чтобы отделить объекты ORM от сеанса и повторно привязать их к другому сеансу при изменении потоков. Я довольно неясен в деталях, но я также пришел к выводу, что scoped_session делает многие из этих моментов спорными.

Мой первым вопрос, является ли или не я сильно ошибся в своих выводах выше.

Помимо этого, я разработал этот подход, который, я надеюсь, удовлетворительный.

Я начинаю с создания scoped_session объекта ...

Session = scoped_session(sessionmaker(bind=_my_engine)) 

... который я затем использовать с менеджером контекста для того, чтобы обрабатывать исключения и очистки изящно:

@contextmanager 
def transaction_context(): 
    session = Session() 
    try: 
     yield session 
     session.commit() 
    except: 
     session.rollback() 
     raise 
    finally: 
     session.remove() # dispose of the session 

Теперь все, что мне нужно сделать, это использовать указанный выше менеджер контекста в функции, отложенной на отдельный поток. Я бросил вместе декоратора, чтобы сделать вещи немного симпатичнее:

def threaded(fn): 
    @wraps(fn) # functools.wraps 
    def wrapper(*args, **kwargs): 
     return deferToThread(fn, *args, **kwargs) # t.i.threads.deferToThread 
    return wrapper 

Вот пример того, как я намерен использовать весь притон. Ниже приведена функция, которая выполняет поиск DB, используя SQLAlchemy ОРМ:

@threaded 
def get_some_attributes(group): 
    with transaction_context() as session: 
     return session.query(Attribute).filter(Attribute.group == group) 

Мой второй вопрос, является ли или нет этот подход является жизнеспособным.

  • Я делаю какие-либо принципиально ошибочные предположения?
  • Есть ли какие-либо оговорки?
  • Есть ли лучший способ?

Edit:Here является родственным вопросом о неожиданной ошибке в моем контексте менеджере.

+0

Ну, я думаю, главный вопрос, который у всех будет: работает ли так, как вы его закодировали? – bitcycle

+0

@bitcycle, кстати (и удивительно), нет ... это не работает. Я получаю 'AttributeError' в моем диспетчере контекстов - видимо,' Session' не имеет атрибута 'remove'.Это довольно удивительно - – blz

+1

Это выглядит в основном правильно (за исключением того, что это должно быть 'Session.remove()'). Следует иметь в виду, что некоторые драйверы базы данных, такие как 'sqlite3', не позволяют передавать их потоки между потоками. Я уверен, что объекты 'scoped_session' обрабатывают это соответствующим образом (прошло некоторое время с тех пор, как я использовал SQLAlchemy). –

ответ

3

Сейчас я работаю над этой точной проблемой, и я думаю, что нашел решение.

Действительно, вы должны отложить все функции доступа к базе данных в поток. Но в вашем решении вы удаляете сеанс после запроса базы данных, поэтому все ваши объекты ORM результатов будут отсоединены, и вы не будете иметь доступ к их полям.

Вы не можете использовать scoped_session, потому что в Twisted у нас есть только один MainThread (за исключением тех вещей, которые работают в deferToThread). Однако мы можем использовать scoped_sesssion с scopefunc.

В Twisted есть великая вещь, известная как ContextTracker:

обеспечивает способ передачи данных произвольной ключ/значение вверх и вниз стек вызовов без передачи их в качестве параметров функций на этот вызов стек.

В моей скрученной веб-приложение в методе render_GET я поставил uuid параметр:

call = context.call({"uuid": str(uuid.uuid4())}, self._render, request) 

, а затем я вызываю метод _render делать реальную работу (работа с БД, визуализации HTML, и т.д.).

Я создаю scoped_session так:

scopefunc = functools.partial(context.get, "uuid") 
Session = scoped_session(session_factory, scopefunc=scopefunc) 

Теперь в любой функции вызовов _render я могу получить сессию:

Session() 

и в конце _render я должен сделать Session.remove() к удалить сеанс.

Это работает с моим webapp, и я думаю, что он может работать и для других задач.

Это полностью отдельный пример, покажите, как все это работает вместе.

from twisted.internet import reactor, threads 
from twisted.web.resource import Resource 
from twisted.web.server import Site, NOT_DONE_YET 
from twisted.python import context 
from sqlalchemy import create_engine, Column, Integer, String 
from sqlalchemy.orm import sessionmaker, scoped_session 
from sqlalchemy.ext.declarative import declarative_base 
import uuid 
import functools 

engine = create_engine(
    'sqlite:///test.sql', 
    connect_args={'check_same_thread': False}, 
    echo=False) 

session_factory = sessionmaker(bind=engine) 
scopefunc = functools.partial(context.get, "uuid") 
Session = scoped_session(session_factory, scopefunc=scopefunc) 
Base = declarative_base() 


class User(Base): 
    __tablename__ = 'users' 
    id = Column(Integer, primary_key=True) 
    name = Column(String) 

Base.metadata.create_all(bind=engine) 


class TestPage(Resource): 
    isLeaf = True 

    def render_GET(self, request): 
     context.call({"uuid": str(uuid.uuid4())}, self._render, request) 
     return NOT_DONE_YET 

    def render_POST(self, request): 
     return self.render_GET(request) 

    def work_with_db(self): 
     user = User(name="TestUser") 
     Session.add(user) 
     Session.commit() 
     return user 

    def _render(self, request): 
     print "session: ", id(Session()) 
     d = threads.deferToThread(self.work_with_db) 

     def success(result): 
      html = "added user with name - %s" % result.name 
      request.write(html.encode('UTF-8')) 
      request.finish() 
      Session.remove() 
     call = functools.partial(context.call, {"uuid": scopefunc()}, success) 
     d.addBoth(call) 
     return d 

if __name__ == "__main__": 
    reactor.listenTCP(8888, Site(TestPage())) 
    reactor.run() 

Я распечатываю идентификатор сеанса, и вы можете видеть, что он отличается для каждого запроса. Если вы удалите scopefunc из конструктора scoped_session и выполните два одновременных запроса (введите time.sleep в work_with_db), вы получите один общий сеанс для этих двух запросов.

Объект scoped_session по умолчанию использует threading.local() в качестве хранилища, так что один сеанс поддерживается для всех, призывающих реестра scoped_session, но только в пределах одной нити

проблема в том, что в скрученном случае у нас есть только один поток для всех запросов. Вот почему мы должны создать собственный scopefunc, который покажет разницу между запросами.

Другая проблема, что twisted не передавал контекст обратным вызовам, и мы должны обернуть обратный вызов и отправить текущий контекст.

call = functools.partial(context.call, {"uuid": scopefunc()}, success) 

Тем не менее, я не знаю, как заставить его работать с defer.inLineCallback, что я использую везде в моем коде.

+0

Спасибо за ваш ответ! Не могли бы вы представить более подробный пример, который явно показывает, как «ContextTracker», «_render» и «scoped_session» подходят друг к другу? У меня возникли проблемы с просмотром картины. Почему «scopefunc» необходим с «scoped_session»? Спасибо! – blz

+0

@blz добавить пример и пояснение scopefunc используя. – aborilov

+1

@aborilov Спасибо! Хорошее решение, которое вы получили здесь ... Интересно, может ли это быть сделано более косвенно с декоратором – CodeMode

Смежные вопросы