2010-02-19 2 views
2

Я создаю поточный скрипт python, в котором есть коллекция файлов, помещенных в очередь, а затем неизвестное количество потоков (по умолчанию - 3), чтобы начать загрузку. Когда каждый из потоков завершается, он обновляет stdout с статусом очереди и процентом. Все файлы загружаются, но информация о статусе неверна в третьем потоке, и я не уверен, почему. Я рассматривал возможность создания очереди с заданной работой, чтобы использовать ее для расчета, но не думаю, что мне нужно/чтобы это имело бы значение. Может ли кто-нибудь указать мне в правильном направлении?Проблема с потоком/очередью Python

download_queue = queue.Queue() 

class Downloader(threading.Thread): 
    def __init__(self,work_queue): 
     super().__init__() 
     self.current_job = 0 
     self.work_queue = work_queue 
     self.queue_size = work_queue.qsize() 

    def run(self): 
     while self.work_queue.qsize() > 0: 
      url = self.work_queue.get(True) 
      system_call = "wget -nc -q {0} -O {1}".format(url,local_file) 
      os.system(system_call) 
      self.current_job = int(self.queue_size) - int(self.work_queue.qsize()) 
      self.percent = (self.current_job/self.queue_size) * 100 
      sys.stdout.flush() 
      status = "\rDownloading " + url.split('/')[-1] + " [status: " + str(self.current_job) + "/" + str(self.queue_size) + ", " + str(round(self.percent,2)) + "%]" 
     finally: 
      self.work_queue.task_done() 
def main: 
    if download_queue.qsize() > 0: 
     if options.active_downloads: 
      active_downloads = options.active_downloads 
     else: 
      active_downloads = 3 
     for x in range(active_downloads): 
      downloader = Downloader(download_queue) 
      downloader.start() 
     download_queue.join() 
+2

Действительно ли этот код? Где вы печатаете сообщение о состоянии? Какая версия python поддерживает 'while ... finally'? –

+0

См. Также http://stackoverflow.com/questions/1965213/file-downloading-using-python-with-threads –

ответ

4

Вы не можете проверить размер очереди в одном операторе, а затем .get() из очереди в следующем. Тем временем весь мир, возможно, изменился. Вызов метода .get() - это одна атомная операция, которую вы должны вызвать. Если он поднимает Empty или блокирует, очередь пуста.

Ваши потоки могут переписывать выходные данные друг друга. У меня будет другой поток с входной очередью. Только задание - напечатать элементы в очереди на stdout. Он также может подсчитывать количество завершенных элементов и получать информацию о состоянии.

Я также, как правило, не подкласс Thread, но вместо того, чтобы просто поставить простой Thread экземпляр с параметром target= и .start() нити.

на основе вашего ответа, попробуйте следующее:

download_queue = queue.Queue() 


class Downloader(threading.Thread): 
    def __init__(self,work_queue, original_size): 
     super().__init__() 
     self.current_job = 0 
     self.work_queue = work_queue 
     self.queue_size = original_size 

    def run(self): 
     while True: 
      try: 
       url = self.work_queue.get(False) 
       system_call = "wget -nc -q {0} -O {1}".format(url,local_file) 
       os.system(system_call) 
       # the following code is questionable. By the time we get here, 
       # many other items may have been taken off the queue. 
       self.current_job = int(self.queue_size) - int(self.work_queue.qsize()) 
       self.percent = (self.current_job/self.queue_size) * 100 
       sys.stdout.flush() 
       status = ("\rDownloading " + url.split('/')[-1] + 
          " [status: " + str(self.current_job) + 
          "/" + str(self.queue_size) + ", " + 
          str(round(self.percent,2)) + "%]")    
      except queue.Empty: 
       pass 
      finally: 
       self.work_queue.task_done() 




def main: 
    if download_queue.qsize() > 0: 
     original_size = download_queue.qsize() 
     if options.active_downloads: 
      active_downloads = options.active_downloads 
     else: 
      active_downloads = 3 
     for x in range(active_downloads): 
      downloader = Downloader(download_queue, original_size) 
      downloader.start() 
     download_queue.join() 
+0

Я знаю, что потоки будут перезаписывать выходные данные друг друга, это нормально, поскольку он должен это делать. Я хочу показать только последний файл, который был установлен для загрузки, и какой номер он сравнивается с начальным значением размера очереди. В настоящее время происходит то, что queuesize является неправильным в третьем потоке (при использовании значений по умолчанию); он показывает 2 меньше, чем первые два. Например, вот что выглядит каждая из строк состояния при печати: Скачивание файла 1.txt [профиль: 1/10, 10%] Загрузка файла 2.txt [Статус: 2/10, 10%] Загрузка file 3.txt [status: 3/8, 37.5%] – MRR0GERS

+0

Да, к тому времени, как 3-го работника запустили, два других обработали элемент из очереди ... Вы не показываете код, который помещает элементы в очередь в этом фрагменте, но, предположительно, это то, откуда должен исходить ваш общий счет. Или просто сохраните общий размер очереди до того, как вы начнете какие-либо потоки, и не читайте его внутри потока. –

+0

Я написал сценарий в Python3, и он работает, за исключением нескольких вещей. Благодарим вас за ввод, я изменю свой код с вашими предложениями, когда вернусь домой сегодня вечером. – MRR0GERS

2

Если вы хотите использовать multiprocessing модуль, она включает в себя очень хорошо параллельно imap_unordered, что позволит сократить вашу проблему в очень элегантно:

import multiprocessing, sys 

class ParallelDownload: 
    def __init__(self, urls, processcount=3): 
     self.total_items = len(urls) 
     self.pool = multiprocessing.Pool(processcount) 
     for n, status in enumerate(self.pool.imap_unordered(self.download, urls)): 
      stats = (n, self.total_items, n/self.total_items) 
      sys.stdout.write(status + " [%d/%d = %0.2f %%]\n"%stats) 


    def download(self, url): 
     system_call = "wget -nc -q {0} -O {1}".format(url, local_file) 
     os.system(system_call) 
     status = "\rDownloaded " + url.split('/')[-1] 
     return status