2015-01-21 3 views
7

Мое приложение принимает пакеты из сети и отправляет их одному или нескольким «процессорам». (Каждый пакет принадлежит к заранее определенному «поток», который может быть идентифицирован, глядя на пакетных данных.)Одиночный производитель и несколько однопоточных потребителей

Существует в настоящее время один поток, который делает всю работу:

  1. извлечения пакетов из сети устройство
  2. идентифицировать процессоры для каждого пакета
  3. посылки пакета на свои процессоры

Входящие данные поступают со скоростью 20 миллионов пакетов в секо nd (10 Гбит/с 60-байтных пакетов.)

Это решение, однако, может поддерживать только небольшое количество потоков и процессоров. Например, в случае 10 потоков потери пакетов уже на 10-20%.

Поскольку шаг (3) является самым дорогим, я планирую делегировать эту работу пулу рабочих потоков.

Однако я должен быть осторожным, потому что сами процессоры не являются потокобезопасными. Таким образом, только один рабочий поток может одновременно отправлять пакеты одному процессору.

Это похоже на хороший вариант использования для программирования на основе задач. Но я не могу легко сопоставить шаблоны проектирования, описанные в документах TBB, с моей проблемой.

Итак, мой вопрос: как я могу организовать свои потребительские потоки, чтобы они равномерно распределяли пакеты на однопоточные процессоры?

Я не ожидал полностью выработанное решения, но я был бы счастлив только с вашими предложениями или случайными идеями :)

+1

Это хелла-тон данных. Просто говорю = P – WhozCraig

+0

Почему шаг 3 занимает так много времени? Это вопрос _dispatching_ для процессоров, или это потому, что он ждет завершения каждого процессора, прежде чем двигаться дальше? Существует ли требование для заказа на процессорах или требуется просто, чтобы все сообщения были видны, но не обязательно в полученном порядке? – Charlie

ответ

5

Я сделал некоторые встроенные программы, где мне приходилось иметь дело с относительно высокими пропускными способностями - не так быстро, как у вас здесь! Надеюсь, вы используете гораздо более мощное оборудование, чем я привык ... Есть несколько простых стратегий, которые должны применяться к вашей ситуации!

1. Очередь ввода/обработки и связанное с ней управление памятью имеют решающее значение.

Очередь для входящих данных должна быть очень эффективной, если у вас высокие скорости передачи данных. Вы должны выполнить минимальное количество обработки, иначе вы рискуете потерять данные с устройства. (Я привык к чтению данных с какого-то устройства с быстрым последовательным порядком с относительно небольшим буфером, поэтому существуют ограничения в реальном времени на то, как долго устройство можно оставить без чтения без потери данных. Это привело меня к привычке касающейся считывания с устройства как полностью автономной задачи, которая касается только данных чтения и ничего другого.)

Очень простая серия предварительно выделенных буферов фиксированного размера примерно такая же эффективная, как и она: есть очередь ' свободных "буферов и очереди заполненных буферов. Если вы используете связанный список без блокировки, сохранение этих списков может быть очень быстрым, а операции с enqueue/dequeue довольно распространены во многих ОС.

Избегайте использования malloc или другого динамического распределения, поскольку они имеют значительные (и часто непредсказуемые) накладные расходы, когда им необходимо управлять своими собственными структурами данных «свободных» и «выделенных» блоков. Они также могут выполнять блокировки, которые могут непредсказуемо блокировать потоки производителей или рабочих, если они одновременно освобождают или распределяют память примерно в одно и то же время. Вместо этого попробуйте найти подпрограммы нижнего уровня для выделения и отпускания целых страниц, предоставленных вашей ОС для ваших очередей (mmap на unixy-платформах, VirtualAllocEx). Обычно им приходится делать гораздо меньше работы, так как они используют функции MMU для отображения физических страниц ОЗУ и не имеют сложной структуры данных в памяти для поддержки, имеют более надежное время выполнения для каждого вызова и могут быть достаточно быстрым, чтобы расширить свой бесплатный список, если он работает на низком уровне.

В производителе не беспокойтесь о единицах, меньших, чем целые блоки. Возьмите бесплатный блок из очереди, упакуйте блок, полный данных, добавьте его в очередь, подлежащую обработке. Если вам нужно обеспечить, чтобы каждый пакет обрабатывался в течение фиксированного периода времени, или вам нужно иметь дело с «всплескными» скоростями передачи данных, тогда еще попробуйте прочитать полный буфер с вашего устройства ввода, но либо уменьшите размер блока до быть «разумным» временем или использовать тайм-аут и заполнять частично заполненные блоки для обработки и «заполнять» остаток каким-то нулевым пакетом. Я обнаружил, что это часто бывает быстрее, чем включать много кода для обработки частично заполненных буферов.

Если вы можете, очень аккуратно установите близость процессора и приоритет потока для вашей производственной нити. В идеале вы хотите, чтобы поток производителей имел более высокий приоритет, чем любой из потребительских потоков, и привязывался к определенному ядру. Ничто не должно препятствовать чтению входящих данных из-за нехватки места в буфере.

2.Обработка

Вы сказали, что есть:

  1. Несколько потоков
  2. Несколько «процессоров», которые не поточно-

Что бы полезно сделать здесь является параллельный запуск процессоров на пакетах, но из вашего вопроса не ясно, насколько это возможно.

Являются ли процессоры потокобезопасными через потоки? (Можем ли мы запустить процессор в двух разных потоках, если они работают на двух разных потоках?)

Являются ли процессоры потокобезопасными для разных процессоров в одном потоке? (Можно ли запустить несколько процессоров в одном потоке в отдельных потоках?)

Нужно ли работать процессорам в определенном порядке?

Не зная этого, есть еще некоторые общие вещи, которые являются полезными советами.

У вас есть второй поток, который предназначен для чтения полных буферов от производителя и отправки их соответствующим процессорам (в других потоках), а затем возвращает полный буфер обратно в «пустую» очередь для обработки. В то время как вы теряете определенную прямолинейную эффективность (один поток, выполняющий чтение и диспетчеризацию, будет немного «быстрее», чем два), по крайней мере, этот способ не будет блокировать чтение с устройства ввода, если есть мгновенная блокировка.

Создайте или найдите библиотеку, позволяющую распределять задания в пуле потоков, особенно если у вас много процессоров по сравнению с количеством потоков, которые вы можете запускать параллельно. Сравнительно просто реализовать какую-то очередность заданий, которая позволяет несколько простых отношений между заданиями (например, «эта работа требует, чтобы работа X и Y была выполнена первой», «это задание невозможно запустить параллельно с любым другим заданием, которое использует тот же процессор "). Даже простая стратегия, в которой менеджер заданий запускает первое выполняемое задание в первом доступном потоке, может быть очень эффективным.

Старайтесь избегать копирования. Если процессоры могут обрабатывать пакет «на месте», не копируя их из буфера, вы сохранили много бессмысленных циклов. Даже если вам нужно скопировать, наличие нескольких потоков, копирующих данные из общего «общего доступа», лучше, чем однократное копирование и отправка сообщений нескольким потокам.

Если проверка того, должен ли процессор быть запущен для данного пакета, очень быстр, тогда вам может быть лучше иметь несколько заданий, каждый из которых проверяет, должен ли он выполнять некоторую обработку. Вместо того, чтобы указать один поток, какие процессоры должны запускать на каких пакетах, может быть быстрее иметь несколько потоков, по одному для каждого процессора или группы процессоров, каждый раз проверять каждый пакет, независимо от того, должен ли он работать. Это сводится к мысли о том, что простая проверка ресурса только для чтения несколько раз в нескольких потоках может занять меньше времени, чем выполнение синхронизации между потоками.

Если вы можете запускать процессоры параллельно, если они обрабатывают данные из разных потоков, то сделать проход через данные, чтобы получить список потоков, а затем начать работу для каждого потока - хорошая идея. Вы также можете собрать список пакетов, принадлежащих каждому потоку, но опять же, это компромисс между тем, насколько быстро задание может проверять каждый пакет и время, необходимое для сбора этого списка в одном потоке, и передавать их каждому их соответствующих рабочих мест.

Надеюсь, некоторые из этих стратегий могут быть полезны в вашем случае!Сообщите нам, как это работает ... это чертовски много данных, которые вам нужно обработать, и было бы хорошо знать, что такое и не эффективно для более быстрой скорости передачи данных, чем я привык к ! Удачи!

1

Вот моя идея для возможного решения.

Предположим, что у нас есть n процессоров. Давайте представим n мьютексов, по одному на процессор. Давайте также представим очередь для пакетов. Все входящие пакеты помещаются в эту очередь.

рабочий поток работает следующим образом:

  1. Захват пакетов из очереди входящих пакетов.
  2. Определите необходимый процессор.
  3. Попробуйте приобрести соответствующий мьютекс. Если захват блокировки завершается успешно, обработайте пакет. В противном случае, вновь поставить в очередь и перейти к 1.
  4. После обработки сделано, перейдите к шагу 1.

Возможные недостатки:

  1. Пакеты повторно помещён означает, что они могут быть задержаны/обработанный вне порядка, который может быть для вас нарушением сделки (не уверен).
  2. Конфликт в очереди, вероятно, будет высоким. Вероятно, вы захотите посмотреть на использование для этого блокировки.
  3. В очереди явно потребляется дополнительная память, я не знаю, есть ли у вас запасная память.

EDIT: больше мыслей о потреблении памяти - конечно, можно установить верхний предел объема памяти, которую может потреблять очередь, - тогда возникает вопрос, что делать, когда у вас заканчивается память. Я бы сказал, что самое лучшее, что можно сделать, это просто начать отбрасывать пакеты (у меня сложилось впечатление, что некоторые из них не имеют большого значения в вашем случае), пока очередь не истощится немного.

В некоторой степени это связано с тем, что я думаю, что хорошая реализация очереди для этого варианта использования должна избегать динамического распределения памяти любой ценой - предварительно распределите память и убедитесь, что на пути критического кода нет назначений.

+1

Шаг 3 умный. Спасибо :) – StackedCrooked

+0

tbb имеет параллельную очередь фиксированного размера, которая может выполнять распределение перед своими микро-очередями. – BlamKiwi

1

Почему вы не можете использовать несколько очередей, по одному на каждый процессор? Эти очереди могут быть блокируемыми (без мьютексов).

  1. выборку пакетов от сетевого устройства
  2. идентифицировать процессоры для каждого пакета (PID)
  3. нажимную пакета в очередь [PID]
  4. работник: пакет процесса из очереди [к]

Для подобной проблемы я использую опрос блокировочных кольцевых буферов с автоматической перезаписью самых старых пакетов.