2016-08-22 5 views
0

В части моего программного кода, написанного с помощью python, у меня есть список элементов, размер которых может сильно варьироваться от 12 до одного элемента. Для каждого элемента в этом списке я делаю некоторую обработку (отправка HTTP-запроса, связанного с данным элементом, результаты синтаксического анализа и многие другие операции. Я хотел бы ускорить мой код с помощью потоковой передачи, я хотел бы создать 2 потока где каждый из них принять ряд элементов и сделать асинхр обработкиКак добавить потоки в зависимости от числа

Пример 1:. Допустим, что в моем списке у меня есть 12 пунктов, каждый поток будет принимать в этом случае 6 пунктов и вызов функции обработки на каждом пункт

Пример 2:. Теперь давайте предположим, что мой список есть 9 пунктов, один поток будет принимать 5 пунктов, а другой поток принимает бы другие 4 оставил пункты

.

В настоящее время я не прилагая каких-либо многопоточность, и мой код базы очень большой, так что здесь какой-то код, который делают почти то же самое, как мой случай:

#This procedure need to be used with threading . 
itemList = getItems() #This function return an unknown number of items between 1 and 12 

if len(itemList) > 0: # Make sure that the list is empty in this case . 
    for item in itemList: 
     processItem(item) #This is an imaginary function that do the processing on each item 

Ниже приведен базовый код облегченный, что объяснить, что я «Я делаю, я не могу понять, как я могу сделать мои потоки гибкими, поэтому каждый из них принимает несколько элементов, а другой берет остальные (как описано в примере 1 & 2).

Спасибо за ваше время

+0

Посмотрите на исполнителей темы. Запустите исполнителя с двумя потоками и просто отправьте все свои предметы на него. Это имеет то преимущество, что делает всю бухгалтерскую отчетность для вас и выясняет, какая нить должна выполнять следующую работу таким образом, чтобы поддерживать как максимально занятую работу. –

ответ

2

Вы могли бы скорее реализовать его с помощью общих очередей https://docs.python.org/3/library/queue.html#queue-objects

import queue 
import threading 

def worker(): 
    while True: 
     item = q.get() 
     if item is None: 
      break 
     do_work(item) 
     q.task_done() 

q = queue.Queue() 
threads = [] 
for i in range(num_worker_threads): 
    t = threading.Thread(target=worker) 
    t.start() 
    threads.append(t) 

for item in source(): 
    q.put(item) 

# block until all tasks are done 
q.join() 

# stop workers 
for i in range(num_worker_threads): 
    q.put(None) 
for t in threads: 
    t.join() 

Цитируя https://docs.python.org/3/library/queue.html#module-queue:

Модуль очереди реализует мульти-производитель, мульти- очереди потребителей. Он особенно полезен при программировании с резьбой, когда информация должна быть безопасно обмениваться между несколькими потоками.

Идея состоит в том, что у вас есть разделяемое хранилище, и каждый поток пытается прочитать элементы из него один за другим. Это гораздо более гибкая задача, чем распределение загрузки заранее, так как вы не знаете, как будет выполняться выполнение ваших задач ОС, сколько времени потребуется каждая итерация и т. Д. Кроме того, вы можете добавить элементы для дальнейшей обработки в эту очередь динамически - например, с параллельным потоком производителя.

Некоторые полезные ссылки:

Краткое введение в параллельное программирование в Python: http://www.slideshare.net/dabeaz/an-introduction-to-python-concurrency

Более подробную информацию о производитель-потребитель шаблон с линией за линией объяснения: http://www.informit.com/articles/article.aspx?p=1850445&seqNum=8

+0

Это именно то, что я собирался предложить, но вы были быстрее :-). – pts

+0

Некоторые объяснения этого кода. Все ожидающие рабочие элементы находятся в очереди. Рабочие потоки не знают, сколько элементов они будут обрабатывать. Они берут и обрабатывают предметы один за другим, до конца («Нет»). – pts

+0

Большое спасибо за ваш вклад, но я очень новичок в потоковом режиме, я бы хотел, чтобы вы могли объяснить свой код или добавить комментарии, которые лучше понять для других новичков. –

1

Вы можете используйте класс ThreadPoolExecutor из модуля concurrent.futures в Python 3. Модуль отсутствует в Python 2, но есть некоторые обходные пути (которые я не буду обсуждать).

Исполнитель пула потоков выполняет в основном то, что предложил @ffeast, но с меньшим количеством строк кода для написания. Он управляет пулом потоков, который выполнит все задачи, которые вы ему подадите, по-видимому, наиболее эффективным образом.Результаты будут возвращены через объекты Future, которые представляют собой «ожидающий» результат.

Поскольку вы, кажется, знаете список задач спереди, это особенно удобно для вас. Хотя вы не можете гарантировать, как задачи будут разделены между потоками, результат будет, по крайней мере, таким же хорошим, как и все, что вы закодировали вручную.

from concurrent.futures import ThreadPoolExecutor 
with ThreadPoolExecutor(max_workers=2) as executor: 
    for item in getItems(): 
     executor.submit(processItem, item) 

Если Вам необходима дополнительная информация с выходом, как какой-то способ определения фьючерсов, которые завершили или получение результатов из них, увидеть example в документации Python (на котором код выше сильно на основе) ,

Смежные вопросы