2014-10-31 3 views
2

Я использую RabbitMQ в .net, и я вижу странную проблему, когда я отбрасываю 100 сообщений в очереди. Он обрабатывает приблизительно 50 сообщений, затем метод Dequeue() просто зависает. Если я перезапущу службу, она обрабатывает остальные элементы.RabbitMQ обрабатывает только 50 сообщений, затем блокирует

EDIT: Он обрабатывает ровно 50% очереди. Когда я добавляю 1000 сообщений, он обрабатывает только 500. Даже когда однопоточная

Что мне здесь не хватает?

private void InitializeAgent() { 
     var agentFactory = new ConnectionFactory() { HostName = "localhost" }; 
     agentConnection = agentFactory.CreateConnection(); 
     agentChannel = agentConnection.CreateModel(); 
     var ok = agentChannel.QueueDeclare(GetType().Name, true, false, false, null); 
     consumer = new QueueingBasicConsumer(agentChannel); 
     agentChannel.BasicConsume(GetType().Name, false, consumer); 
    } 

    public void DequeueMessages() { 
     ThreadPool.SetMaxThreads(200, 200); 
     ThreadPool.SetMinThreads(200, 200); 
     var ea = consumer.Queue.Dequeue(); 
     ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea); 
    } 

    public void AgentTask() { 
     var instance = factory.GetInstance(threadItem); 

     while (true) 
      DequeueMessages(); 
    } 

    private void ProcessWorkInThread(object state) { 
     var ea = state as BasicDeliverEventArgs; 

     var message = Encoding.UTF8.GetString(ea.Body); 

     var settings = new JsonSerializerSettings(); 
     settings.ContractResolver = new DefaultContractResolver() { DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public }; 
     var item = JsonConvert.DeserializeObject<TEntity>(message, settings); 

     Thread.Sleep(10000) //simulate work 
     lock (agentChannel)    
      agentChannel.BasicAck(ea.DeliveryTag, false);    
    } 
+0

довольно уверен, что IModel не является потокобезопасным. в руководстве пользователя .net указано, что IMODEL не должен делиться между потоками. – user1450877

+0

@ user1450877 Могу я просто добавить замок вокруг dequeue & ack? –

+0

должны использовать один IModel для потока. – Gabriele

ответ

0

Это должно быть .NET версия клиента вопрос, используя 3.4.0, следующий код работает, как ожидалось.

static readonly ConnectionFactory Factory = new ConnectionFactory { HostName = "localhost" }; 
static readonly IConnection Connection = Factory.CreateConnection(); 
static QueueingBasicConsumer consumer; 
static IModel agentChannel; 

static CancellationTokenSource _tokenSource; 

static void Main(string[] args) 
{ 
    _tokenSource = new CancellationTokenSource(); 

    const string queueName = "testQueue"; 
    agentChannel = Connection.CreateModel(); 
    agentChannel.QueueDeclare(queueName, true, false, false, null); 
    agentChannel.QueueBind(queueName, "testExchange", ""); 

    consumer = new QueueingBasicConsumer(agentChannel); 
    agentChannel.BasicConsume(queueName, false, consumer); 

    while (!_tokenSource.Token.IsCancellationRequested) 
    { 
     DequeueMessages(); 
    } 
    Console.ReadLine(); 
    _tokenSource.Cancel(); 
} 

static void DequeueMessages() 
{ 
    ThreadPool.SetMaxThreads(200, 200); 
    ThreadPool.SetMinThreads(200, 200); 
    var ea = consumer.Queue.Dequeue(); 
    ThreadPool.QueueUserWorkItem(ProcessWorkInThread, ea); 
} 

static void ProcessWorkInThread(object state) 
{ 
    var ea = state as BasicDeliverEventArgs; 

    var message = Encoding.UTF8.GetString(ea.Body); 

    var settings = new JsonSerializerSettings(); 
    settings.ContractResolver = new DefaultContractResolver() 
    { 
     DefaultMembersSearchFlags = BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public 
    }; 
    var item = JsonConvert.DeserializeObject<string>(message, settings); 
    Console.WriteLine(item); 
    Thread.Sleep(10000); //simulate work 
    lock (agentChannel) 
     agentChannel.BasicAck(ea.DeliveryTag, false); 
} 
Смежные вопросы