2012-03-08 3 views
21

У меня возникли проблемы с пониманием того, как правильно открывать и закрывать сеансы базы данных эффективно, как я понял в документации sqlalchemy, если я использую scoped_session для создания объекта Session, а затем использовать возвращенный сеанс объект для создания сеансов, он является потокобезопасным, поэтому в основном каждый поток будет получать свой собственный сеанс, и проблем с ним не будет. Теперь приведен пример ниже: я помещаю его в бесконечный цикл, чтобы проверить, правильно ли он закрывает сеансы, и если я правильно его отслеживаю (в mysql, выполняя «SHOW PROCESSLIST;»), соединения просто продолжают расти, они не закрывают их , хотя я использовал session.close() и даже удалял объект scoped_session в конце каждого прогона. Что я делаю не так? Моя цель в более крупном приложении - использовать минимальное количество подключений к базе данных, потому что моя текущая рабочая реализация создает новый сеанс в каждом методе, где это требуется, и закрывает его перед возвратом, что кажется неэффективным.Правильная обработка SQLAlchemy в многопоточных приложениях

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

ответ

36

Вы должны называть только create_engine и scoped_session один раз в процесса (в базе данных). Каждый из них получит свой собственный пул соединений или сеансов (соответственно), поэтому вы хотите, чтобы вы только создавали один пул. Просто сделайте его модульным уровнем глобальным. если вам нужно управлять сессий более preciesly, чем это, вы, вероятно, не следует использовать scoped_session

Еще одно изменение, чтобы сделать это, чтобы использовать DBSession непосредственно, как если бы это была сессия . методы вызова сеанса на scoped_session будут прозрачно при необходимости создать поток-локальный сеанс и перенаправить вызов метода на сеанс .

Другое дело, чтобы быть в курсе является pool_size из пула соединений, который является 5 по умолчанию. Для многих приложений это хорошо, но если вы создаете много потоков, возможно, потребуется настроить этот параметр

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

Спасибо за информацию, это было очень полезно на самом деле. Царь приветствует! – andrean

Смежные вопросы