2014-06-02 2 views
3

У меня есть служба массового транзита, настроенная для работы с RabbitMQ, и я не могу понять, как увеличить скорость потребления потребителя - кажется, она имеет жесткую колпачку при 10 сообщениях, полученных в секунду.Скорость передачи сообщений MassTransit на уровне 10

Я пробовал перечисленные здесь шаги: https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ, без успеха - установка предварительной выборки и одновременных потребителей до 25 не делает ничего, кроме увеличения подтвержденных сообщений, но не увеличивает скорость загрузки сообщений ,

Мой конфиг выглядит следующим образом:

ServiceBusFactory.ConfigureDefaultSettings(x => 
    { 
     x.SetConcurrentReceiverLimit(25); 
     x.SetConcurrentConsumerLimit(25); 
    }); 

_bus = ServiceBusFactory.New(
    sbc => 
     { 
      sbc.UseRabbitMq(x => 
       x.ConfigureHost(
        "rabbitmq://localhost/Dev/consume?prefetch=25", 
        y => 
         { 
          y.SetUsername(config.Username); 
          y.SetPassword(config.Password); 
         })); 
      sbc.UseLog4Net(); 
      sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25"); 
      sbc.Subscribe(x => RegisterConsumers(x, container)); 
      sbc.UseJsonSerializer(); 
      sbc.SetConcurrentConsumerLimit(25); 
     }); 

Я устанавливаю одновременное ограничение потребителя в двух местах, я не уверен, нужно ли мне установить его на значение по умолчанию или в конфигурации шины и потребители регистрируются через единицу - я пропустил подписку потребителя, поскольку все подписчики получают.

Я немного смущен относительно того, есть ли что-то еще, что мне нужно установить, или мне нужно изменить порядок, в котором я устанавливаю конфиги.

Любая помощь очень ценится.

+0

Вы не должны использовать SetConcurrentReceiverLimit, который предназначен только для конкретных случаев использования с MSMQ и транзакционными очередями. –

+0

@IanCotterill: удалось ли решить проблему? У нас точно такая же проблема, и эта SO-нить кажется единственным доказательством того, что это действительно происходит. И, как и ваш, RabbitMQ + MT = колпачок из 10 одновременных потоков, несмотря на ограничение лимита на что-то большее. При переключении на MSMQ нет проблем. –

ответ

0

Какую работу выполняет ваш потребитель? Если он работает достаточно быстро, вполне вероятно, что среда выполнения .NET не должна создавать дополнительные потоки для обработки скорости входящего сообщения.

У нас есть много систем в производстве, в которых используются указанные счетчики, где мы сопоставляем потребительский предел с подсчетом предварительной выборки, а во всех этих случаях под нагрузкой количество подтвержденных сообщений, отображаемое RabbitMQ, равно этим настройкам. Обычно мы видим почти такое же количество сообщений обработки потоков. Первоначально среда выполнения .NET является консервативной в распределенных потоках, но она быстро приближается к полному количеству потоков, когда потребители просто ждут на удаленной операции, такой как HTTP-запрос или команда SQL.

Если есть область потребителя, которая имеет одинарную резьбу, это может ограничивать масштабирование нитей на основе этого узкого места, поэтому убедитесь, что ваша модель резьбы также правильно настроена.

+0

мы наблюдали точно ту же проблему, что и при работе с Mass Transit - ограничение, которое ограничивает потребителя 10 параллельными сообщениями с помощью RabbitMQ. К сожалению, для нас это является недостатком. Временное решение заключалось в том, чтобы на мгновение вернуться к MSMQ, но это не вариант в долгосрочной перспективе из-за других потребителей, которые напрямую привязаны к RabbitMQ. Мы также переключились на MT 2.10, чтобы иметь возможность использовать недавно введенный 'SetPrefetchCount', но все же без эффекта. Я мог бы описать наш сценарий более подробно, но есть ли способ связаться с вами и получить шанс на любую помощь (кроме SO)? –

+1

Если вы ограничены до 10, вам нужно увеличить потребительский предел для шины, а также добавить '' ''? PrefetchCount = 50''' в свой URI очереди. Новый метод предназначен для переопределения во время выполнения, теперь подход URL работает некоторое время. 50 - всего лишь пример, FYI. Это должно быть как минимум ваш потребительский предел. –

+0

Попробуй, конечно, и дайте знать. Это похоже на наш последний шанс. –

5

Проведя романтический вечер с проблемой и попробовав разные вещи, предложенные Крисом, я обнаружил, что есть еще одна вещь, которую вы должны сделать, чтобы она работала так, как должна.

В частности, да, вам необходимо установить предварительную выборку по адресу очереди потребителя:

sbc.UseRabbitMq(
       f => 
        f.ConfigureHost(
         new Uri("rabbitmq://guest:[email protected]/masstransit_consumer"), 
         c => 
         { 
         }) 
       ); 

int pf = 20; // prefetch 

// set consumer prefetch (required!) 
sbc.ReceiveFrom(string.Format("rabbitmq://guest:[email protected]/masstransit_consumer?prefetch={0}", pf)); 

Но это еще не достаточно.

Ключ доступен в коде инструмента mtstress. Крис упоминает в своем комментарии ниже своего ответа. Оказалось, что вызов инструмента:

int _t, _ct; 
ThreadPool.GetMinThreads(out _t, out _ct); 
ThreadPool.SetMinThreads(pf, _ct); 

Добавление этого кода в мой код решает проблему. Интересно, хотя, почему это не требуется с MSMQ транспорта, хотя ...

Update # 1

После дальнейших исследований я обнаружил возможный виновник. Он находится в ServiceBusBuilderImpl.

Существует способ повышения лимита, ConfigureThreadPool.

Проблема заключается в том, что она вызывает CalculateRequiredThreads, которая должна возвращать количество требуемых потоков. К сожалению, последний возвращает значение минус как на моем клиенте Windows 7, так и на моем Windows Server. Таким образом, ConfigureThreadPool фактически ничего не делает, поскольку отрицательное значение затем игнорируется при вызове ThreadPool.SetMin/MaxThreads.

Как насчет этого отрицательного значения? Кажется, CalculateRequiredThreads звонки ThreadPool.GetMinThreads и ThreadPool.GetAvailableThreads и использует формулу придумал с количеством необходимых потоков:

var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads); 

Проблема здесь состоит в том, что на моих машинах это эффективно делает:

40 (my limit) + 8 (workerThreads) - 1023 (availableThreads) 

, какой из курс возвращается

-975 

Вывод: приведенный выше код из внутренних органов массового транспорта кажется неправильным. Когда я вручную поднимаю предел заранее, ConfigureMinThreads уважает его (поскольку он устанавливает предел только в том случае, если он превышает значение чтения).

Не устанавливая лимит вручную заранее, предел не может быть установлен, и, таким образом, код выполняет столько потоков, сколько ограничение по умолчанию для пула потоков (которое, как представляется, равно 8 на моей машине).

Видимо кто-то предположил, что это формула даст

40 + 8 - 8 

в сценарии по умолчанию. Почему GetMinThreads и GetAvailableThreads возвратные такие несвязанные значения еще предстоит определить ...

Update # 2

Изменение

static int CalculateRequiredThreads(int consumerThreads) 
    { 
     int workerThreads; 
     int completionPortThreads; 
     ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads); 
     int availableWorkerThreads; 
     int availableCompletionPortThreads; 
     ThreadPool.GetAvailableThreads(out availableWorkerThreads, out availableCompletionPortThreads); 
     var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads); 
     return requiredThreads; 
    } 

в

static int CalculateRequiredThreads(int consumerThreads) 
    { 
     int workerThreads; 
     int completionPortThreads; 
     ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads); 
     int availableWorkerThreads; 
     int availableCompletionPortThreads; 
     ThreadPool.GetAvailableThreads(out availableWorkerThreads, out availableCompletionPortThreads); 
     var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads); 
     return requiredThreads; 
    } 

решает проблему. Оба возвращают здесь 1023, а вывод формулы - правильное количество ожидаемых потоков.

+0

Должен ли я упомянуть, что Крис зафиксировал это в MT 2.10.2. –

Смежные вопросы