2010-05-28 1 views
4

У меня есть случай, когда многие потоки одновременно генерируют данные, которые в конечном счете записываются в один длинный, серийный файл . Мне нужно как-то сериализовать эти записи так, чтобы поток был написан в правильном порядке.Стандартный термин для буфера перезаписи ввода-вывода потока?

т.е., у меня есть входная очередь 2048 заданий J ..j п, каждый из которых производит порцию данных о я. Работы выполняются параллельно, скажем, из восьми потоков, но выходные блоки должны появляться в потоке в том же порядке, что и соответствующие входные блоки —, выходной файл должен быть в порядке o o ...

решения этой проблемы довольно самоочевидный: мне нужно какое-то буфер, который накапливает и записывает выходные блоков в правильном порядке, похожий на буфер CPU переупорядочивания в Tomasulo's algorithm, или к тому, что TCP повторно собирает пакеты не по порядку, прежде чем передавать их на прикладной уровень.

Прежде чем я переведу код, я хотел бы сделать быстрый поиск литературы, чтобы увидеть, есть ли какие-либо документы, которые решили эту проблему особенно умным или эффективным способом, так как у меня серьезные ограничения в реальном времени и памяти. Кажется, я не могу найти статей, описывающих это; поиск в Scholar по каждой перестановке [потоков, параллельного, переупорядоченного буфера, повторной сборки, io, serialize] не принес ничего полезного. Я чувствую, что я просто не должен искать правильные условия.

Есть ли общее академическое имя или ключевое слово для такого типа шаблона, который я могу выполнить?

ответ

0

На самом деле вам не нужно накапливать куски. Большинство операционных систем и языков предоставляют абстракцию файла произвольного доступа, которая позволяет каждому потоку независимо записывать свои выходные данные в правильное положение в файле, не влияя на выходные данные из любого другого потока.

Или вы пишете поистине серийный выходной файл, например, сокет?

+0

Поистине серийный - шифр потока. – Crashworks

+0

Ваше решение работает только в том случае, если длина выходных записей известна до завершения обработки. –

0

Я бы не использовал перезаписываемый буфер лично. Я бы создал один объект «job» для каждого задания и, в зависимости от вашей среды, либо использовал передачу сообщений, либо мьютексы для получения завершенных данных из каждого задания по порядку. Если следующее задание не выполняется, ваш процесс «писатель» ждет, пока он не появится.

+0

Боюсь, я не понимаю, что вы имеете в виду. Вы имеете в виду, что у меня должно быть (n) много мьютексов, по одному для каждой работы, и что писатель должен ждать по каждому из них в порядке возрастания? Проблема в том, что у меня есть память только для хранения около двадцати заданий за раз, и если я столкнусь с ситуацией, когда текущее окно завершается в обратном порядке, это оставит несколько потоков бездействующими до тех пор, пока "голова" один завершается. – Crashworks

+0

Вот что я предлагал, да. Я не думаю, что любое другое решение будет лучше, если задачи будут выполнены в обратном порядке, за исключением предложения Стива, если ваши записи известны длиной или кэшируют завершенные результаты на диск. –

0

Я бы использовал ringbuffer, который имеет ту же длину, что и количество потоков, которые вы используете. В кольцевом буфере также будет одинаковое количество мьютексов.

Rinbuffer должен также знать id последнего фрагмента, который он написал в файл. Это эквивалентно индексу 0 вашего буфера.

При добавлении в ringbuffer вы можете проверить, можете ли вы писать, то есть установить индекс 0, затем вы можете записать более одного фрагмента за один раз в файл.

Если индекс 0 не установлен, просто заблокируйте текущий поток для ожидания. - У вас также может быть кольцевой буффер 2-3 раза в длину, чем количество потоков и блокировка только тогда, когда это необходимо, т. Е. Когда запущено достаточное количество заданий для заполнения буфера.

Не забудьте обновить последний кусок написан жестко;)

Вы можете также использовать двойную буферизацию, когда сочинительство к файлу.

0

Укажите, что в очереди на выход есть futures вместо фактических данных. Когда вы извлекаете элемент из входной очереди, немедленно отправляйте соответствующее будущее в очередь вывода (следя за тем, чтобы это сохраняло порядок - см. Ниже). Когда рабочий поток обработал элемент, он может затем установить значение в будущем. Выходной поток может читать каждое будущее из очереди и блокировать, пока это будущее не будет готово. Если более поздние становятся готовыми рано, это никак не влияет на выходной поток, если фьючерсы в порядке.

Существует два способа гарантировать, что фьючерсы на выходной очереди находятся в правильном порядке. Первый заключается в использовании одного мьютекса для чтения из входной очереди и записи в очередь вывода. Каждый поток блокирует мьютекс, берет элемент из входной очереди, отправляет будущее в очередь вывода и освобождает мьютекс.

Второй заключается в том, чтобы иметь один главный поток, который читается из входной очереди, отправляет будущее в очередь вывода, а затем передает элемент в рабочий поток для выполнения.

В C++ с одним мьютекса, защищающего очереди это будет выглядеть следующим образом:

#include <thread> 
#include <mutex> 
#include <future> 

struct work_data{}; 
struct result_data{}; 

std::mutex queue_mutex; 
std::queue<work_data> input_queue; 
std::queue<std::future<result_data> > output_queue; 

result_data process(work_data const&); // do the actual work 

void worker_thread() 
{ 
    for(;;) // substitute an appropriate termination condition 
    { 
     std::promise<result_data> p; 
     work_data data; 
     { 
      std::lock_guard<std::mutex> lk(queue_mutex); 
      if(input_queue.empty()) 
      { 
       continue; 
      } 
      data=input_queue.front(); 
      input_queue.pop(); 
      std::promise<result_data> item_promise; 
      output_queue.push(item_promise.get_future()); 
      p=std::move(item_promise); 
     } 
     p.set_value(process(data)); 
    } 
} 

void write(result_data const&); // write the result to the output stream 

void output_thread() 
{ 
    for(;;) // or whatever termination condition 
    { 
     std::future<result_data> f; 
     { 
      std::lock_guard<std::mutex> lk(queue_mutex); 
      if(output_queue.empty()) 
      { 
       continue; 
      } 
      f=std::move(output_queue.front()); 
      output_queue.pop(); 
     } 
     write(f.get()); 
    } 
} 
Смежные вопросы