2015-10-11 3 views
1

У меня есть функция, которая занимает некоторое время, чтобы вычислить, и должен быть итерация> 20k раз с двумя переменными параметрами:статус Мониторинг ipyparallel завершения

from ipyparallel import Client 
import numpy as np 

m_array = np.arange(0, 101, 1) 
s_array = np.arange(0, 201, 1) 
rc = Client() 
rc[:].push(dict(stuff=stuff)) 
view = rc.load_balanced_view() 
async_results = [] 

for m in m_array: 
    for s in s_array: 
     chi = view.apply_async(run_simulation, m=m, s=s) 
     async_results.append(chi) 
rc.wait(async_results) 
results = [ar.get() for ar in async_results] 

Я вижу, есть wait_interactive доступный метод, однако я не смогли выяснить, как его использовать. Каков наилучший способ распечатать обновление статуса за определенный интервал?

Update

Я добавил all_ids список, и get_result().wait_interative() методы.

async_results = [] 
all_ids = [] 
for m in m_array: 
    for s in s_array: 
     chi = view.apply_async(run_simulation, m=m, s=s) 
     async_results.append(chi) 
     all_ids.extend(chi.msg_ids) 
rc.get_result(all_ids).wait_interactive() 
rc.wait(async_results) 
results = [ar.get() for ar in async_results] 

Это производит периодические обновления состояния, как ожидалось, однако теперь производит трассировку.

--------------------------------------------------------------------------- 
KeyError         Traceback (most recent call last) 
<ipython-input-36-85db6ca605cd> in <module>() 
    220 rc.get_result(all_ids).wait_interactive() 
    221 rc.wait(async_results) 
--> 222 results = [ar.get() for ar in async_results] 
223 results = np.array(results) 
224 results.shape = (len(m_array), len(s_array)) 

//anaconda/lib/python2.7/site-packages/ipyparallel/client/asyncresult.pyc in get(self, timeout) 
    95   by get() inside a `RemoteError`. 
    96   """ 
---> 97   if not self.ready(): 
    98    self.wait(timeout) 
    99 

//anaconda/lib/python2.7/site-packages/ipyparallel/client/asyncresult.pyc in ready(self) 
    113   """Return whether the call has completed.""" 
    114   if not self._ready: 
--> 115    self.wait(0) 
    116   elif not self._outputs_ready: 
    117    self._wait_for_outputs(0) 

//anaconda/lib/python2.7/site-packages/ipyparallel/client/asyncresult.pyc in  wait(self, timeout) 
    152     if self.owner: 
    153 
--> 154      self._metadata = [self._client.metadata.pop(mid) for mid in self.msg_ids] 
155      [self._client.results.pop(mid) for mid in self.msg_ids] 
    156 

KeyError: '884328c8-d768-48d5-b477-a256ebaea7a9' 

идентификаторы сообщений или результаты очищаются где-то до того, как метод ar.get() получает, чтобы найти их?

ответ

2

wait_interactive - это метод для объектов AsyncResult. Это будет метод на самом Клиенте в ближайшее время, но в настоящее время его нет. Это означает, что для использования wait_interactive вам необходимо собрать AsyncResult, который будет обертывать все ваши результаты. Самый простой способ сделать это, чтобы сохранить единый список всех msg_ids, соответствующих вашим запросам.:

all_ids = [] 
for m in m_array: 
    for s in s_array: 
     chi = view.apply_async(run_simulation, m=m, s=s) 
     async_results.append(chi) 
     all_ids.extend(chi.msg_ids) 

rc.get_result(all_ids, owner=False).wait_interactive() 
+0

я добавил список в all_id, и в rc.get_result() wait_interactive линии, но я теперь получаю другой отслеживающий. В обновленном обновлении – DataSwede

+0

добавлен отсутствующий аргумент 'owner = False', который может потребоваться, чтобы избежать удаления записей из кеша. – minrk