2011-01-06 4 views
3

Я хочу создать параллельный конвейер в C#. Я declaered Интерфейса с именем IOperation:Параллельный конвейер в C#

public interface IOperation<Tin, Tout> 
{ 
    BlockingCollection<Tout> BlockingCollection(IEnumerable<Tin> input); 
} 

Теперь я хочу, чтобы написать класс, который выполняет кратный эти операции параллельно. я Баган с этим:

public class Pipeline : IPipeline 
{ 
    private List<IOperation<Object, Object>> operations = new List<IOperation<Object, Object>>(); 
    private List<BlockingCollection<Object>> buffers = new List<BlockingCollection<Object>>(); 
    public void Register(IOperation<Object, Object> operation) 
    { 
     operations.Add(operation); 
    } 

    public void Execute() 
    { 

    } 
} 

Но я не нахожу решения для сохранения операций и буферы между операциями, поскольку все они имеют различные обобщенные типы. У кого-нибудь есть идея?

+1

Не могли бы вы пояснить, что вы подразумеваете под «сохранением»? –

+0

Я хочу, чтобы они были в двух списках в верхней части класса Pipeline – AntonS

ответ

1

Рассматривали ли вы использование Parallel.ForEach от TPL?
Параллельная библиотека задач (TPL) представляет собой набор общедоступных типов и API в .NET 4.

+0

Могу ли я сделать цикл foreach, который ждет новых входов, если ничего в списке больше? – AntonS

+1

Я так считаю. Вместо использования списка используйте метод, возвращающий IEnumerable . В этом методе, если доступны данные, тогда «возвращай» данные, иначе дождитесь сигнала синхронизации (например, «Импульс монитора»). – ZunTzu

1

Не совсем ясно, как ваш трубопровод должен работать. Почему вы проходите вокруг BlockingCollections? Почему вы используете дженерики, но затем помещаете object в качестве типа?

Рассмотрите вместо этого, что вы загружаете с помощью делегирования типа Action, а затем используйте параллельную библиотеку задач для создания задач, которые выполняют эти действия параллельно.

public void Register(Action operation) 
    { 
     operations.Add(operation); 
    } 

public void Execute() 
    { 
     foreach (var action in operations) 
      Task.StartNew(operation); 
    } 

Но это не действительно «конвейер», это просто набор операций, выполняемых параллельно.

Конвейер, как правило, имеет шаги трубопровода с типом ввода и типом выхода. Вы можете справиться с этим, создав что-то вроде PipelineStep<T,U>, и вы построите каждый шаг шага, проходящий в операции Func. На внутреннем уровне каждый шаг в конвейере может потреблять входной IEnumerable и создавать выходной IEnumerable, и он может сделать это с помощью Task или более просто используя параллельный цикл foreach.

В качестве альтернативы вы можете использовать метод TCL Task.ContinueWith, чтобы связать задачи вместе с вводом на вывод.

0

Существует хорошая статья о http://msdn.microsoft.com/en-us/library/ff963548.aspx о параллельных трубопроводах с BlockingCollection.

В основном каждый шаг должен иметь очередь вывода типа BlockingCollection. Он берет элементы из очереди вывода предыдущего шага и добавляет их к своей выходной обработке после завершения обработки.

1

У Microsoft есть что-то в этом роде: TPL Dataflow позволяет вам определять блоки в конвейере с мелкозернистыми элементами управления, как они буферизируются и распараллеливаются.

В отличие от вашего решения, он использует полностью асинхронный дизайн push. Он не использует BlockingCollection (конструкция блокирующего тяги) и будет значительно быстрее для него, если у вас есть глубокий трубопровод.

Смежные вопросы