2013-06-11 2 views
14

Я пробовал многопроцессорное программирование с помощью Python. Возьмите алгоритм разделения и покоя, например, Fibonacci. Поток выполнения программы разветвляется как дерево и выполняется параллельно. Другими словами, мы имеем пример nested parallelism.Вложенный параллелизм в Python

Из Java я использовал шаблон threadpool для управления ресурсами, так как программа могла быстро разветвляться и создавать слишком много короткоживущих потоков. Один статический (общий) поток можно создать через ExecutorService.

Ожидаю то же самое для Pool, но оказывается, что Pool object is not to be globally shared. Например, совместное использование пула с использованием multiprocessing.Manager.Namespace() приведет к ошибке.

объекты бассейна не могут быть переданы между процессами или маринованные

У меня есть 2 части вопроса:

  1. Что я здесь отсутствует; почему не следует делиться пулом между процессами?
  2. Что такое шаблон для реализации вложенного параллелизма в Python? Если возможно, поддерживайте рекурсивную структуру, а не торгуйте ее для итерации.

from concurrent.futures import ThreadPoolExecutor 

def fibonacci(n): 
    if n < 2: 
     return n 
    a = pool.submit(fibonacci, n - 1) 
    b = pool.submit(fibonacci, n - 2) 
    return a.result() + b.result() 

def main(): 
    global pool 

    N = int(10) 
    with ThreadPoolExecutor(2**N) as pool: 
     print(fibonacci(N)) 

main() 

Java

public class FibTask implements Callable<Integer> { 

    public static ExecutorService pool = Executors.newCachedThreadPool(); 
    int arg; 

    public FibTask(int n) { 
     this.arg= n; 
    } 

    @Override 
    public Integer call() throws Exception { 
     if (this.arg > 2) { 
      Future<Integer> left = pool.submit(new FibTask(arg - 1)); 
      Future<Integer> right = pool.submit(new FibTask(arg - 2)); 
      return left.get() + right.get(); 
     } else { 
      return 1; 
     } 

    } 

    public static void main(String[] args) throws Exception { 
     Integer n = 14; 
     Callable<Integer> task = new FibTask(n); 
     Future<Integer> result =FibTask.pool.submit(task); 
     System.out.println(Integer.toString(result.get())); 
     FibTask.pool.shutdown();    
    }  

} 

Я не уверен, если это имеет значение здесь, но я не обращая внимания на разницу между "процессом" и "нити"; для меня они оба означают «виртуализированный процессор». Я понимаю, что цель пула заключается в совместном использовании «пула» или ресурсов. Запуск задач может сделать запрос в пул. По мере выполнения параллельных задач в других потоках эти потоки могут быть восстановлены и назначены для новых задач. Мне не имеет смысла запрещать совместное использование пула, чтобы каждый поток должен создавать свой собственный новый пул, поскольку это, казалось бы, превзошло цель пула потоков.

+0

Зачем вам это нужно для общего доступа?Разве вы не можете содержать все это внутри одного пространства имен/класса? –

+2

@InbarRose Проблема в том, что в рекурсивной функции, которая выполняет рекурсивный вызов внутри другого процесса, пул разветвляется и также вызывается подпроцессом. Это вызывает проблемы с очередями, следовательно, это не работает. В любом случае я хотел бы подчеркнуть, что в Java вы используете * threads *. С потоками нет никаких проблем, так как нет разметки объекта пула. Я считаю, что использование пула процессов в Java приведет к более или менее одинаковому поведению. – Bakuriu

+0

@InbarRose Я также пробовал содержать «пул» как экземпляр класса и статическую переменную, но все равно доходил до проблемы с проблемой. Например, с пулом и рекурсивными вызовами, содержащимися в пределах одного класса, но это все равно приводит к одной и той же проблеме:> объекты пула не могут быть переданы между процессами ... –

ответ

3

1) Что мне здесь не хватает; почему не следует делиться пулом между процессами?

Не все объектные/экземпляры pickable/сериализации, в данном случае, пул использует threading.lock, который не является pickable:

>>> import threading, pickle 
>>> pickle.dumps(threading.Lock()) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
[...] 
    File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
    raise TypeError, "can't pickle %s objects" % base.__name__ 
TypeError: can't pickle lock objects 

или лучше:

>>> import threading, pickle 
>>> from concurrent.futures import ThreadPoolExecutor 
>>> pickle.dumps(ThreadPoolExecutor(1)) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps 
    Pickler(file, protocol).dump(obj) 
    File 
[...] 
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save 
     rv = reduce(self.proto) 
     File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
     raise TypeError, "can't pickle %s objects" % base.__name__ 
    TypeError: can't pickle lock objects 

Если вы думаете об этом имеет смысл, блокировка - это примитив семафора, управляемый операционной системой (поскольку python использует собственные потоки). Возможность рассортировать и сохранять это состояние объекта внутри исполняемого файла python действительно не будет достигать ничего значимого, поскольку его истинное состояние хранится ОС.

2) Что такое шаблон для реализации вложенного параллелизма в Python?Если это возможно, сохраняя рекурсивную структуру, а не торговать его для итерации

Теперь, для престижа, все я уже говорил выше, не реально применить к примеру, так как вы используете темы (ThreadPoolExecutor), а не процессы (ProcessPoolExecutor), поэтому обмен данными между процессами не должен происходить.

Ваш пример java просто более эффективен, так как пул потоков, который вы используете (CachedThreadPool), создает новые потоки по мере необходимости, тогда как реализации исполнителей python ограничены и требуют явного максимального количества потоков (max_workers). Есть несколько различий в синтаксисе между языками, которые также, кажется, отбрасывают вас (статические экземпляры в python - это, по сути, все, что явно не охвачено), но по существу оба примера создавали бы точно такое же количество потоков для выполнения. Например, вот пример с использованием довольно наивные реализации CachedThreadPoolExecutor в питоне: настройка

from concurrent.futures import ThreadPoolExecutor 

class CachedThreadPoolExecutor(ThreadPoolExecutor): 
    def __init__(self): 
     super(CachedThreadPoolExecutor, self).__init__(max_workers=1) 

    def submit(self, fn, *args, **extra): 
     if self._work_queue.qsize() > 0: 
      print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1)) 
      self._max_workers +=1 

     return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra) 

pool = CachedThreadPoolExecutor() 

def fibonacci(n): 
    print n 
    if n < 2: 
     return n 
    a = pool.submit(fibonacci, n - 1) 
    b = pool.submit(fibonacci, n - 2) 
    return a.result() + b.result() 

print(fibonacci(10)) 

Производительность:

Я настоятельно рекомендую смотреть в gevent, так как это даст вам высокий параллелизм без накладных расходов потоков. Это не всегда так, но ваш код на самом деле является плакатным ребенком для использования gevent. Вот пример:

import gevent 

def fibonacci(n): 
    print n 
    if n < 2: 
     return n 
    a = gevent.spawn(fibonacci, n - 1) 
    b = gevent.spawn(fibonacci, n - 2) 
    return a.get() + b.get() 

print(fibonacci(10)) 

Полностью ненаучный, но на моем компьютере выше код работает быстрее, чем резьбовой эквивалент.

Надеюсь, это поможет.

+1

gevent не дает вам никакого параллелизма. –

+0

Правильно, нет вычислительного параллелизма, но исходный вопрос заключался не в том, чтобы изменить выбранный алгоритм, а вместо этого предложить общий шаблон для его улучшения. –

+0

Не требуется изменение алгоритма: пример уже разбивает работу на самостоятельные подзадачи. Все, что необходимо, - это субстрат, который фактически выполняет задачи параллельно (т. Е. Не решение параллелизма, такое как gevent). –

0

1. Что мне здесь не хватает; почему не следует делиться пулом между процессами?

Как правило, вы не можете совместно использовать потоки ОС между процессами, независимо от языка.

Вы можете организовать доступ к диспетчеру пула с рабочими процессами, но это, вероятно, не является хорошим решением любой проблемы; Смотри ниже.

2. Что такое шаблон для реализации вложенного параллелизма в Python? Если возможно, поддерживая рекурсивную структуру, а не торгуя ею для итерации.

Это зависит от ваших данных.

В CPython общий ответ заключается в использовании структуры данных, которая реализует эффективные параллельные операции. Хорошим примером этого является NumPy оптимизированных типов массивов: here - пример их использования для разделения операции большого массива на несколько процессорных ядер.

Функция Fibonacci, реализованная с использованием блокирующей рекурсии, является особенно пессимируемой для любого подхода, основанного на пуле работника, хотя: fib (N) потратит большую часть своего времени, просто связывая N рабочих, которые ничего не делают, кроме ожидания других работников. Есть много других способов подойти к функции Фибоначчи специально (например, используя CPS, чтобы устранить блокировку и заполнить постоянное число работников), но, вероятно, лучше решить вашу стратегию, основанную на реальных проблемах, которые вы будете решать, а не на примерах как это.

Смежные вопросы