3

Я пытаюсь очистить одновременно с селеном и многопроцессорными модулями. Ниже примерно мой подход:Скребок одновременно с селеном в питоне

  • создать очередь с числом экземпляров WebDriver равно числу рабочих
  • создать пул рабочих
  • каждый работник тянет WebDriver, например, из очереди
  • когда функция завершается WebDriver экземпляр помещается обратно в очередь

Вот код:

#!/usr/bin/env python 
# encoding: utf-8 

import time 
import codecs 
from selenium import webdriver 
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities 
from multiprocessing import Pool 
from Queue import Queue 


def download_and_save(link_tuple): 
    link_id, link = link_tuple 
    print link_id 
    w = q.get() 
    w.get(link) 
    with codecs.open('%s.html' % link_id, 'w', encoding='utf-8') as f: 
     f.write(w.page_source) 
    time.sleep(10) 
    q.put(w) 


def main(num_processes): 
    links = [ 
     'http://openjurist.org/743/f2d/273/zwiener-v-commissioner-of-internal-revenue', 
     'http://www.oyez.org/advocates/z/l/lonny_f_zwiener', 
     'http://www.texasbar.com/attorneys/member.cfm?id=21191', 
     'https://www.courtlistener.com/opinion/441662/lonny-f-zwiener-and-ardith-e-zwiener-v-commissione/cited-by', 
     'https://www.courtlistener.com/opinion/441662/lonny-f-zwiener-and-ardith-e-zwiener-v-commissione/authorities/', 
     'http://www.myheritage.com/names/lonny_zwiene', 
     'https://law.resource.org/pub/us/case/reporter/F2/743/743.F2d.273.84-4068.htm', 
     'http://www.ancestry.com/1940-census/usa/Texas/Lonny-F-Zwiener_5bbff', 
     'http://search.ancestry.com/cgi-bin/sse.dll?gl=34&rank=1&new=1&so=3&MSAV=0&msT=1&gss=ms_f-34&gl=bmd_death&rank=1&new=1&so=1&MSAV=0&msT=1&gss=ms_f-2_s&gsfn=Lonny&gsln=Zwiener&msypn__ftp=T', 
     'http://www.mocavo.com/Lonny-F-Zwiener-Fredericksburg-Gillespie-Texas-1940-United-States-Census/0798164756456805432', 
     'http://www.taftlaw.com/attorneys/635-mark-s-yuric' 
    ] 
    n = len(links) 
    link_tuples = [(link_id, link) for link_id, link in zip(xrange(n), links)] 
    pool = Pool(num_processes) 
    pool.map(download_and_save, link_tuples) 


if __name__ == '__main__': 
    num_processes = 2 
    q = Queue() 
    dcap = dict(DesiredCapabilities.PHANTOMJS) 
    dcap["phantomjs.page.settings.userAgent"] = (
     "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 " 
     "(KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36" 
    ) 
    for i in range(num_processes): 
     w = webdriver.PhantomJS(desired_capabilities=dcap) 
     q.put(w) 
    main(num_processes) 

Эти сценарии запускаются, но сохраненные htmls либо дублируются, либо отсутствуют.

+0

Я думаю, что сам webdriver действует только как один статический экземпляр и поэтому не порождает новые экземпляры для соответствия потокам. по крайней мере, с pahntomjs, так оно и есть. –

+0

@jimtollan, спасибо за комментарий. Вы знаете, есть ли способ создать несколько экземпляров webdriver? – user2016508

+1

Я довольно грубо пошел по пути появления новых экземпляров автономного приложения, вместо того, чтобы обрабатывать несколько потоков, чтобы преодолеть это препятствие. Это некрасиво, и я уверен, что есть лучший способ - однако, мой отказ в то время был против временного давления и был только использован в течение нескольких дней, поэтому принял прагматичный подход –

ответ

0

Вот другой подход, с которым я добился успеха: вы держите своих рабочих в __main__, а рабочие вытаскивают из task_q.

import multiprocessing 
import traceback 

class scrapeWorker(multiprocessing.Process): 
    def __init__(self, worker_num, task_q, result_q): 
     super(scrapeWorker, self).__init__() 
     self.worker_num = worker_num 
     self.task_q = task_q 
     self.result_q = result_q 

     self.scraper = my_scraper_class() # this contains driver code, methods, etc. 

    def handleWork(self, work): 
     assert isinstance(work, tuple) or isinstance(work, list), "work should be a tuple or list. found {}".format(type(work)) 
     assert len(work) == 2, "len(work) != 2. found {}".format(work) 
     assert isinstance(work[1], dict), "work[1] should be a dict. found {}".format(type(work[1])) 

     # do the work 
     result = getattr(self.scraper, work[0])(**work[1]) 

     self.result_q.put(result) 

    # worker.run() is actually called via worker.start() 
    def run(self): 
     try: 
      self.scraper.startDriving() 

      while True: 
       work = self.task_q.get() 

       if work == 'KILL': 
        self.scraper.driver.quit() 
        break 

       self.handleWork(work) 
     except: 
      print traceback.format_exc() 

      raise 

if __name__ == "__main__": 
    num_workers = 4 

    manager = multiprocessing.Manager() 
    task_q = manager.Queue() 
    result_q = manager.Queue() 

    workers = [] 
    for worker_num in xrange(num_workers): 
     worker = scrapeWorker(worker_num, task_q, result_q) 
     worker.start() 
     workers.append(worker) 

    # you decide what job_stuff is 
    # work == [ 'method_name', {'kw_1': val_1, ...} ] 
    for work in job_stuff: 
     task_q.put(work) 

    results = [] 
    while len(results) < len(job_stuff): 
     results.append(result_q.get()) 

    for worker in workers: 
     task_q.put("KILL") 

    for worker in workers: 
     worker.join() 

    print "finished!" 


#### 
Смежные вопросы