2014-02-14 4 views
0

Новичок для нарезки здесь. Я заимствование много кода из этого thread при попытке построить свой первый сценарий с помощью поточной/очереди:Тема висит/Очередь висит

import threading, urllib2 
import Queue 
import sys 
from PIL import Image 
import io, sys 

def avhash(url,queue): 
    if not isinstance(url, Image.Image): 
     try: 
      im = Image.open(url) 
     except IOError: 
      fd=urllib2.urlopen(url) 
      image_file=io.BytesIO(fd.read()) 
      im=Image.open(image_file) 
      im = im.resize((8, 8), Image.ANTIALIAS).convert('L') 
      avg = reduce(lambda x, y: x + y, im.getdata())/64. 
    hash = reduce(lambda x, (y, z): x | (z << y), 
        enumerate(map(lambda i: 0 if i < avg else 1, im.getdata())), 
        0) 

    queue.put({url:hash}) 
    queue.task_done() 

def fetch_parallel(job_list): 
    q = Queue.Queue() 
    threads = [threading.Thread(target=avhash, args = (job,q)) for job in job_list[0:50]] 
    for t in threads: 
     t.daemon = True 
     t.start() 

    for t in threads: 
     t.join() 
    return [q.get() for _ in xrange(len(job_list))] 

В этом случае job_list список URL-адресов. Я обнаружил, что этот код отлично работает, когда этот список равен или меньше 50, но он зависает, когда> 50. Должно быть, я принципиально не понимаю, как работает потоки?

ответ

0

Ваша проблема эта линия:

return [q.get() for _ in xrange(len(job_list))] 

Если job_list имеет более чем 50 элементов, то вы пытаетесь прочитать больше результатов из очереди, чем вы положили в этой связи:.

return [q.get() for _ in xrange(len(job_list[:50]))] 

или, еще лучше:

MAX_LEN = 50 
... 
threads = [... for job in job_list[:MAXLEN]] 
... 
return [q.get() for _ in job_list[:MAXLEN]] 

[EDIT]

Кажется, вы хотите, чтобы ваша программа делала что-то отличное от того, что она делает. Ваша программа занимает первые 50 записей в job_list, обрабатывает каждую из них в потоке и игнорирует все остальные задания. Из вашего комментария ниже я предполагаю, что вы хотите обрабатывать все задания, но только по 50 за раз. Для этого вы должны использовать пул потоков. В Python> = 3.2 вы можете использовать concurrent.futures.ThreadPoolExecutor[link].

В Python < 3.2 вы должны свернуть свой собственный:

CHUNK_SIZE = 50 

def fetch_parallel(job_list): 
    results = [] 
    queue = Queue.Queue() 
    while job_list: 
     threads = [threading.Thread(target=avhash, args=(job, queue)) 
         for job in job_list[:CHUNK_SIZE]] 
     job_list = job_list[CHUNK_SIZE:] 
     for thread in threads: 
      thread.daemon = True 
      thread.start() 
     for thread in threads: 
      thread.join() 
     results.extend(queue.get() for _ in threads) 
    return results 

(непроверенные)

[/ EDIT]

+0

Привет, код выполняется без ошибок, но это только возвращает первый 50 результатов. – ChrisArmstrong

+0

См. Мое редактирование. – pillmuncher

+0

Это работает, спасибо – ChrisArmstrong