2010-11-06 5 views
0

Как я могу обновить общую переменную между различными threading.Thread в python?python threading и shared variables

позволяет говорить, что у меня есть 5 потоков, работающих вниз Queue.Queue(). после завершения очереди я хочу выполнить другую операцию, но я хочу, чтобы это произошло только один раз.

Можно ли делиться и обновлять переменную между потоками. поэтому, когда Queue.empty() True, это событие запускается, но если один из потоков делает это, я не хочу, чтобы другие делали это, потому что я получал неправильные результаты.

EDIT
У меня есть очереди, которая отражает файлы в файловой системе. файлы загружаются на сайт по потокам, и в то время как каждый поток загружает файл, он обновляет набор() ключевых слов, которые я получил из файлов.
Когда очередь пуста, мне нужно связаться с сайтом и сообщить об этом, чтобы обновить количество ключевых слов. прямо сейчас каждый поток делает это, и я получаю обновление для каждого потока, который является плохим. Я также пытался очистить набор, но он не работает.

 keywordset = set() 
    hkeywordset = set() 
    def worker(): 
     while queue: 
      if queue.empty(): 
       if len(keywordset) or len(hkeywordset): 
        # as soon as the queue is empty we send the keywords and hkeywords to the 
        # imageapp so it can start updating 
        apiurl = update_cols_url 
        if apiurl[-1] != '/': 
         apiurl = apiurl+'/' 
        try: 
         keywords = [] 
         data = dict(keywords=list(keywordset), hkeywords=list(hkeywordset)) 
         post = dict(data=simplejson.dumps(data)) 
         post = urllib.urlencode(post) 
         urllib2.urlopen(apiurl, post) 
         hkeywordset.clear() 
         keywordset.clear() 
         print 'sent keywords and hkeywords to imageapp...' 
        except Exception, e: print e 
      # we get the task form the Queue and process the file based on the action 
      task = queue.get() 
      print str(task) 
      try: 
       reindex = task['reindex'] 
      except: 
       reindex = False 
      data = updater.process_file(task['filename'], task['action'], task['fnamechange'], reindex) 
      # we parse the images keywords and hkeywords and add them to the sets above for later 
      # processing 
      try: 
       for keyword in data['keywords']: 
        keywordset.add(keyword) 
      except: pass 
      try: 
       for hkw in data['hkeywords']: 
         hkeywordset.add(hkw) 
      except:pass 
      queue.task_done() 


    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.daemon = True 
     t.start() 

    while 1: 
     line = raw_input('type \'q\' to stop filewatcher... or \'qq\' to force quit...\n').strip()

это то, что я пытался в основном. но, конечно, часть queue.empty() получает exectued столько раз, сколько у меня есть потоки.

ответ

0

Если вы используете очередь для запуска своей нити (thread pool), поэтому убедитесь, что условие гонки не будет (потокобезопасное), потому что очередь запускает поток в последовательном порядке, поэтому я думаю, что вы можете делиться переменной между потоком, и вы можете быть уверены, что не будет условия гонки по этой переменной.

Редактировать: Вот что-то подобное о том, что вы хотите сделать, надеюсь, что это может дать вам ответ на ваш вопрос в этот раз :):

import Queue 
import threading 
import ftplib 
import os 


class SendFileThread(threading.Thread): 
    """ Thread that will handle sending files to the FTP server""" 

    # Make set of keywords a class variable. 
    Keywords = set() 

    def __init__(self, queue, conn): 

      self.conn = conn 
      self.queue = queue 

      threading.Thread.__init__(self) 

     def run(self): 
      while True: 
       # Grabs file from queue. 
       file_name = self.queue.get() 

       # Send file to FTP server. 
       f=open(file_name,'rb') 
       self.conn.storbinary('STOR '+os.path.basename(file_name),f) 

       # Suppose that this keywords are in the first line. 
       # Update the set of keywords. 
       SendFileThread.Keywords.update(f.readline().split(" "))) 

       # Signals to queue job is done. 
       self.queue.task_done() 


def main(): 
    # Files to send. 
    files = os.listdir('/tosend') 

    queue = Queue.Queue() 

    # Connect to the FTP server. 
    conn = ftplib.FTP('ftp_uri') 
    conn.login() 

    # Create 5 threads that will handle file to send. 
    for i in range(5): 
     t = SendFileThread(queue, conn) 
     t.start() 

    # Fill the queue with files to be send. 
    for file in files: 
     queue.put(file) 

    # Wait until or thread are finish 
    queue.join() 

    # Send the keywords to the FTP server. 
    # I didn't understand well the part update keywords count, 
    # how this count is stored ... 
    # Here i will just send the keywords to the FTP server. 
    with open("keywords", "w") as keywords_file 
     keywords_file.write(";".join(SendFileThread.Keywords)) 
     conn.storbinary('STOR '+os.path.basename("keywords"), 
          keywords_file) 

    conn.close() 


if __name__ == '__main__': 
    main() 
+0

извините, но я действительно не понимаю, что вы имеете в виду. – aschmid00

+0

@ aschmid00: __sorry, но я действительно не понимаю, что вы имеете в виду. Это то же самое для меня? я хотел помочь, но я думаю, что я просто сделаю вас более смущенными, я думаю, что удалю этот ответ, если вы не дадите нам больше подробностей о том, что вы хотите сделать, и, возможно, какой-то код, чтобы понять больше проблемы, надеюсь, вы найдете ответы на свои вопросы :) – mouad

+0

спасибо !! я попробую это как можно скорее и дам вам знать. – aschmid00

0

Почему вы не можете просто добавить заключительный шаг очередь ?

+0

Это то, что я пытаюсь с помощью empty(), где я обрабатываю данные в наборе() и выполняю окончательный тест. проблема в том, что если у меня несколько потоков, этот последний шаг обрабатывается один раз для каждого потока. Я хочу, чтобы это произошло только один раз. – aschmid00

0

Есть еще одна очередь, где вы размещаете это событие после того, как первая очередь пуста.
Или создайте специальную тему для этого события.