2015-06-24 2 views
4

Я пытаюсь настроить параллельную задачу, где каждому работнику нужно будет делать запросы к базе данных. Я пытаюсь настроить каждого работника с помощью соединения, как показано в this question, но каждый раз, когда я пытаюсь, он возвращает <Expired PostgreSQLConnection:(2781,0)> для многих рабочих, которых я зарегистрировал.Соединения RPostgreSQL истек, как только они начинаются с doParallel clusterEvalQ

Вот мой код:

cl <- makeCluster(detectCores()) 
registerDoParallel(cl) 

clusterEvalQ(cl, { 
    library(RPostgreSQL) 
    drv<-dbDriver("PostgreSQL") 
    con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost") 

}) 

Если я пытаюсь запустить мой foreach несмотря на ошибку, он терпит неудачу с task 1 failed - "expired PostgreSQLConnection"

Когда я вхожу в состояние Postgres сервера он показывает все активные сеансы, которые были создано.

У меня нет никаких проблем с взаимодействием с postgres из моего основного экземпляра R.

Если я бегу

clusterEvalQ(cl, { 
    library(RPostgreSQL) 
    drv<-dbDriver("PostgreSQL") 
    con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost") 
    dbGetQuery(con, "select inet_client_port()") 

}) 

, то он будет возвращать все клиентские порты. Это не дает мне сообщение об истекшем времени, но если я попытаюсь запустить команду foreach, он сбой будет с той же ошибкой.

Edit:

Я пробовал это на Ubuntu и 2 окна компьютеров, все они дают ту же ошибку.

Другой Edit:

Сейчас 3 окна компьютеры

+0

Можете ли вы также добавить свой код foreach в сообщение? –

+0

@JellenVermeir не работает ни в какой команде 'dbGetQuery', независимо от того, что еще находится в' foreach'. Например: 'foreach (i = 1: 4)% dopar% dbGetQuery (con," select * from sometable limit 1 ")' будет терпеть неудачу, но 'foreach (i = 1: 4)% do% dbGetQuery (con," select * from sometable limit 1 ")' не прерывается. Когда я говорю, что это терпит неудачу, я имею в виду, что я получаю сообщение об ошибке «expired PostgreSQLConnection» –

ответ

5

Я был в состоянии воспроизвести проблему на местном уровне. Я не совсем уверен, но я думаю, что проблема связана с тем, как clusterEvalQ работает внутри. Например, вы говорите, что dbGetQuery(con, "select inet_client_port()) дал вам выход для клиентского порта. Если запрос был фактически оценен/выполнен на узлах кластера, то вы не сможете увидеть этот вывод (так же, как вы не можете напрямую читать какие-либо другие операторы вывода или печати, которые выполняются на внешних кластерных дисках).

Следовательно, я понимаю, что оценка как-то сначала выполняется в локальной среде, а соответствующие функции и переменные впоследствии копируются/экспортируются в отдельные кластерноды. Это будет работать для любого другого типа функций/переменных, но, очевидно, не для соединений db. Если соединения/сопоставления портов связаны с экземпляром master R, то соединения не будут работать из подчиненных экземпляров. Вы также получите ту же самую ошибку, если попытаетесь использовать функцию clusterExport, чтобы экспортировать подключения, созданные на главном экземпляре.

В качестве альтернативы, вы можете создать отдельные соединения внутри отдельных задач. Я проверил с локальной базой данных, что следующие работы:

library(doParallel) 
nrCores = detectCores() 
cl <- makeCluster(nrCores) 
registerDoParallel(cl) 
clusterEvalQ(cl,library(RPostgreSQL)) 
clusterEvalQ(cl,library(DBI)) 

result <- foreach(i=1:nrCores) %dopar% 
{ 
    drv <- dbDriver("PostgreSQL") 
    con <- dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost") 
    queryResult <- dbGetQuery(con, "fetch something...") 
    dbDisconnect(con) 
    return(queryResult) 
} 
stopCluster(cl) 

Однако теперь вы должны принять во внимание, что вы будете создавать и отключить новое соединение каждого foreach итерации. Из-за этого могут возникнуть некоторые издержки производительности. Очевидно, вы обойдете это, разделив ваши запросы/данные разумно, чтобы много работы выполнялось на той же самой итерации. В идеале вы должны разделить работу точно так же, как и количество доступных вам ядер.

Смежные вопросы