2015-02-27 2 views
1

Я работаю над модулем обработки данных в R, используя код C/C++, в основном по причинам скорости. Вот список фактов моей проблемы.R parallel написать структуру SEXP

  • Конечные данные результата - это список векторов строк и занимает от 20 МБ до 200 МБ памяти.
  • Обработка данных может быть вписана в модель с одним производителем/несколькими потребителями.
  • Для получения детальной информации нажмите на кнопку wrap для получения пересылки vector<vector<string> > на List.

Поэтому я намерен работать непосредственно в структурах SEXP, благодаря чему я мог бы сэкономить время для окончательного преобразования. Моя основная функция выглядит так.

boost::atomic<bool> done(false); 
SEXP myfun(...) { 
    ... 
    SEXP sdataStr; 
    PROTECT(sdataStr=allocVector(VECSXP, nElem)); 
    vector<SEXP> dataStr(nElem); 
    for (int i=0; i<nElem; ++i) { 
     dataStr[i]=SET_VECTOR_ELT(sdataStr, i, allocVector(STRSXP, n)); 
    } 
    Producer producer(&queue); 
    Consumer consumer1(dataStr, nElem, &queue); 
    Consumer consumer2(dataStr, nElem, &queue); 

    boost::thread produce(producer); 
    boost::thread consume1(consumer1); 
    boost::thread consume2(consumer2); 

    produce.join(); 
    done=true; 
    consume1.join(); 
    consume2.join(); 
    UNPROTECT(1); 
    return sdataStr; 
} 

Мой потребительский класс выглядит следующим образом

class Consumer { 
    vector<SEXP>& m_dataStr; 
    boost::lockfree::queue<buffer>* m_queue; 
    buffer m_buffer; 

    public: 
    Consumer(vector<SEXP>& dataStr, boost::lockfree::queue<buffer>* queue) : m_dataStr(dataStr), m_queue(queue) {} 

    void operator()() { 
     while (!done) { 
      while (m_queue->pop(m_buffer)) { 
       process_item(); 
      } 
     } 
     while (m_queue->pop(m_buffer)) { 
      process_item(); 
     } 
    } 

    private: 
    process_item() { 
     ... 
     // for some 0<=idx<nElem, 0<=i<n, some char* f and integer len 
     SET_STRING_ELT(m_dataStr[idx], i, mkCharLen(f,len)); 
     ... 
    } 
} 

Это единственные места, которые я использую Rinternals. Логика программы гарантирует, что запись в одно и то же место по различным потокам никогда не произойдет, то есть комбинация idx и i в классе Consumer может в большинстве случаев встречаться один раз. Я столкнулся с различными странными проблемами, такими как «дисбаланс стека» или «привязка к неправильному поколению» и т. Д. Есть ли что-то, что мне не хватает? Или вызов SET_STRING_ELT в нескольких потоках не рекомендуется? Большое спасибо!

+0

Есть ли документация, которая сообщает, какая функция C/R API является потокобезопасной или нет? Например, 'REAL (...) = 1.0' thread-safe? – hchen

+0

Боюсь, что такого документа нет. Вы должны вывести его из источника. О 'REAL', я бы предложил вам сначала захватить' double * ', а затем вы можете безопасно использовать указатель в нескольких потоках (учитывая, что вы не изменяете одну и ту же позицию из нескольких потоков ... –

ответ

2

Функции API C/R не должны вызываться в потоках, если вы не знаете, что делаете, например, mkCharLen может изменить внутреннюю хеш-таблицу, которая используется для всех строк R, поэтому вы не можете вызвать это в потоке , SET_STRING_ELT, вероятно, также не используется в потоке, особенно если барьер записи включен.