Есть ли какая-либо хорошая реализация очереди обработки асинхронно?Выполнение асинхронных операций обработки
ответ
Довольно старый, но это хороший один, что я знаю, у http://www.codeproject.com/KB/cs/inprocessasynservicesincs.aspx
Если вы используете .NET 4, многие из них поставляются бесплатно из коробки.
Если у вас уже есть все предметы, вы можете использовать Parallel.ForEach
. Если вам нужна очередь производителей/потребителей, вы можете использовать BlockingCollection<T>
для переноса одной из параллельных коллекций (например, ConcurrentQueue<T>
или ConcurrentStack<T>
). Как вы это используете, зависит от вас; есть blog post here, вдающийся в подробный пример, и, вероятно, есть и другие подобные сообщения. (Возможно, вы захотите посмотреть на Parallel Team Blog для получения большего количества материала.)
Используйте задачи .NET 4.
var t = Task<int>.Factory.StartNew(() => ProcessItem());
Используйте ConcurrencyOptions, чтобы установить максимальную степень параллелизма в этой обработке.
Если вы хотите самостоятельно опрокинуть его, используйте BlockingCollection<T>
, который обеспечивает блокирующие и ограничивающие возможности для потокобезопасных коллекций и реализует отдельный поток (или потоки) для потребителя.
Вы могли бы взглянуть на Producer/Consumer шаблон, если вы достаточно неудачны, чтобы не использовать .net 4.
Вот мой код, я разобрал, мои извинения за беспорядок, но вы должны быть в состоянии использовать его, добавив в проект и перекомпилировать, а затем создав свой процесс с помощью полученной DLL.
Enum для ChannelState:
public enum ChannelState
{
WaitingForSend,
WaitingForReceive,
Open
}
Интерфейсы:
public interface IChannel<TMessage>
{
// Methods
TMessage Receive();
void Send(TMessage message);
// Properties
bool CanReceive { get; }
bool CanSend { get; }
ChannelState State { get; }
}
using System;
public interface IReceiver<TMessage>
{
// Events
event EventHandler<MessageReceivedEventArgs<TMessage>> MessageReceived;
// Methods
void Activate();
void Deactivate();
// Properties
bool IsActive { get; }
}
Конкретные классы:
using System.Collections.Generic;
using System.Threading;
using System;
public class BufferedChannel<TMessage> : IChannel<TMessage>
{
// Fields
private int _blockedReceivers;
private int _blockedSenders;
private Queue<TMessage> _buffer;
private int _capacity;
private EventWaitHandle _capacityAvailableEvent;
private EventWaitHandle _messagesAvailableEvent;
// Methods
public BufferedChannel()
{
this._buffer = new Queue<TMessage>();
this._messagesAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
this._capacityAvailableEvent = new EventWaitHandle(true, EventResetMode.AutoReset);
this._capacity = 50;
}
public BufferedChannel(int bufferSize)
{
this._buffer = new Queue<TMessage>();
this._messagesAvailableEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
this._capacityAvailableEvent = new EventWaitHandle(true, EventResetMode.AutoReset);
this._capacity = 50;
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException("bufferSize", bufferSize, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero);
}
this._capacity = bufferSize;
}
public TMessage Receive()
{
Interlocked.Increment(ref this._blockedReceivers);
try
{
this._messagesAvailableEvent.WaitOne();
}
catch
{
lock (this._buffer)
{
Interlocked.Decrement(ref this._blockedReceivers);
}
throw;
}
lock (this._buffer)
{
Interlocked.Decrement(ref this._blockedReceivers);
this._capacityAvailableEvent.Set();
if ((this._buffer.Count - 1) > this._blockedReceivers)
{
this._messagesAvailableEvent.Set();
}
return this._buffer.Dequeue();
}
}
public void Send(TMessage message)
{
Interlocked.Increment(ref this._blockedSenders);
try
{
this._capacityAvailableEvent.WaitOne();
}
catch
{
lock (this._buffer)
{
Interlocked.Decrement(ref this._blockedSenders);
}
throw;
}
lock (this._buffer)
{
Interlocked.Decrement(ref this._blockedSenders);
this._buffer.Enqueue(message);
if (this._buffer.Count < this.BufferSize)
{
this._capacityAvailableEvent.Set();
}
this._messagesAvailableEvent.Set();
}
}
// Properties
public int BufferCount
{
get
{
lock (this._buffer)
{
return this._buffer.Count;
}
}
}
public int BufferSize
{
get
{
lock (this._buffer)
{
return this._capacity;
}
}
set
{
lock (this._buffer)
{
if (value <= 0)
{
throw new ArgumentOutOfRangeException("BufferSize", value, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero);
}
this._capacity = value;
if ((this._blockedSenders > 0) && (this._capacity > this._buffer.Count))
{
this._capacityAvailableEvent.Set();
}
}
}
}
public bool CanReceive
{
get
{
return true;
}
}
public bool CanSend
{
get
{
return true;
}
}
public ChannelState State
{
get
{
if (this._blockedSenders > 0)
{
return ChannelState.WaitingForReceive;
}
if (this._blockedReceivers > 0)
{
return ChannelState.WaitingForSend;
}
return ChannelState.Open;
}
}
}
using System;
using System.Collections.Generic;
using System.Threading;
using System.ComponentModel;
using System.Runtime.CompilerServices;
public sealed class Receiver<TMessage> : Component, IReceiver<TMessage>
{
// Fields
private volatile bool _continue;
private object _controlLock;
private volatile bool _disposed;
private Thread _receiverThread;
private bool _receiving;
private object _receivingLock;
private object _threadLock;
[CompilerGenerated]
private IChannel<TMessage> channel;
// Events
public event EventHandler<MessageReceivedEventArgs<TMessage>> MessageReceived;
// Methods
public Receiver(IChannel<TMessage> channel)
{
this._controlLock = new object();
this._threadLock = new object();
this._receivingLock = new object();
if (channel == null)
{
throw new ArgumentNullException("channel");
}
this.Channel = channel;
}
public void Activate()
{
this.CheckDisposed();
lock (this._controlLock)
{
if (this._receiverThread != null)
{
throw new InvalidOperationException();
}
this._continue = true;
this._receiverThread = new Thread(new ThreadStart(this.RunAsync));
this._receiverThread.IsBackground = true;
this._receiverThread.Start();
}
}
private void CheckDisposed()
{
if (this._disposed)
{
throw new ObjectDisposedException(base.GetType().Name);
}
}
public void Deactivate()
{
lock (this._controlLock)
{
if (this._continue)
{
this._continue = false;
lock (this._threadLock)
{
if (this._receiverThread != null)
{
this.SafeInterrupt();
this._receiverThread.Join();
this._receiverThread = null;
}
}
}
}
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
this.Deactivate();
this._disposed = true;
}
}
private void OnMessageReceived(TMessage message)
{
EventHandler<MessageReceivedEventArgs<TMessage>> messageReceived = this.MessageReceived;
if (messageReceived != null)
{
messageReceived(this, new MessageReceivedEventArgs<TMessage>(message));
}
}
private void RunAsync()
{
while (this._continue)
{
TMessage message = default(TMessage);
bool flag = false;
try
{
lock (this._receivingLock)
{
this._receiving = true;
}
message = this.Channel.Receive();
flag = true;
lock (this._receivingLock)
{
this._receiving = false;
}
Thread.Sleep(0);
}
catch (ThreadInterruptedException)
{
}
if (!this._continue)
{
if (flag)
{
this.Channel.Send(message);
return;
}
break;
}
this.OnMessageReceived(message);
}
}
private void SafeInterrupt()
{
lock (this._receivingLock)
{
lock (this._threadLock)
{
if (this._receiving && (this._receiverThread != null))
{
this._receiverThread.Interrupt();
}
}
}
}
// Properties
protected override bool CanRaiseEvents
{
get
{
return true;
}
}
public IChannel<TMessage> Channel
{
[CompilerGenerated]
get
{
return this.channel;
}
[CompilerGenerated]
private set
{
this.channel = value;
}
}
public bool IsActive
{
get
{
lock (this._controlLock)
{
return (this._receiverThread != null);
}
}
}
}
using System;
using System.Runtime.CompilerServices;
public class MessageReceivedEventArgs<TMessage> : EventArgs
{
// Fields
[CompilerGenerated]
private TMessage message;
// Methods
public MessageReceivedEventArgs(TMessage message)
{
this.Message = message;
}
// Properties
public TMessage Message
{
[CompilerGenerated]
get
{
return this.message;
}
[CompilerGenerated]
private set
{
this.message = value;
}
}
}
using System.Threading;
public class BlockingChannel<TMessage> : IChannel<TMessage>
{
// Fields
private TMessage _message;
private EventWaitHandle _messageReceiveEvent;
private EventWaitHandle _messageReceiveyEvent;
private object _sendLock;
private ChannelState _state;
private object _stateLock;
// Methods
public BlockingChannel()
{
this._state = ChannelState.Open;
this._stateLock = new object();
this._messageReceiveyEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
this._messageReceiveEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
this._sendLock = new object();
}
public TMessage Receive()
{
this.State = ChannelState.WaitingForSend;
this._messageReceiveyEvent.WaitOne();
this._messageReceiveEvent.Set();
this.State = ChannelState.Open;
return this._message;
}
public void Send(TMessage message)
{
lock (this._sendLock)
{
this._message = message;
this.State = ChannelState.WaitingForReceive;
this._messageReceiveyEvent.Set();
this._messageReceiveEvent.WaitOne();
}
}
// Properties
public bool CanReceive
{
get
{
return true;
}
}
public bool CanSend
{
get
{
return true;
}
}
public ChannelState State
{
get
{
lock (this._stateLock)
{
return this._state;
}
}
private set
{
lock (this._stateLock)
{
this._state = value;
}
}
}
}
- 1. Выполнение асинхронных операций
- 2. Последовательное выполнение асинхронных операций в Android
- 3. Производительность асинхронных операций
- 4. Цепочки асинхронных операций в iOS
- 5. Выполнение асинхронных операций в пределах одной и те же цепей
- 6. Выполнение 3 асинхронных операций (где второй зависит от состояния)
- 7. Идиоматический способ блокировки асинхронных операций
- 8. Несколько асинхронных операций с Делегаты
- 9. Очереди асинхронных операций через Promises
- 10. Разрешить генерацию асинхронных операций отключить
- 11. Используйте http status 202 для асинхронных операций
- 12. Выполнение операций
- 13. Выполнение 2 операций асинхронно
- 14. Выполнение параллельных асинхронных методов
- 15. Openresty: выполнение двух асинхронных задач
- 16. Обработка асинхронных операций на стороне сервера
- 17. Скрученные/выполнение асинхронных HTTP-запросов
- 18. WCF Restful Service - Внедрение асинхронных операций
- 19. Tomice дросселирует слишком много @ Асинхронных операций
- 20. Выполнение операций внутри группы
- 21. Задержка Выполнение ряда операций
- 22. Выполнение синхронных операций
- 23. Выполнение графиков операций hadoop
- 24. Выполнение операций в Pivot
- 25. Как ограничить Parallel.foreach для асинхронных операций?
- 26. python gio ждет выполнения асинхронных операций
- 27. Несколько асинхронных операций SqlClient - поиск хорошего примера
- 28. iOS - Объединение асинхронных операций на основе блоков
- 29. Как ждать результата асинхронных операций без ожидания?
- 30. Проверка завершения нескольких асинхронных сетевых операций
+1 Для получения в 4.0 :), я просто писал об этом. – TalentTuner
'Parallel.ForEach' блокирует вызывающий поток до тех пор, пока он не завершит их все, так что теперь это не поможет« обрабатывать очереди »асинхронно? или действительно ли вопрос действительно означает «параллельно», а не «асинхронно»? –
Это здорово, но, к сожалению, я использую .Net 3.5 – Sumee