2009-06-20 3 views
3

У меня есть приложение, которое основано на прототипе потока потока сообщений. Всякий раз, когда есть действие, которое может блокироваться, оно реализуется как действие «обратный вызов при завершении/триггер evnet», поэтому он не останавливает исполняемый поток.Как сохранить сообщение в ожидании?

Хотя эта техника подходит для большинства случаев, бывают ситуации, когда она становится очень неудобной и чрезмерно усложняет код.

То, что я хотел бы иметь, заключается в том, чтобы продолжать обрабатывать события в ожидании прозрачным способом, не нарушая работу функции до частей предварительного ожидания.

Как мне это сделать?

У меня было два варианта в виде:

  1. Выполнить цикл обработки сообщений из функции исполняющей во время ожидания.
  2. Создайте новую рабочую нить во время ожидания и завершите ее (правильно) при возобновлении.

Оба варианта имеют свои недостатки, чтобы назвать несколько:

Для 1:

  • Может потенциально привести к переполнению стека.
  • Возможно, это может быть заблокировано.
  • Если внутреннее сообщение приводит к ожиданию завершения второго события, а внешнее событие завершается тем временем, внешняя функция не может продолжаться до тех пор, пока второе событие не завершится, и эта ситуация может расширяться.

Вариант 2 может просто оказаться в создании большего количества потоков.

Конечно, могут быть другие варианты, о которых я не думал.

EDIT: Язык - это C++, поэтому функции не могут быть удалены из и в удобной (переносной?) Форме. Платформа - это Windows (API), хотя я не думаю, что это актуально.

ответ

0

РЕДАКТИРОВАТЬ: Вы говорите, что не хотите «взломать функцию в до/после ожидания».

На каких языках вы разрабатываете? Если у него есть продолжения (yield return в C#), то это дает возможность написать код, который выглядит процедурным, но который можно легко приостановить до тех пор, пока операция блокировки не завершит обратный вызов завершения.

Вот статья об идее: http://msdn.microsoft.com/en-us/magazine/cc546608.aspx

UPDATE:

Unfortunatly, язык C++

Это сделало бы большую футболку лозунг.

Хорошо, так что вам может показаться полезным структурировать ваш последовательный код в качестве государственной машины, чтобы он стал прерывать/возобновлять работу.

например. Ваша боль необходимости написать две функции, тот, который инициирует и тот, который действует в качестве обработчика для события завершения:

void send_greeting(const std::string &msg) 
{ 
    std::cout << "Sending the greeting" << std::endl; 
    begin_sending_string_somehow(msg, greeting_sent_okay); 
} 

void greeting_sent_okay() 
{ 
    std::cout << "Greeting has been sent successfully." << std::endl; 
} 

Ваша идея была ждать:

void send_greeting(const std::string &msg) 
{ 
    std::cout << "Sending the greeting" << std::endl; 

    waiter w; 
    begin_sending_string_somehow(msg, w); 
    w.wait_for_completion(); 

    std::cout << "Greeting has been sent successfully." << std::endl; 
} 

В этом примере, waiter перегружает operator(), поэтому он может служить обратным вызовом, и wait_for_completion как-то зависает, пока не увидит, что вызван оператор().

Я предполагаю, что второй параметр begin_sending_string_somehow является параметром шаблона, который может быть любым вызываемым типом, не принимающим никаких параметров.

Но, как вы говорите, у этого есть недостатки. В любой момент, когда поток ожидает такого, вы добавили еще один потенциальный тупик, и вы также потребляете «ресурс» всего потока и его стека, что означает, что больше потоков потребуется создать в другом месте, чтобы можно было выполнить работу , что противоречит всей точке пула потоков.

Так вместо этого, написать класс:

class send_greeting 
{ 
    int state_; 
    std::string msg_; 

public: 
    send_greeting(const std::string &msg) 
     : state_(0), msg_(msg) {} 

    void operator() 
    { 
     switch (state_++) 
     { 
      case 0: 
       std::cout << "Sending the greeting" << std::endl; 
       begin_sending_string_somehow(msg, *this); 
       break; 

      case 1: 
       std::cout << "Greeting has been sent successfully." 
          << std::endl; 
       break; 
     } 
    } 
}; 

Класс реализует оператор вызова функции (). Каждый раз, когда он вызывается, он выполняет следующий шаг в логике. (Конечно, являясь таким тривиальным примером, теперь это в основном шум управления состоянием, но в более сложном примере с четырьмя или пятью состояниями это может помочь прояснить последовательный характер кода).

Проблема:

  • Если событие функции обратного вызова подписи есть специальные параметры, вам необходимо добавить еще одну перегрузки operator(), которая хранит параметры в дополнительных полях, а затем вызывает на перегрузку без параметров. Затем он начинает запутываться, потому что эти поля будут доступны во время компиляции в исходном состоянии, даже если они не имеют смысла во время выполнения в этом состоянии.

  • Как объекты класса построены и удалены? Объект должен выжить до тех пор, пока операция не завершится или не будет прекращена ... центральная ошибка C++. Я бы рекомендовал реализовать общую схему управления ею. Создайте список «вещей, которые нужно удалить» и убедитесь, что это происходит автоматически в определенных безопасных точках, т. Е. Попытайтесь как можно ближе подойти к GC. Чем дальше вы от этого, тем больше у вас будет утечки.

+0

Unfortunatly, язык C++: P –

+0

Спасибо за подробный ответ. Это интересная техника, однако я не уверен, что она соответствует моей конкретной проблеме. Смотрите, одно из требований, как я уже упоминал, является прозрачностью, поэтому send_greeting должен выглядеть так (псевдо): send_grt (msg) {...; send_string (MSG); сообщение cout << "отправлено";}, поэтому ожидающая часть находится внутри send_string, а не send_greeting. Ваше решение работает, если код написан как конечный автомат на нижнем уровне, в то время как я ищу решение верхнего уровня. Во всяком случае, хорошая идея, и я уверен, что буду использовать ее в другом месте. –

+0

Но тогда вы просто мечтаете о невозможном. Если вы * требуете *, чтобы он выглядел как обычная синхронная функция для вызывающего, то вы * требовали * техники ожидания точно так же, как вы ее первоначально описали, или как показано в моем примере с гипотетическим классом «официант». Это почти все, что нужно. –

0

Не зная больше о вашей конкретной области применения (т.е. как долго сообщения требуется для обработки и т.д ..) там будет много handwaving:

  • ли это управляемый или неуправляемый C++?

  • Какой тип ThreadPool вы используете?

    • QueueUserWorkItem?
    • Ваш собственный пул через CreateIoCompletionPort?
    • Или Vista's SubmitThreadpoolWork?

Я думаю, что платформа несколько актуальна как природа пула потоков имеет важное значение.

Например:

Если вы используете (Completion Ports) для пула потоков (т.е. CreateIoCompletionPort). У вас есть некоторый контроль над тем, сколько потоков выполняется одновременно (и, следовательно, от того, сколько всего потоков создано в конечном итоге). Если вы укажете максимальное количество одновременных потоков, скажем 4. Windows попытается разрешить одновременное одновременное выполнение 4 потоков. Если все 4 потока заняты обработкой, и вы ставите пятый элемент, тогда окна не позволят этому элементу работать до тех пор, пока не закончится один из 4 (повторное использование потока). Единственный раз, когда это правило нарушается, - это когда потоки блокируются (т. Е. Ждут ввода-вывода), тогда больше потоков разрешено запускать.

Это важная вещь для понимания портов завершения, и почему платформа важна. Очень сложно реализовать что-то подобное без участия ядра. Знание разницы между занятыми потоками и заблокированными потоками требует доступа к состояниям Thread. Дополнения портов очень эффективны в отношении количества переключателей контекста в ядре.

Возвращаясь к вашему вопросу:

Казалось бы, что вы должны иметь один поток для обработки/отправка сообщений и обработка сообщения все обрабатывается толкая рабочий на пул потоков. Пусть порты завершения обрабатывают балансировку нагрузки и параллелизм. Цикл обработки сообщений никогда не будет блокироваться и может продолжать обрабатывать сообщения.

Если скорость входящих сообщений намного превышает вашу способность обрабатывать их, вам, вероятно, придется обратить внимание на размер очереди и блокировать, когда она станет слишком большой.

+0

Пул потоков использует собственную очередь и механизм schelduing, а C++ неуправляем. Однако, я думаю, он может быть преобразован в порты завершения. Должен признаться, что у меня очень мало опыта с портами завершения, и кажется, что они несколько недооценены, поэтому, если бы вы могли предложить хорошую справочную ссылку, это было бы полезно (MSDN сделала несколько неряшливую работу за один раз). Я, однако, не понял твое предложение. Если у меня есть только один поток для отправки порта и распространения работы, то порты завершения фактически не обрабатывают балансировку для меня. -----> –

+0

<----- Из того, что я знаю о CP, у меня может быть больше потоков в пуле, чем параллелизм, поэтому блокированные потоки будут дополняться дополнительными потоками. Вопрос в том, как я могу динамически расширять и сокращать пул, поэтому в конечном итоге всегда будут готовы потоки к действию? –

+0

Если вы указали количество потоков, используемых в настоящее время, вы можете увеличить количество потоков. В основном, когда GetQueuedCompletionStatus возвращается, увеличивайте счетчик, отправляйте свою функцию обработки, а затем, когда она возвращает декремент счетчика. Если счетчик превышает количество потоков в вашем пуле, добавьте его. Идея состоит в том, чтобы всегда держать хотя бы дополнительный поток в пуле, ожидая GetQueuedCompletionStatus. Сокращение пула - еще одна история. Вам нужно будет решить, сколько лишних потоков слишком много. Вы можете использовать возраст или число, чтобы определить, когда уничтожать лишние потоки. – GrendleM

0

Кажется, ваша проблема фундаментальна и не связана с C++. Другие языки, возможно, лучше скрывают использование стека, но пока вы не вернулись из Foo(), вам нужен стек вызовов для Foo(). И если вы также выполняете Bar(), для этого также требуется столбец.

Нитки - отличный подход к этому, поскольку каждая нить снабжена собственным стоп-сигналом. Continuations - это умный, но сложный способ сэкономить стоп-кары, поэтому, если это доступно, это тоже вариант. Но если вы этого не хотите, вам придется поработать с одним столом.

Daling с помощью одного вызова требует повторного размещения. Здесь нет общего ответа на то, что возможно. В общем, у вас будет набор сообщений M1..Mx, которые обрабатываются функциями F1 ... Fy, с некоторым зависящим от приложения и, возможно, зависящим от состояния отображением. При повторном цикле сообщений вы можете выполнять Fi, когда получаете Mj. Теперь проблема в том, что делать. Не все функции F1 ... Fn могут быть вызваны; в частности, сам Fi не может быть вызван. Однако некоторые другие функции также могут быть недоступны, например. потому что они обмениваются ресурсами. Это зависит от приложения.

Если обработка Mj требует любой из этих недоступных функций, вы должны отложить ее. Можете ли вы принять следующее сообщение в очереди? Опять же, это зависит от реализации, и это может даже относиться к типу и контенту. Если сообщения достаточно независимы, их можно выполнить не по порядку.Это быстро становится довольно сложным - чтобы определить, можно ли принять N-е сообщение в очереди, вы должны проверить, может ли он быть выполнен из-за порядка по отношению к предыдущим сообщениям N-1.

Язык может помочь вам, не скрывая зависимости, но в конце вы должны принять явные решения. Там нет серебряной пули.

1

Для портативных C++ этого не будет, но поскольку вы упомянули о своей платформе Windows, почему бы не использовать MsgWaitForMultipleObjects? Его цель - позволить вам делать именно то, что говорит ваш вопрос, - продолжайте накачивать сообщения во время ожидания.

0

Ваша проблема в синхронизации нитей? Если это ваша проблема, почему бы не использовать мьютекс? Его можно обернуть интерфейсом. Фактически вы можете использовать идиом PIMPL, чтобы сделать мьютекс портативным.

http://msdn.microsoft.com/en-us/library/system.threading.mutex(VS.71).aspx

Смежные вопросы