2015-03-15 1 views
1

Я использую свой собственный конвейер для хранения сломанных элементов в базе данных PostgreSQL, я сделал расширение несколько дней назад, и теперь я храню данные в 3 базах данных. Таким образом, я хочу, чтобы конвейер, в который вставлялись данные, вызывается каждые 100 элементов, или он берет элементы и вставляет их 100 на 100.Batch/Bulk SQL insert in Scrapy Pipelines [PostgreSQL]

Причина, по которой я хочу сделать это быстро и меньше головной боли на серверах БД.

+1

Пожалуйста, обновите свой вопрос с трубопроводом, который вы используете. –

ответ

4

Решение не было, что отличается от ответа Anandhakumar в я создал глобальный список в файл параметров с помощью метода и присваивателя для него

# This buffer for the bluk insertion 
global products_buffer 

products_buffer = [] 

# Append product to the list 
def add_to_products_buffer(product): 
    global products_buffer 
    products_buffer.append(product) 

# Get the length of the product 
def get_products_buffer_len(): 
    global products_buffer 
    return len(products_buffer) 

# Get the products list 
def get_products_buffer(): 
    global products_buffer 
    return products_buffer 

# Empty the list 
def empty_products_buffer(): 
    global products_buffer 
    products_buffer[:] = [] 

Тогда я импортировал его в трубопровод

from project.settings import products_buffer,add_to_products_buffer,get_products_buffer_len,empty_products_buffer,get_products_buffer 

и приложение I завершайте элемент в списке каждый раз, когда вызывается конвейер, и я проверяю, соответствует ли длина списка 100 I циклу в списке, чтобы подготовить много запросов вставки, но самая важная магия состоит в том, чтобы передать их всем в одной строке, t совершать в цикле, иначе вы ничего не выиграете, и для их вставки потребуется много времени.

def process_item(self, item, spider): 
    # Adding the item to the list 
    add_to_products_buffer(item) 
    # Check if the length is 100 
    if get_products_buffer_len() == 100: 
     # Get The list to loop on it 
     products_list = get_products_buffer() 
     for item in products_list: 
      # The insert query 
      self.cursor.execute('insert query') 
     try: 
      # Commit to DB the insertions quires 
      self.conn.commit() 
      # Emty the list 
      empty_products_buffer() 
     except Exception, e: 
      # Except the error 

Также вы можете использовать executemany, если вы не хотите, чтобы петли.

+0

Мне очень нравится простота этого, но как мне обрабатывать элементы, которые не достигают предела 100, когда заканчивается искатель? –

+0

Вы должны сделать это вручную, вставив то, что осталось после того, как паук закончил свою работу. –

+0

Я использую метод close_spider. Кажется, это работает. Спасибо –

0

Это не опция scrapy, но опция psycopg2 для массового вставки строк. Этот вопрос имеет параметры, которые можно использовать: Psycopg2, Postgresql, Python: Fastest way to bulk-insert

+0

Конвейер является обратным вызовом для каждого элемента, я хочу, чтобы пакет выполнял каждые 100 элементов. –

0

Я не знаю Scrapy, и если он имеет какой-либо функциональности очереди встроенный, но, возможно, вы можете нажать ваш запрос-х на standard python Queue от Scrapy, а затем есть потребитель, который следит за очередью, и как только на нем будет 100 предметов, выполните их все, что действительно можно сделать с помощью psycopg2 (см. psycopg2: insert multiple rows with one query).

Вы могли бы сделать что-то вроде

queryQueue = Queue() 
def queryConsumer(){ 
    while True: 
    if queryQueue.qsize()==100: 
     queries=[queryQueue.get() for i in range(100)]    
     #execute the 100 queries 
} 
t = Thread(target=queryConsumer) 
t.daemon = True 
t.start() 

С вашего метода Scrapy вы могли бы назвать

queryQueue.put(myquery) 

толкать предметы на очереди.

0

Мое предложение,

самого по себе вашему пауку вы можете сделать, что нет необходимости писать в трубопроводе вообще.

Создать соединение postgres DB в пауке, используя psycopg2.

psycopg2.connect(database="testdb", user="postgres", password="cohondob", host="127.0.0.1", port="5432") 
connection.cursor() 

Если он достигает 100, тогда вставьте в db, затем зафиксируйте это в db.

Например:

  x = ({"name":"q", "lname":"55"}, 
      {"name":"e", "lname":"hh"}, 
      {"name":"ee", "lname":"hh"}) 
cur.executemany("""INSERT INTO bar(name,lname) VALUES (%(name)s, %(lname)s)""", x) 
0

Я только что написал небольшое расширение scrapy, чтобы сохранить очищенные элементы в базе данных.scrapy-sqlitem

Это очень проста в использовании.

pip install scrapy_sqlitem 

Определение Scrapy элементов с помощью SQLAlchemy Таблицы

from scrapy_sqlitem import SqlItem 

class MyItem(SqlItem): 
    sqlmodel = Table('mytable', metadata 
     Column('id', Integer, primary_key=True), 
     Column('name', String, nullable=False)) 

Написать свой паук и наследовать от SqlSpider

from scrapy_sqlitem import SqlSpider 

class MySpider(SqlSpider): 
    name = 'myspider' 

    start_urls = ('http://dmoz.org',) 

    def parse(self, response): 
     selector = Selector(response) 
     item = MyItem() 
     item['name'] = selector.xpath('//title[1]/text()').extract_first() 
     yield item 

Добавить DATABASE_URI и настройки chunksize в settings.py.

DATABASE_URI = "postgresql:///mydb" 

DEFAULT_CHUNKSIZE = 100 

CHUNKSIZE_BY_TABLE = {'mytable': 100, 'othertable': 250} 

Создайте столы, и все готово!

http://doc.scrapy.org/en/1.0/topics/item-pipeline.html#activating-an-item-pipeline-component

http://docs.sqlalchemy.org/en/rel_1_1/core/tutorial.html#define-and-create-tables