2013-04-18 2 views
17

Я использую пакет R foreach() с %dopar%, чтобы делать длинные (~ дни) вычисления параллельно. Мне хотелось бы остановить весь набор вычислений в случае, если одна из них вызывает ошибку. Однако я не нашел способ добиться этого, и из документации и различных форумов я не нашел никаких указаний на то, что это возможно. В частности, break() не работает, и stop() останавливает текущий расчет, а не весь цикл foreach.Есть ли способ вырваться из цикла foreach?

Обратите внимание, что я не могу использовать простой цикл, потому что в конечном итоге я хочу распараллелить это с помощью пакета doRNG.

Вот упрощенная, воспроизводимая версия того, что я пытаюсь (здесь последовательно с %do%, но у меня такая же проблема при использовании doRNG и %dopar%). Обратите внимание, что на самом деле я хочу параллельно использовать все элементы этого цикла (здесь 10).

library(foreach) 
myfunc <- function() { 
    x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% { 
    cat("Element ", k, "\n") 
    Sys.sleep(0.5) # just to show that stop does not cause exit from foreach 
    if(is.element(k, 2:6)) { 
     cat("Should stop\n") 
     stop("Has stopped") 
    } 
    k 
    } 
    return(x) 
} 
x <- myfunc() 
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself. 
# x is not returned. The execution produces the error message 
# Error in { : task 2 failed - "Has stopped" 

То, что я хотел бы добиться того, что весь цикл Еогеасп может быть сразу же вышел на некоторое условие (здесь, когда stop() встречается).

Я не нашел способа достичь этого с помощью foreach. Кажется, мне нужен способ отправить сообщение всем остальным процессам, чтобы они тоже перестали.

Если не возможно с foreach, знает ли кто-нибудь об альтернативах? Я также пытался добиться этого с помощью parallel::mclapply, но это тоже не работает.

> sessionInfo() 
R version 3.0.0 (2013-04-03) 
Platform: x86_64-apple-darwin10.8.0 (64-bit) 

locale: 
[1] C/UTF-8/C/C/C/C 

attached base packages: 
[1] stats  graphics grDevices utils  datasets methods base 

other attached packages: 
[1] foreach_1.4.0 

loaded via a namespace (and not attached): 
[1] codetools_0.2-8 compiler_3.0.0 iterators_1.0.6 
+0

Невозможно использовать 'for' вместо этого? –

+0

Нет, потому что в конечном итоге я хочу распараллелить это, используя пакет doRNG. (Извините, я не сделал это ясно в своем оригинальном посте: я отредактировал его, чтобы сделать это явным.) –

+3

На основании ваших других комментариев вы можете захотеть, чтобы каждый подпроцесс мог установить объект «флаг» на и сделать этот объект доступным для чтения всеми подпроцессами. Все они должны иметь некоторую внутреннюю точку останова или эквивалент, которая регулярно проверяет значение «флага», поэтому все они могут самозакрываться. –

ответ

2

Это не прямой ответ на ваш вопрос, но с помощью when() вы можете избежать входа в цикл, если условие:

x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %:% 
    when(!is.element(k, 2:6)) %do% 
    { 
    cat("Element ", k, "\n") 
    Sys.sleep(0.5) 
    k 
    } 

EDIT:

Я забыл кое-что: Я думаю, что по дизайну вы не можете просто остановить цикл foreach. Если вы запускаете цикл параллельно, каждый поворот обрабатывается независимо, что означает, что когда вы останавливаете весь цикл для k=2, это не предсказуемо, если процесс для k=1 завершен или уже запущен. Следовательно, использование условия when() дает вам детерминированный результат.

EDIT 2: Другое решение с учетом вашего комментария.

shouldStop <- FALSE 
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% 
    { 
    if(!shouldStop){ 
     # put your time consuming code here 
     cat("Element ", k, "\n") 
     Sys.sleep(0.5) 
     shouldStop <- shouldStop || is.element(k, 2:6) 
     k 
    } 
    } 

Используя это решение, процессы, выполняющиеся в то время как условие останова становится истинным по-прежнему вычисляются до конца, но избежать затрат времени на всех предстоящих процессов.

+0

Проблема в том, что я буду знать только, хочу ли я выйти из цикла, как только выполнил некоторые вычисления в цикле. Но именно эти вычисления я хочу распараллелить с этим циклом. (Другими словами, само условие занимает много времени для вычисления.) –

+0

EDIT 2 - полезное предложение, но способ, которым я запускаю это, состоит в том, что количество обрабатываемых процессов равно количеству доступных ядер процессора (10- 50). Таким образом, все процессы запускаются одновременно, и нет никаких будущих процессов, чтобы избежать запуска. Как и сейчас, я должен дождаться завершения всех этих действий, прежде чем я получу сообщение об ошибке из stop(). Работа для меня была бы вручную убить всю программу, как только я увижу сообщение, созданное cat() (в моем сообщении), но это нецелесообразно, потому что это длинный пробег (~ 1 день) и выполняется в фон на удаленной машине. –

+0

Эта информация изменяет все это и должна упоминаться в исходном сообщении. Однако я должен признать, что в этом случае мои идеи ограничены. Вы можете попытаться напрямую управлять своими узлами, используя, например, 'clusterApply' из пакета' snow', и вызывать 'stopCluster()', когда выполняется первое задание с желаемым результатом. Но имейте в виду, что вызов 'stopCluster()' из подчиненного процесса не только даст уродливые ошибки. Кроме того, результат не вернется к мастеру. Может быть, у кого-то есть идея, как результат может быть принят? – Beasterfield

11

Похоже, что вы хотите нетерпеливый вариант обработки ошибки «стоп». Вы можете реализовать это, написав пользовательскую функцию комбинирования и упорядочив foreach, чтобы вызвать его, как только будут возвращены каждый результат.Для этого вам необходимо:

  • Используйте бэкенд, который поддерживает вызов combine на лету, как doMPI или doRedis
  • Не включайте .multicombine
  • Установите .inorder в FALSE
  • Набор .init (например, NULL)

Вот пример, который делает это :

library(foreach) 
parfun <- function(errval, n) { 
    abortable <- function(errfun) { 
    comb <- function(x, y) { 
     if (inherits(y, 'error')) { 
     warning('This will leave your parallel backend in an inconsistent state') 
     errfun(y) 
     } 
     c(x, y) 
    } 
    foreach(i=seq_len(n), .errorhandling='pass', .export='errval', 
      .combine='comb', .inorder=FALSE, .init=NULL) %dopar% { 
     if (i == errval) 
     stop('testing abort') 
     Sys.sleep(10) 
     i 
    } 
    } 
    callCC(abortable) 
} 

Обратите внимание, что я также установить обработку ошибок, чтобы «пройти» так foreach будет вызывать функцию в сочетании с объектом ошибки. Функция callCC используется для возврата из цикла foreach независимо от обработки ошибок, используемой в пределах foreach и бэкэнд. В этом случае callCC вызовет функцию abortable, передав ей объект функции, который используется для принудительного возврата callCC. Вызывая эту функцию из функции объединения, мы можем избежать цикла foreach, когда обнаруживаем объект ошибки, и возвращаем этот объект callCC. См. ?callCC для получения дополнительной информации.

Вы можете использовать parfun без параллельного бэкэнда зарегистрированного и убедитесь, что foreach петли «ломает», как только он выполняет задачу, которая выдает ошибку, но это может занять некоторое время, поскольку задачи выполняется последовательно. Например, это занимает 20 секунд, чтобы выполнить, если ни бэкенд не зарегистрировано:

print(system.time(parfun(3, 4))) 

При выполнении parfun параллельно, мы должны сделать больше, чем просто выйти из цикла foreach: мы должны остановить рабочих, иначе они будут продолжать вычислять свои назначенные задачи. С doMPI, рабочие могут быть остановлены с помощью mpi.abort:

library(doMPI) 
cl <- startMPIcluster() 
registerDoMPI(cl) 
r <- parfun(getDoParWorkers(), getDoParWorkers()) 
if (inherits(r, 'error')) { 
    cat(sprintf('Caught error: %s\n', conditionMessage(r))) 
    mpi.abort(cl$comm) 
} 

Обратите внимание, что объект кластера не может быть использован после того, как цикл прерывается, потому что вещи не были надлежащим образом очищены, поэтому нормальный «стоп» обработка ошибок не работает.

+0

+1 для комментария, и для вашей книги, которая мне очень помогла :) – statquant

+0

Я немного смущен тем, что вызывает ранний выход, если он больше не является stop() в функции comb comb(). Я предполагаю, что stop() в foreach вызывает вызов comb(). Это значит, что errfun() вызывает ранний выход? Но что такое errfun()? Он явно не определен (и имя произвольно). Кроме того, когда я запускаю parfun (6,12) с% dopar% и doMPI на 4 ядрах, выполнение продолжается для i = 5,7,8,9 (проверено с использованием метода sink() в моем ответе ниже), поэтому я «Не уверен, что на самом деле это происходит раньше, когда вы запускаете параллель. –

+0

«Стоп» в цикле 'foreach' просто приводит к возврату объекта ошибки в качестве результата задачи для мастера. Поскольку обработка ошибок «проходит», 'foreach' передает ее функции компиляции и делает это немедленно из-за указанных опций. Если функция комманды вызывает 'errfun', функция объединения не вернется к вызывающей стороне, а к' callCC'. Но, как я уже сказал в пересмотренном ответе, это не влияет на рабочих, поэтому требуется mpi.abort. –

-1

Первоначальный ответ Стив Уэстон в основном ответил на это. Но вот немного измененная версия его ответа, которая также сохраняет две дополнительные функции в том, как они мне нужны: (1) генерация случайных чисел; (2) диагностику времени выполнения печати.

suppressMessages(library(doMPI)) 

comb <- function(x, y) { 
    if(inherits(y, 'error')) { 
    stop(y) 
    } 
    rbind(x, y) # forces the row names to be 'y' 
} 

myfunc <- function() { 
    writeLines(text="foreach log", con="log.txt") 
    foreach(i=1:12, .errorhandling='pass', .combine='comb', .inorder=FALSE, .init=NULL) %dopar% { 
    set.seed(100) 
    sink("log.txt", append=TRUE) 
    if(i==6) { 
     stop('testing abort') 
    } 
    Sys.sleep(10) 
    cat("Completed task", i, "\n") 
    sink(NULL) 
    rnorm(5,mean=i) 
    } 
} 

myerr <- function(e) { 
    cat(sprintf('Caught error: %s\n', conditionMessage(e))) 
    mpi.abort(cl$comm) 
} 

cl <- startMPIcluster(4) 
registerDoMPI(cl) 
r <- tryCatch(myfunc(), error=myerr) 
closeCluster(cl) 

Когда этот файл используется, он выходит, как предполагалось с сообщением об ошибке

> source("exp2.R") 
    4 slaves are spawned successfully. 0 failed. 
Caught error: testing abort 
[ganges.local:16325] MPI_ABORT invoked on rank 0 in communicator with errorcode 0 

Файлы «log.txt» обеспечивает правильную диагностику до точки ошибки, а затем предоставляет дополнительную погрешность Информация. Реально, выполнение всех задач останавливается, как только stop() в цикле foreach встречается: он не дожидается завершения цикла foreach. Таким образом, я вижу сообщение «Completed task» до i = 4. (Обратите внимание, что если Sys.sleep() короче, более поздние задачи могут быть запущены до обработки mpi.abort().

Если я изменил условие остановки на «i == 100», остановка и, следовательно, ошибка не срабатывает.Код успешно существует без сообщения об ошибке, а r представляет собой 2D-массив с размерами 12 * 5.

Кстати, кажется, что мне действительно не нужно .inorder = FALSE (я думаю, это просто дает мне небольшое увеличение скорости в случае обнаружения ошибки).

+0

Я изменил свой ответ, потому что обнаружил, что он использовал неправильную обработку ошибок в doMPI. Выполнение 'stop' в функции комбайна не должно прерываться foreach, и это исправлено в версии разработки doMPI на R-forge, поэтому ваш ответ не будет работать, когда он будет выпущен. –

+0

Если вы не устанавливаете '.inorder = FALSE', тогда функция объединения не будет вызываться, пока все предыдущие задачи не будут обработаны функцией объединения. Поэтому, если неудачная задача не является первой задачей, в вашем примере потребуется не менее 10 секунд. –

0

Мне не очень повезло получить foreach, чтобы сделать то, что я хочу, поэтому вот решение, используя пакет parallel, который, кажется, делает то, что я хочу. Я использую опцию intermediate в mcparallel() для передачи результатов от моей функции do.task(), сразу к функции check.res(). Если do.task() выдает ошибку, то это используется в check.res(), чтобы вызвать вызов tools::pskill, чтобы явно убить всех работников. Это может быть не очень элегантно, но оно работает в том смысле, что оно мгновенно останавливает работу. Кроме того, я могу просто наследовать все переменные, которые мне нужны для обработки, в do.task() из существующей среды. (В действительности do.task() является гораздо более сложной функция, требующая много переменных, которые передаются в.)

library(parallel) 

# do.task() and check.res() inherit some variables from enclosing environment 

do.task <- function(x) { 
    cat("Starting task", x, "\n") 
    Sys.sleep(5*x) 
    if(x==stopat) { 
    stop("Error in job", x) # thrown to mccollect() which sends it to check.res() 
    } 
    cat(" Completed task", x, "\n") 
    return(10*x) 
} 

check.res <- function(r) { # r is list of results so far 
    cat("Called check.res\n") 
    sendKill <- FALSE 
    for(j in 1:Njob) { # check whether need to kill 
    if(inherits(r[[j]], 'try-error')) { 
     sendKill <- TRUE 
    } 
    } 
    if(sendKill) { # then kill all 
    for(j in 1:Njob) { 
     cat("Killing job", job[[j]]$pid, "\n") 
     tools::pskill(job[[j]]$pid) # mckill not accessible 
    } 
    } 
} 

Tstart <- Sys.time() 
stopat <- 3 
Njob <- 4 
job <- vector("list", length=Njob) 
for(j in 1:Njob) { 
    job[[j]]<- mcparallel(do.task(j)) 
} 
res <- mccollect(job, intermediate=check.res) # res is in order 1:Njob, regardless of how long jobs took 
cat("Collected\n") 
Tstop <- Sys.time() 
print(difftime(Tstop,Tstart)) 
for(j in 1:Njob) { 
    if(inherits(res[[j]], 'try-error')) { 
    stop("Parallel part encountered an error") 
    } 
} 

Это дает следующий дамп экрана и результаты для переменной res

> source("exp5.R") 
Starting task 1 
Starting task 2 
Starting task 3 
Starting task 4 
    Completed task 1 
Called check.res 
Called check.res 
    Completed task 2 
Called check.res 
Called check.res 
Called check.res 
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res 
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res 
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Collected 
Time difference of 15.03558 secs 
Error in eval(expr, envir, enclos) : Parallel part encountered an error 
> res 
$`21423` 
[1] 10 

$`21424` 
[1] 20 

$`21425` 
[1] "Error in do.task(j) : Error in job3\n" 
attr(,"class") 
[1] "try-error" 
attr(,"condition") 
<simpleError in do.task(j): Error in job3> 

$`21426` 
NULL 
1

Ответ, который я получил от РЕВОЛЮЦИИ Техническая поддержка: «no-foreach в настоящее время не имеет возможности остановить все параллельные вычисления при ошибке для любого».

Смежные вопросы