Посмотрите ReactiveExtensions
A Playful Introduction to Rx by Erik Meijer
Intro to Rx by Lee Campbell
Примервзятую из codePERFdotNET
// Or whatever producer you have, expose an IObservable
public interface IApiService
{
IObservable<Response> Responses { get; }
}
Затем вы можете подписаться на что Наблюдаемые несколько раз:
public Controller(IApiService service)
{
// Do one thing for each response
service.Responses
.Subscribe(responseHandler);
// Do something else for each response
service.Responses
.Subscribe(r => _throttle.OnNext(0));
}
EDIT
Это не ясно, если вы хотите, чтобы оба потребителей обрабатывать каждое сообщение или распространение с использованием системы массового обслуживания, где несколько потоков считываются из очереди, которая имеет работу для обработки , и каждый кусочек работы обрабатывается только одним потребителем. Если да, то посмотрите на решение для корпоративных сообщений, такое как MSMQ или RabbitMQ или любую другую систему MQ. Если он является внутренним для вашего приложения, используйте только то, что ConcurrentQueue
Какие активные классы, производитель? или потребителей? –
приложение имеет 3 основных потока - два от потребителей и на производителя. все они активны – Yanshof