2014-01-30 2 views
-1

У меня есть небольшая программа python, сделанная мной, которая царапает сайт по некоторым ценам. Я использую модуль beautifulsoup 4 и python threading.О Threads in Python

Проблема в том, что я не знаю, как «управлять» потоками. Как видно из кода, я создал подкласс класса threading (что-то вроде потребителя, продюсера). В одном классе я беру ссылки со страниц, а в другом я ищу некоторые классы в html с BS4 и записываю в основной файл.

Когда я начинаю сценарий, я обычно начинаю с темы 1. Я очищаю каждую ссылку на веб-сайте, принимая стоимость имени и статьи. Для каждой ссылки я делаю нить. Поскольку на веб-сайте много ссылок (около 3000), через некоторое время у меня есть столько потоков, которые убивают мой компьютер. Python.exe составляет около 2 ГБ, и мне нужно убить программу.

Это мой четвертый день пытаются найти решение ...... пожалуйста .... :)

Если я правильно: setDaemon (истина) - программа убивает их после выполнения,. join() ждет завершения потока.

Я совершенно новичок в программировании, а также знаю, что код немного грязный. Любые предложения приветствуются.

Не беспокойтесь о последних нескольких блоках try. Это просто для удовольствия.

Спасибо!

import threading 
import csv 
import urllib2 
import time 
from bs4 import BeautifulSoup 
import re 
import Queue 


httpLink = "WWW.SOMEWEBSITE.COM" 
fn = 'J:\\PRICES\\' 

queue = Queue.Queue() 
soup_queue = Queue.Queue() 
brava = threading.Lock() 

links = [] 
brokenLinks = [] 
pageLinks = [] 

fileName = time.strftime("%d_%m_%Y-%H_%M") 


class TakeURL(threading.Thread): 
    def __init__(self, queue, soup_queue): 
     threading.Thread.__init__(self) 
     self.queue = queue 
     self.soup_queue = soup_queue 

    def run(self): 
     while True: 
      host = self.queue.get() 
      try: 
       url = urllib2.urlopen(host) 
       chunk = url.read() 
      except: 
       print ("Broken link " + host) 
       writeCSV("BrokenLinks.csv", "ab", host) 
       brokenLinks.append(host) 
       time.sleep(30) 

      writeCSV('Links.csv','ab',host) 

      if ("class=\"price\"" in chunk): 
       self.soup_queue.put(chunk)     
      else: 
       writeCSV("LinksWithoutPrice.csv", "ab", host) 
       try: 
        findLinks(chunk, "ul", "mainmenu") 
       except: 
        print ("Broken Link" + host) 
        writeCSV("BrokenLinks.csv", "ab", host) 
        brokenLinks.append(host) 
        time.sleep(30)     

      self.queue.task_done() 

class GetDataURL(threading.Thread): 
    getDataUrlLock = threading.Lock() 
    def __init__ (self, soup_queue): 
     threading.Thread.__init__(self) 
     self.soup_queue = soup_queue 
    def run(self): 
     while True: 
      chunk = self.soup_queue.get() 
      soup = BeautifulSoup(chunk) 
      dataArticle = soup.findAll("tr",{"class":""}) 
      pagination = soup.findAll("a",{"class":"page"}) 

      self.getDataUrlLock.acquire() 
      f = open(fn + fileName + ".csv", "ab") 
      filePrice = csv.writer(f) 
      for groupData in dataArticle: 
       for articleName in groupData.findAll("a",{"class":"noFloat"}): 
        fullName = articleName.string.encode('utf-8') 
        print (fullName) 
       for articlePrice in groupData.findAll("div", {"class":"price"}): 
        if (len(articlePrice) > 1): 
         fullPrice = articlePrice.contents[2].strip() 
        else: 
         fullPrice = articlePrice.get_text().strip() 
        print (fullPrice[:-12]) 
        print ('-')*80   

       filePrice.writerow([fullName, fullPrice[:-12]]) 
      f.close() 

      for page in pagination: 
       pageLink = page.get('href') 
       pageLinks.append('http://www.' + pageLink[1:]) 
      self.getDataUrlLock.release() 

      self.soup_queue.task_done() 

def writeCSV(fileName, writeMode, link): 
    try: 
     brava.acquire() 
     f = csv.writer(open(fn + fileName,writeMode)) 
     f.writerow([link])   
    except IOError as e: 
     print (e.message) 
    finally: 
     brava.release() 

def findLinks(chunk, tagName, className): 
    soup = BeautifulSoup(chunk) 
    mainmenu = soup.findAll(tagName,{"class":className}) 
    for mm in mainmenu: 
     for link in mm.findAll('a'): 
      href = link.get('href') 
      links.insert(0,href)    
      print (href) 
      print ('-')*80 

def startMain(links): 
    while (links): 
     #time.sleep(10) 
     threadLinks = links[-10:]   

     print ("Alive Threads: " + str(threading.activeCount())) 
     #time.sleep(1) 

     for item in range(len(threadLinks)): 
      links.pop() 

     for i in range(len(threadLinks)): 
      tu = TakeURL(queue, soup_queue) 
      tu.setDaemon(True) 
      tu.start()    

     for host in threadLinks: 
      queue.put(host) 

     for i in range(len(threadLinks)): 
      gdu = GetDataURL(soup_queue) 
      gdu.setDaemon(True) 
      gdu.start() 

     queue.join() 
     soup_queue.join()   


if __name__ == "__main__": 
    start = time.time() 

    httpWeb = urllib2.urlopen(httpLink) 
    chunk = httpWeb.read() 
    findLinks(chunk, 'li','tab') 

     startMain(links) 
     pageLinks = list(set(pageLinks)) 
     startMain(pageLinks) 
     startMain(brokenLinks) 

    print ('-') * 80 
    print ("Seconds: ") % (time.time() - start) 
    print ('-') * 80 
+0

http://sscce.org! попробуйте сузить свою проблему до ** простого ** кода для чтения! – zmo

+0

Я попробовал мужчину ... но я не знаю, с чего начать или как это сделать ... так что если вы не хотите читать, никаких проблем со мной ... извините ... во всяком случае, это может быть какой-то полный живой пример здесь ... поэтому я хотел бы оставить как есть ... :) – user2026884

+4

Прочитайте [Параллелизм в одной строке] (https://medium.com/building-things-on-the-internet/40e9b2b36148). В нем рассказывается, как легко настроить параллельность в программировании на Python, используя пример очищающих веб-сайтов. – xbonez

ответ

2

Ваша нить никогда не возвращает ничего, поэтому она никогда не останавливается; просто постоянно работает цикл while. И так как вы начинаете новый поток для каждой ссылки, вы в конечном итоге просто добавляете все больше и больше потоков, в то время как предыдущие потоки могут ничего не делать. Вам по существу не понадобится queue с тем, как у вас есть. Этот подход может вызвать проблемы с большим количеством заданий, поскольку вы замечаете.

worker = GetDataURL() worker.start()

действительно указывает на GetDataURL.run() ... которая является бесконечной во время цикла. То же самое верно для TakeURL.start().

Вы могли бы пойти пару маршрутов

1) Просто возьмите время из вашего потока, избавиться от очередей и возвращает результат в конце из run определения. Таким образом, каждый поток имеет 1 задачу, возвращает результаты, а затем останавливается. Не самый эффективный, но потребуется минимальная модификация кода.

2) В вашем startMain, за пределами цикла while, запустите группу из 10 потоков (т. Е. Пул потоков). Эти 10 потоков будут выполняться всегда, и вместо запуска нового потока для каждой ссылки просто поместите ссылку в очередь. Когда поток доступен, он будет запускать следующий элемент в очереди. Но вам все равно нужно управлять очисткой этих потоков.

3) Вы можете немного переработать свой код и использовать встроенные функции, такие как пулы потоков и пулы процессов. Я размещал на пулах процессов до: SO MultiProcessing С помощью этого метода вы можете забыть все беспорядок, связанный с замками. После каждого pool.map (или того, что вы используете) вы можете вставить этот фрагмент информации в файл в код startMain. Убирает много.

Надеюсь, это имеет смысл. Я решил не изменять код, потому что я думаю, что вам стоит экспериментировать с вариантами и выбирать направление.

+0

Добавлено еще несколько комментариев – catwalker333

+0

@catwalker ... я удалил в цикле, и теперь я вижу проблему .... спасибо для руководства ... ваш ответ с сайтом xbonez делает это немного более ясным. – user2026884