2014-10-02 4 views
1

У меня есть большой список L для работы. Пусть f() - функция, действующая на L. f() принимает другую переменную, которая истекает каждые 15 минут и нуждается в обновлении. Вот пример, в сериале:Многопроцессорный режим, подлежащий таймеру

def main(): 
    L = openList() 
    # START THE CLOCK 
    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 
    a = getRenewed() 
    for item in L: 
     f(item, a) # operate on item given a 
     # CHECK TIME REMAINING 
     clockCur = dt.datetime.now() 
     clockRem = (clockExp - clockCur).total_seconds() 
     # RENEW a IF NEEDED 
     if clockRem < 5: # renew with 5 seconds left 
      clockStart = dt.datetime.now() 
      clockExp = clockStart + dt.timedelta(seconds=900) 
      a = getRenewed() 

Так как F() занимает несколько секунд (или дольше, иногда), я хотел бы распараллеливание кода. Какие-нибудь советы о том, как это сделать, учитывая таймер? Я предвижу совместное использование clockExp и «a», и когда процесс удовлетворяет clockRem < 5, он вызывает getRenewed() и делится новыми «a» и clockExp и повторяется.

+0

Действительно ли 'f' или' getRenewed' полагаются на какое-либо конкретное состояние процесса или полагаются только на (или модифицируют) внешнее состояние? – Blckknght

+0

f загружает веб-сайт, продиктованный элементом в L. getRenewed, для получения токена аутентификации. – Kevin

ответ

3

Если getRenewed является идемпотентным (то есть вы можете называть его несколько раз без побочных эффектов), вы можете просто переместить существующий код таймера в свои рабочие процессы и позволить каждому из них называть его один раз, когда они замечают, что их собственный таймер спуститься. Это только требует синхронизации для элементов из списка, который вы передаете в и multiprocessing.Pool могут справиться с этим достаточно легко:

def setup_worker(): 
    global clockExp, a 

    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 
    a = getRenewed() 

def worker(item): 
    global clockExp, a 

    clockCur = dt.datetime.now() 
    clockRem = (clockExp - clockCur).total_seconds() 

    if clockRem < 5: # renew with 5 seconds left 
     clockStart = dt.datetime.now() 
     clockExp = clockStart + dt.timedelta(seconds=900) 
     a = getRenewed() 

    f(item, a) 

def main(L): 
    pool = multiprocessing.Pool(initializer=setup_worker) 

    pool.map(worker, L) 

Если getRenewed не идемпотентна, то нужно будет немного сложнее. Вы не сможете вызвать его в каждом рабочем процессе, поэтому вам нужно будет настроить какой-то метод связи между вашими процессами, чтобы каждый мог получить последнюю версию, когда она доступна.

Я бы предложил использовать multiprocessing.queue для передачи значения a от основного процесса до рабочих. Вы по-прежнему можете использовать Pool для элементов списка, вам просто нужно убедиться, что вы используете его асинхронно из основного процесса. Как это, возможно:

def setup_worker2(queue): 
    global x 
    x = random.random() 
    global a_queue, a, clockExp 

    a_queue = queue 
    a = a_queue.get() # wait for the first `a` value 
    clockStart = dt.datetime.now() 
    clockExp = clockStart + dt.timedelta(seconds=900) 

def worker2(item): 
    global a, clockExp 

    clockCur = dt.datetime.now() 
    clockRem = (clockExp - clockCur).total_seconds() 
    if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed 
     try: 
      a = a_queue.get_nowait() 
      clockStart = dt.datetime.now() 
      clockExp = clockStart + dt.timedelta(seconds=900) 
     except queue.Empty: 
      pass 

    return f(item, a) 

def main2(L): 
    queue = multiprocessing.Queue()  # setup the queue for the a values 

    pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,)) 

    result = pool.map_async(worker2, L) # send the items to the pool asynchronously 

    while True:     # loop for sending a values through the queue 
     a = getRenewed()   # get a new item 
     for _ in range(os.cpu_count()): 
      queue.put(a)   # send one copy per worker process 

     try: 
      result.wait(900-5) # sleep for ~15 minutes, or until the result is ready 
     except multiprocessing.TimeoutError: 
      pass     # if we got a timeout, keep looping! 
     else: 
      break     # if not, we are done, so break out of the loop! 

Рабочие все еще должны иметь некоторый код синхронизации, так как в противном случае вы бы столкнуться условия гонки, где один работник может потреблять два из a значений ниспосланных очереди в одном партия из основного процесса. Это может произойти, если некоторые из вызовов f значительно медленнее, чем другие (что, вероятно, вероятно, если они связаны с загрузкой вещей из Интернета).

+0

Спасибо за подсказку. Оказывается, у меня может быть несколько активных токенов, поэтому первое решение работает! – Kevin

Смежные вопросы