Если getRenewed
является идемпотентным (то есть вы можете называть его несколько раз без побочных эффектов), вы можете просто переместить существующий код таймера в свои рабочие процессы и позволить каждому из них называть его один раз, когда они замечают, что их собственный таймер спуститься. Это только требует синхронизации для элементов из списка, который вы передаете в и multiprocessing.Pool
могут справиться с этим достаточно легко:
def setup_worker():
global clockExp, a
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
def worker(item):
global clockExp, a
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
if clockRem < 5: # renew with 5 seconds left
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
a = getRenewed()
f(item, a)
def main(L):
pool = multiprocessing.Pool(initializer=setup_worker)
pool.map(worker, L)
Если getRenewed
не идемпотентна, то нужно будет немного сложнее. Вы не сможете вызвать его в каждом рабочем процессе, поэтому вам нужно будет настроить какой-то метод связи между вашими процессами, чтобы каждый мог получить последнюю версию, когда она доступна.
Я бы предложил использовать multiprocessing.queue
для передачи значения a
от основного процесса до рабочих. Вы по-прежнему можете использовать Pool
для элементов списка, вам просто нужно убедиться, что вы используете его асинхронно из основного процесса. Как это, возможно:
def setup_worker2(queue):
global x
x = random.random()
global a_queue, a, clockExp
a_queue = queue
a = a_queue.get() # wait for the first `a` value
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
def worker2(item):
global a, clockExp
clockCur = dt.datetime.now()
clockRem = (clockExp - clockCur).total_seconds()
if clockRem < 60: # start checking for a new `a` value 60 seconds before its needed
try:
a = a_queue.get_nowait()
clockStart = dt.datetime.now()
clockExp = clockStart + dt.timedelta(seconds=900)
except queue.Empty:
pass
return f(item, a)
def main2(L):
queue = multiprocessing.Queue() # setup the queue for the a values
pool = multiprocessing.Pool(initializer=setup_worker2, initargs=(queue,))
result = pool.map_async(worker2, L) # send the items to the pool asynchronously
while True: # loop for sending a values through the queue
a = getRenewed() # get a new item
for _ in range(os.cpu_count()):
queue.put(a) # send one copy per worker process
try:
result.wait(900-5) # sleep for ~15 minutes, or until the result is ready
except multiprocessing.TimeoutError:
pass # if we got a timeout, keep looping!
else:
break # if not, we are done, so break out of the loop!
Рабочие все еще должны иметь некоторый код синхронизации, так как в противном случае вы бы столкнуться условия гонки, где один работник может потреблять два из a
значений ниспосланных очереди в одном партия из основного процесса. Это может произойти, если некоторые из вызовов f
значительно медленнее, чем другие (что, вероятно, вероятно, если они связаны с загрузкой вещей из Интернета).
Действительно ли 'f' или' getRenewed' полагаются на какое-либо конкретное состояние процесса или полагаются только на (или модифицируют) внешнее состояние? – Blckknght
f загружает веб-сайт, продиктованный элементом в L. getRenewed, для получения токена аутентификации. – Kevin