2015-05-09 6 views
0

Я предваряю это, сказав, что я впервые вникаю в многопоточность. Несмотря на то, что вы много читаете о параллелизме и синхронизации, я не всегда вижу решение для требований, которые мне были даны.Как отправить сигнал/данные из рабочего потока в основной поток?

Используя C++ 11 и Boost, я пытаюсь выяснить, как отправлять данные из рабочего потока в основной поток. Рабочий поток генерируется в начале приложения и непрерывно контролирует свободную от блокировки очередь. Объекты заполняют эту очередь различными интервалами. Эта часть работает.

После того, как данные доступны, его необходимо обработать основным потоком, так как другой сигнал будет отправлен в остальную часть приложения, которое не может быть на рабочем потоке. Это то, с чем я столкнулся.

Если мне нужно заблокировать основной поток через мьютекс или переменную условия до тех пор, пока рабочий поток не будет выполнен, как это улучшит отзывчивость? Я мог бы просто остаться с одним потоком, чтобы у меня был доступ к данным. Я должен что-то упустить.

Я написал пару вопросов, считая, что Boost :: Asio - это путь. Существует пример того, как сигналы и данные могут быть отправлены между потоками, а также ответы показывают, вещи быстро получить чрезмерно сложными, и это не работает отлично:

How to connect signal to boost::asio::io_service when posting work on different thread?

Boost::Asio with Main/Workers threads - Can I start event loop before posting work?

После разговора с некоторым коллегам было предложено использовать две очереди - один вход, один выход. Это будет в общем пространстве, и очередь вывода будет заполнена рабочим потоком. Рабочий поток всегда идет, но должен быть таймер, возможно, на уровне приложения, который заставит основной поток исследовать очередь вывода, чтобы увидеть, есть ли какие-либо ожидающие решения задачи.

Любые идеи о том, куда я должен обратить свое внимание? Существуют ли какие-либо методы или стратегии, которые могут работать для того, что я пытаюсь сделать? Я буду смотреть на Таймеров.

Спасибо.

Редактировать: Это производственный код для плагиновой системы, которая обрабатывает результаты моделирования после обработки. Сначала мы используем C++ 11, а затем Boost. Мы используем стопку без блокировки ::. Приложение делает то, что мы хотим, в одном потоке, но теперь мы пытаемся оптимизировать, где мы видим, что есть проблемы с производительностью (в данном случае расчет происходит через другую библиотеку). Основной поток имеет много обязанностей, включая доступ к базе данных, поэтому я хочу ограничить то, что на самом деле делает рабочий поток.

Update: Я уже успешно опробовал зЬй :: нитки, чтобы запустить рабочий поток, который изучает замок Повысьте :: бесплатно очередь и обрабатывает задачи поставили ее в это шаг 5 в @ Pressacco в ответ, что я». m есть проблема. Любые примеры, возвращающие значение в основной поток, когда рабочий поток завершается и информирует основной поток, а не просто ждет завершения работника?

+0

Что еще является основной темой? – Yakk

ответ

0

Обратный вывод привел меня в правильном направлении за то, что мне было нужно. Решение было явно проще, чем использование сигналов/слотов или Boost :: Asio, как я уже пытался. У меня есть две незакрепленные очереди, одна для ввода (в рабочем потоке) и одна для вывода (в основном потоке, заполненном рабочим потоком). Я использую таймер для планирования, когда обрабатывается очередь вывода. Код ниже; возможно, он кому-то полезен:

//Task.h 

#include <iostream> 
#include <thread> 


class Task 
{ 
public: 
    Task(bool shutdown = false) : _shutdown(shutdown) {}; 
    virtual ~Task() {}; 

    bool IsShutdownRequest() { return _shutdown; } 

    virtual int Execute() = 0; 

private: 
    bool _shutdown; 
}; 


class ShutdownTask : public Task 
{ 
public: 
    ShutdownTask() : Task(true) {} 

    virtual int Execute() { return -1; } 
}; 


class TimeSeriesTask : public Task 
{ 
public: 
    TimeSeriesTask(int value) : _value(value) {}; 

    virtual int Execute() 
    { 
     std::cout << "Calculating on thread " << std::this_thread::get_id() << std::endl; 
     return _value * 2; 
    } 

private: 
    int _value; 
}; 


// Main.cpp : Defines the entry point for the console application. 

#include "stdafx.h" 
#include "afxwin.h" 

#include <boost/lockfree/spsc_queue.hpp> 

#include "Task.h" 

static UINT_PTR ProcessDataCheckTimerID = 0; 
static const int ProcessDataCheckPeriodInMilliseconds = 100; 


class Manager 
{ 
public: 
    Manager() 
    { 
     //Worker Thread with application lifetime that processes a lock free queue 
     _workerThread = std::thread(&Manager::ProcessInputData, this); 
    }; 

    virtual ~Manager() 
    { 
     _workerThread.join(); 
    }; 

    void QueueData(int x) 
    { 
     if (x > 0) 
     { 
     _inputQueue.push(std::make_shared<TimeSeriesTask>(x)); 
     } 
     else 
     { 
     _inputQueue.push(std::make_shared<ShutdownTask>()); 
     } 
    } 

    void ProcessOutputData() 
    { 
     //process output data on the Main Thread 
     _outputQueue.consume_one([&](int value) 
     { 
     if (value < 0) 
     { 
      PostQuitMessage(WM_QUIT); 
     } 
     else 
     { 
      int result = value - 1; 
      std::cout << "Final result is " << result << " on thread " << std::this_thread::get_id() << std::endl; 
     } 
     }); 
    } 

private: 
    void ProcessInputData() 
    { 
     bool shutdown = false; 

     //Worker Thread processes input data indefinitely 
     do 
     { 
     _inputQueue.consume_one([&](std::shared_ptr<Task> task) 
     {  
      std::cout << "Getting element from input queue on thread " << std::this_thread::get_id() << std::endl;   

      if (task->IsShutdownRequest()) { shutdown = true; } 

      int result = task->Execute(); 
      _outputQueue.push(result); 
     }); 

     } while (shutdown == false); 
    } 

    std::thread _workerThread; 
    boost::lockfree::spsc_queue<std::shared_ptr<Task>, boost::lockfree::capacity<1024>> _inputQueue; 
    boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024>> _outputQueue; 
}; 


std::shared_ptr<Manager> g_pMgr; 


//timer to force Main Thread to process Manager's output queue 
void CALLBACK TimerCallback(HWND hWnd, UINT nMsg, UINT nIDEvent, DWORD dwTime) 
{ 
    if (nIDEvent == ProcessDataCheckTimerID) 
    { 
     KillTimer(NULL, ProcessDataCheckPeriodInMilliseconds); 
     ProcessDataCheckTimerID = 0; 

     //call function to process data 
     g_pMgr->ProcessOutputData(); 

     //reset timer 
     ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback); 
    } 
} 


int main() 
{ 
    std::cout << "Main thread is " << std::this_thread::get_id() << std::endl; 

    g_pMgr = std::make_shared<Manager>(); 

    ProcessDataCheckTimerID = SetTimer(NULL, ProcessDataCheckTimerID, ProcessDataCheckPeriodInMilliseconds, (TIMERPROC)&TimerCallback); 

    //queue up some dummy data 
    for (int i = 1; i <= 10; i++) 
    { 
     g_pMgr->QueueData(i); 
    } 

    //queue a shutdown request 
    g_pMgr->QueueData(-1); 

    //fake the application's message loop 
    MSG msg; 
    bool shutdown = false; 
    while (shutdown == false) 
    { 
     if (GetMessage(&msg, NULL, 0, 0)) 
     { 
     TranslateMessage(&msg); 
     DispatchMessage(&msg); 
     } 
     else 
     { 
     shutdown = true; 
     } 
    } 

    return 0; 
} 
+0

Я рад, что мои комментарии помогли. FYI: ожидание события более эффективно, чем while (true), и я бы удостоверился, что ваша очередь является потокобезопасной. Если нет, то плохое произойдет неожиданно. (Я никогда не использовал очередь повышения.) – Pressacco

1

Если ваша цель заключается в разработке решения с нуля (с использованием собственных потоков, очередей и т. Д.):

  1. создать поток СОХРАНИТЬ очереди очереди (Mutex/CriticalSection вокруг добавить/удалить)
  2. создать счетный семафор, связанный с очередью
  3. имеет один или несколько рабочих потоков ждать подсчета семафора (то есть нить будет блокировать)
    • семафор является более эффективным, чем иметь поток постоянно опрашивать очередь
  4. в виде сообщений/задания добавляются в очередь, увеличиваем семафор
    • поток будет просыпаться
    • нить должна удалить одно сообщение
  5. если необходимо вернуть результат ...
    • установки другой: Queue + Semaphore + WorkerThread s

ДОПОЛНИТЕЛЬНЫЕ УКАЗАНИЯ

Если вы решили реализовать Потокобезопасная очередь с нуля, взгляните на:

С учетом сказанного, я беру бы другой взгляд на BOOST. Я не использовал библиотеку, но из того, что я слышал, она, скорее всего, будет содержать некоторые соответствующие структуры данных (например, очередь с потоком).

Моя любимая цитата из MSDN:

"При использовании многопоточности любого рода, вы потенциально подвергать себя очень серьезные и сложные ошибки"

Sidebar

Поскольку вы просматриваете параллельное программирование в первый раз, вы можете рассмотреть:

  • Является ли ваша цель построить достойный товар, или это просто учебное упражнение?
    • производство? рассмотрим существующие существующие проверенные библиотеки
    • обучение? рассмотрите возможность написания кода с нуля
  • Рассмотрите возможность использования пула потоков с асинхронным обратным вызовом вместо собственных потоков.
  • еще темы! = Лучше
  • Нужны ли потоки?
  • Следуйте за KISS principle.
+0

Привет, это шаг 5, что я не могу оборачивать голову. Результат должен быть возвращен в основной поток, поэтому, когда основной поток отключен от выполнения всех других вещей, как я могу сообщить ему, что «теперь вам нужно проверить эту очередь сейчас»? Это не другой рабочий поток, и я, очевидно, не могу блокировать основной поток в течение какого-то времени, чтобы система не реагировала. – jslmsca

+0

+1 для [Принцип KISS] (http://stackoverflow.com/questions/30110792/boostasio-with-main-workers-threads-can-i-start-event-loop-before-posting-wo#comment48337145_30112460) – sehe

+0

Если основной поток выполняет другую работу, пусть это. Просто убедитесь, что основной поток периодически проверяет его очередь сообщений, чтобы увидеть, есть ли какие-либо сообщения. Эта проверка может быть неблокирующей: if queue.count> 0 then queue.remove message – Pressacco

Смежные вопросы