2014-12-29 5 views
1

У меня есть процесс, который выглядит следующим образом.Подождите, пока предыдущие блоки закончат обработку до продолжения

  1. Получить набор CSV файлы из папки
  2. Прочитайте файлы CSV и сохранять содержимое в базе данных
  3. Считывание данных из базы данных и выполнить еще некоторую обработку.

Причина разделения этапов 2 & 3 заключается в разделении вопросов, связанных с чтением файлов из-за проблем, связанных с обработкой файлов.

Я могу моделировать это с помощью трех блоков потока данных. Проблема в том, что я не хочу, чтобы блок 3 начинался до тех пор, пока все файлы не будут сохранены в базе данных. Мне нужно каким-то образом определить, что все файлы, которые были собраны в блоке 1, обработаны блоком 2. Блок 2 будет иметь свой MaxDegreeOfParallelism, установленный в Unbounded. Я хочу, чтобы они обрабатывались параллельно.

Я рассмотрел использование Encapsulate на первых двух блоках, но я не думаю, что это сработало. Возможно, мне нужен какой-то Batchblock, но партии не все будут одинакового размера.

Как я могу это сделать? Нужно ли мне создавать собственный тип блока?

+0

вы должны прочитать все данные из базы данных в то же время, или вы можете прочитать их файл файлом, или что-то подобное ? – svick

ответ

1

Это не соответствует потоку TDF, так как этап №2 не передает элементы на этап №3, который начинается после того, как предыдущие уже завершены.

У вас должно быть 2 отдельных потока. Первый считывает из папки и сохраняет в базе данных, а второй считывает из базы данных и начинает обработку. Вы можете ждать первого потока завершить к ожидая Completion собственности:

var reader = // Create #1 block 
var dbFiller = // Create #2 block 

reader.LinkTo(dbFiller, new DataflowLinkOptions { PropagateCompletion = true }); // Link both blocks with Completion Propagation 

reader.Post(// Queue up work for reader 

await reader.Completion; // Asynchronously wait for previous steps to complete 

var processor = // Create #3 block 

processor.Post(// Queue up work for processor 
+0

Чтобы это сделать, мне нужно было бы заполнить() читателя, не так ли? Что отсутствует в коде выше. Я не хочу этого делать, поскольку это повторяемый процесс, и я хочу повторно использовать конвейер. – bornfromanegg

+0

@ user1158174, но вы говорите, что хотите начать обработку только после предыдущего завершения. Что значит конец, если он не завершен ... – i3arnon

+0

Да. Наверное, это действительно вопрос, не так ли? Шаг 1 будет запускаться один раз каждые, скажем, десять минут. Когда он выполняется, он может найти десять файлов. Я хочу, чтобы все они обрабатывались на этапе 2 до начала шага 3. Поэтому, когда я говорю, что закончил, я имею в виду «закончил текущую партию файлов». Здесь я застреваю, потому что Dataflow не предлагает этого из коробки. BatchBlock является самым близким, но ему нужны партии определенного размера, что не то, что у меня есть. – bornfromanegg

Смежные вопросы