Проведя романтический вечер с проблемой и попробовав разные вещи, предложенные Крисом, я обнаружил, что есть еще одна вещь, которую вы должны сделать, чтобы она работала так, как должна.
В частности, да, вам необходимо установить предварительную выборку по адресу очереди потребителя:
sbc.UseRabbitMq(
f =>
f.ConfigureHost(
new Uri("rabbitmq://guest:[email protected]/masstransit_consumer"),
c =>
{
})
);
int pf = 20; // prefetch
// set consumer prefetch (required!)
sbc.ReceiveFrom(string.Format("rabbitmq://guest:[email protected]/masstransit_consumer?prefetch={0}", pf));
Но это еще не достаточно.
Ключ доступен в коде инструмента mtstress
. Крис упоминает в своем комментарии ниже своего ответа. Оказалось, что вызов инструмента:
int _t, _ct;
ThreadPool.GetMinThreads(out _t, out _ct);
ThreadPool.SetMinThreads(pf, _ct);
Добавление этого кода в мой код решает проблему. Интересно, хотя, почему это не требуется с MSMQ транспорта, хотя ...
Update # 1
После дальнейших исследований я обнаружил возможный виновник. Он находится в ServiceBusBuilderImpl
.
Существует способ повышения лимита, ConfigureThreadPool
.
Проблема заключается в том, что она вызывает CalculateRequiredThreads
, которая должна возвращать количество требуемых потоков. К сожалению, последний возвращает значение минус как на моем клиенте Windows 7, так и на моем Windows Server. Таким образом, ConfigureThreadPool
фактически ничего не делает, поскольку отрицательное значение затем игнорируется при вызове ThreadPool.SetMin/MaxThreads
.
Как насчет этого отрицательного значения? Кажется, CalculateRequiredThreads
звонки ThreadPool.GetMinThreads
и ThreadPool.GetAvailableThreads
и использует формулу придумал с количеством необходимых потоков:
var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);
Проблема здесь состоит в том, что на моих машинах это эффективно делает:
40 (my limit) + 8 (workerThreads) - 1023 (availableThreads)
, какой из курс возвращается
-975
Вывод: приведенный выше код из внутренних органов массового транспорта кажется неправильным. Когда я вручную поднимаю предел заранее, ConfigureMinThreads
уважает его (поскольку он устанавливает предел только в том случае, если он превышает значение чтения).
Не устанавливая лимит вручную заранее, предел не может быть установлен, и, таким образом, код выполняет столько потоков, сколько ограничение по умолчанию для пула потоков (которое, как представляется, равно 8 на моей машине).
Видимо кто-то предположил, что это формула даст
40 + 8 - 8
в сценарии по умолчанию. Почему GetMinThreads
и GetAvailableThreads
возвратные такие несвязанные значения еще предстоит определить ...
Update # 2
Изменение
static int CalculateRequiredThreads(int consumerThreads)
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads(out availableWorkerThreads, out availableCompletionPortThreads);
var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);
return requiredThreads;
}
в
static int CalculateRequiredThreads(int consumerThreads)
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads(out availableWorkerThreads, out availableCompletionPortThreads);
var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);
return requiredThreads;
}
решает проблему. Оба возвращают здесь 1023, а вывод формулы - правильное количество ожидаемых потоков.
Вы не должны использовать SetConcurrentReceiverLimit, который предназначен только для конкретных случаев использования с MSMQ и транзакционными очередями. –
@IanCotterill: удалось ли решить проблему? У нас точно такая же проблема, и эта SO-нить кажется единственным доказательством того, что это действительно происходит. И, как и ваш, RabbitMQ + MT = колпачок из 10 одновременных потоков, несмотря на ограничение лимита на что-то большее. При переключении на MSMQ нет проблем. –