2014-10-02 3 views
2

У нас есть приложение RESTful (-ish), которое использует txpostgres для доступа к postgres db. В настоящее время мы генерируем новые экземпляры txpostgres.Connection каждый раз, когда клиент связывает сервер для вызова db. Это неэффективно и приводит к тому, что наш db быстро перегружается. Я пытался адаптировать это вместо txpostgres.ConnectionPool, но у меня проблемы. Сейчас у меня есть что-то, что выглядит следующим образом:Совместное использование пула соединений txpostgres

class DBTester(object): 
    def __init__(self): 
     self.cfg = load_config('local') # load the db settings from a JSON file 
     self.pool = ConnectionPool(None, min=1, **self.cfg) # create the pool 

    @defer.inlineCallbacks 
    def get_pool(self): 
     yield self.pool.start() 
     defer.returnValue(self.pool) 


class DBT(object): 
    def __init__(self): 
     self.db = DBTester() 

    @defer.inlineCallbacks 
    def t(self): 
     conn = yield self.db.get_pool() 
     res = yield conn.runQuery('select * from clients') 
     println('DBT.t result: {}'.format(res)) 


if __name__ == "__main__": 
    dbt = DBT() 
    dbt.t() 
    dbt.t() 

    reactor.run() 

Вопрос является выбор времени pool.start() вызова. Если я поставлю его в DBTester.__init__, я получаю psycopg2.OperationalError: asynchronous connection attempt underway. Если я помещаю его в DBTester.get_pool, один вызов db.t() работает, а другие сбой - exceptions.AttributeError: 'NoneType' object has no attribute 'runQuery'. Я боролся с этим в основном весь день и не смог его взломать, и я не смог найти много онлайн.

Мне действительно нужен указатель на некоторый минимальный пример того, как используется ConnectionPool. Какие-либо предложения?

+0

Почему у вас есть пул в классе DBTester? Есть ли причина для этого? – brunsgaard

ответ

0

Я не знаю, если это имеет значение, как лучшие практики, но вот то, что мы собираемся с:

## dbutil.py 
class DBConnection(object): 
    def __init__(self, cfg_name): 
     self.cfg_name = cfg_name 
     self.cfg = self.load_config(self.cfg_name) 
     self.pool = txpostgres.ConnectionPool(None, min=5, **self.cfg) 

    @staticmethod 
    def load_config(name): 
     with open('config.json') as json_file: 
      cfg = json.load(json_file) 
     return cfg.get(name) 

    @defer.inlineCallbacks 
    def get_pool(self): 
     try: 
      yield self.pool.start() 
     except txpostgres.AlreadyConnected: 
      pass 
     defer.returnValue(self.pool) 


@defer.inlineCallbacks 
def db_factory(cfg_name): 
    db = DBConnection(cfg_name) 
    db.pool = yield db.get_pool() 
    defer.returnValue(db) 


## basehandler.py 
def __init__(self, name=None, db=None): 
    resource.Resource.__init__(self) 
    self.name = name 
    self.db = db 
    self.pool = self.db.pool 

@defer.inlineCallbacks 
def runQuery(self, *args, **kwargs): 
    res = yield self.pool.runQuery(*args, **kwargs) 
    defer.returnValue(res) 


## server.py 
@defer.inlineCallbacks 
def init_site(db): 
    db = yield db_factory(db) 
    root = RootURLHandler(db) 
    reactor.listenTCP(SERVER_PORT, site) 

def main(db): 
    log.startLogging(LogFile('server.log', '.', maxRotatedFiles=5)) 
    init_site(db) 
    reactor.run() 

Ключ, возможно, неудивительно, что делал инициализации сайта отложенное зависит от дб вещей происходит через.

+0

Я действительно думаю, что вы должны пересмотреть.части этого кода ужасны, к сожалению, особенно часть 'except txpostgres.AlreadyConnected'. но да, это работает. Но он никогда не прошел бы QA, где я работаю. – brunsgaard

+0

Это был код до QA - никто не ответил на мой вопрос, и у меня появился MVP, поэтому я решил, что поступил правильно и поделился тем, что узнал самостоятельно. – swizzard

3

Похоже, ваша проблема связана не с txpostgres, а с искаженным и асинхронным способом мышления.

exceptions.AttributeError: объект «NoneType» не имеет атрибута «runQuery» означает: Вы пытались бросить запросы SQL после базы данных до того, как соединение было сделано. Это глупо! Так что теперь я думаю, что я выброшу исключение, так что дорогой пользователь знает об этом безумии.

Таким образом, это может произойти, если вы что-то вроде

pool = ConnectionPool(None, min=1) 
d1 = pool.start() 
d2 = pool.runQuery('select tablename from pg_tables') 

Этот код создает два deferreds и thorws ет в реакторе. Только алгоритм планирования знает, какой из двух выполняется первым, а если он равен d2, тогда происходит ошибка.

txpostgres.txpostgres.AlreadyConnected средства: Довольно самостоятельно объяснить, это не имеет никакого смысла, чтобы начать пул, который уже начался.

psycopg2.OperationalError: асинхронная попытка соединения в стадию реализация означает:
Я был в середине создания хорошего соединения с базой данных асинхронных, когда вы начали выполнение SQL заявления. Соединение с базой данных еще не было готово, и поэтому запросы sql не выполнялись. Меня огорчает. Я думаю, что я выброшу операционную ошибку, так что уважаемый пользователь знает, что утверждение не выполнено.

Хорошо, поэтому нам нужен способ убедиться, что соединение установлено до того, как мы выберем sql-запросы после базы данных. Ниже приведен пример кода, который использует обратные вызовы для достижения этого.

from txpostgres.txpostgres import ConnectionPool 
from twisted.internet import reactor, defer 
from twisted.python import log, util 


class SomeClass(object): 

    pool = ConnectionPool(
     None, 
     min=1, 
     user="user", 
     password="pass", 
     host='host.com') 

    @defer.inlineCallbacks 
    def fetch_tables(self): 
     res = yield self.pool.runQuery('select tablename from pg_tables') 
     defer.returnValue(res) 


if __name__ == "__main__": 

    def querydb(n=10): 
     dl = [] 
     for i in range(n): 
      d = s.fetch_tables() 
      d.addCallback(lambda tables: util.println(len(tables))) 
      dl.append(d) 
     return defer.DeferredList(dl) 

    s = SomeClass() 
    d_startpool = s.pool.start() 
    d_startpool.addCallback(lambda _: querydb()) 
    d_startpool.addCallback(lambda _: s.pool.close()) 
    d_startpool.addErrback(log.err) 
    d_startpool.addBoth(lambda _: reactor.stop()) 

    reactor.run() 

Надеюсь, это поможет.

Смежные вопросы