2011-12-21 2 views
2

У меня есть некоторые проблемы с использованием потоков и модуля scipy.stats.randint. Действительно, при запуске нескольких потоков локальный массив (bootIndexs в коде ниже), по-видимому, используется для всех запущенных потоков.threading with python: проблемы с локальными переменными

Это поднятая ошибка

> Exception in thread Thread-559: 
Traceback (most recent call last): 
... 
    File "..\calculDomaine3.py", line 223, in bootThread 
    result = bootstrap(nbB, distMod) 
    File "...\calculDomaine3.py", line 207, in bootstrap 
    bootIndexs = spstats.randint.rvs(0, nbTirages-1, size = nbTirages) 
    File "C:\Python27\lib\site-packages\scipy\stats\distributions.py", line 5014, in rvs 
    return super(rv_discrete, self).rvs(*args, **kwargs) 
    File "C:\Python27\lib\site-packages\scipy\stats\distributions.py", line 582, in rvs 
    vals = reshape(vals, size) 
    File "C:\Python27\lib\site-packages\numpy\core\fromnumeric.py", line 171, in reshape 
    return reshape(newshape, order=order) 
ValueError: total size of new array must be unchanged 

И это мой код:

import threading 
import Queue 
from scipy import stats as spstats 

nbThreads = 4 

def test(nbBoots, nbTirages, modules): 

    def bootstrap(nbBootsThread, distribModules) : 

     distribMax = []    

     for j in range(nbBootsThread): 
      bootIndexs = spstats.randint.rvs(0, nbTirages-1, size = nbTirages) 
      boot = [distribModules[i] for i in bootIndexs] 

      distribMax.append(max(boot)) 

     return distribMax 

    q = Queue.Queue() 

    def bootThread (nbB, distMod): 
     result = bootstrap(nbB, distMod) 
     q.put(result, False) 
     q.task_done() 

    works = [] 

    for i in range(nbThreads) :  
     works.append(threading.Thread(target = bootThread, args = (nbBoots//nbThreads, modules[:],))) 


    for w in works: 
     w.daemon = True 
     w.start() 

    q.join() 

     distMaxResult = [] 

     for j in range(q.qsize()): 
      distMaxResult += q.get() 

     return distMaxResult 

class classTest: 
    def __init__(self): 
     self.launch() 

    def launch(self): 
     print test(100, 1000, range(1000)) 

Спасибо за ваши ответы.

+0

Я ничего не вижу в вашем коде, чтобы объяснить поведение. Не могли бы вы придумать самодостаточный пример, с которым мы могли бы работать и экспериментировать? – NPE

+0

Я редактирую его, вы можете использовать его вот так, если у вас есть scipy – user1062526

+0

Я могу запустить код (как только я изменил 'nbThread' на' nbThreads'), и я получаю другую ошибку ('Исключение в потоке Thread- 4 (скорее всего, возникает при отключении интерпретатора)). – NPE

ответ

2

Действительно, при запуске нескольких потоков локальный массив (bootIndexs в коде ниже), по-видимому, используется для всех запущенных потоков.

Это целая точка потоков: легкие задачи, которые делят все с их нерестилищем! :) Если вы ищете решение «ничего общего», вы, возможно, посмотрите на multiprocessing module (имейте в виду, что процесс будет намного тяжелее в системе, чем размножение нити).

Однако, вернемся к вашей проблеме ... у меня это немного больше, чем выстрел в темноте, но вы могли бы попытаться изменить эту строку:

boot = [distribModules[i] for i in bootIndexs] 

к:

boot = [distribModules[i] for i in bootIndexs.copy()] 

(используя копию массива, а не сам массив). Это вряд ли будет проблемой (вы просто перебираете массив, а не используете его), но это единственный момент, который я вижу, когда вы используете его в своей нити, поэтому ...

Это, конечно, работает, если ваш контент массива не должен изменяться потоками, манипулирующими им. Если изменение значения «глобального» массива - правильное поведение, тогда вы должны произвольно реализовать Lock(), чтобы запретить одновременный доступ к этому ресурсу. Ваши потоки должны тогда сделать что-то вроде:

lock.acquire() 
# Manipulate the array content here 
lock.release() 
+0

Я пытаюсь копировать(), но это то же самое. Я думаю, что проблема в том, что массив создан (с scipy, в строке выше). Этот массив, вероятно, создан с помощью Thread и другого Thread, запущенного в то же время, попытайтесь создать массив другого одного и того же массива ... Многопроцессорность сложнее с python на окнах, я чувствую ... – user1062526

+0

@ user1062526 - Чтение ваших комментарий к вопросу: факт, что вы получаете ошибку с перерывами, является явным признаком того, что это ошибка, связанная с условиями гонки. Я удивлен, однако, что к массиву обращаются различные потоки, так как «bootIndex» ссылается только внутри функции, поэтому его область видимости должна быть однозначно локальной ...: -/ – mac

1

У меня нет опыта с резьбой, так что это может быть совершенно не по себе.

scipy.stats.randint, как и другие дистрибутивы в scipy.stats, является экземпляром соответствующего класса распределения. Это означает, что каждый поток обращается к одному экземпляру. Во время вызова rvs устанавливается атрибут _size. Если другой поток с другим размером обращается к экземпляру тем временем, то вы получите ValueError, что размеры не совпадают в изменении. Мне кажется, что это гонка.

Я бы рекомендовал использовать numpy.random непосредственно в этом случае (это вызов в scipy.stats.randint)

numpy.random.randint(min, max, self._size) 

может быть, вам повезет больше там.

Если вам нужен дистрибутив, который недоступен в numpy.random, тогда вам нужно будет создавать новые экземпляры дистрибутива в каждом потоке, если мои предположения верны.

+0

Действительно, мне иногда нужно другое распространение. .. Лучший способ, я думаю, это создать локальные переменные для каждого потока ... Но я не добился успеха. Я пытаюсь threading.locals, я пытаюсь передать пустой массив в args для bootIndexs, но ничего не работает. – user1062526

+0

, если вы используете spstats.randint (или любой другой дистрибутив scipy.stats) в разных потоках, все они все ссылаются на один и тот же экземпляр в scipy.stats, независимо от того, являются ли другие переменные или локальными или нет. – user333700

Смежные вопросы