2016-11-03 4 views
2

В последнее время я думаю о высокоэффективной платформе многопоточных потоков с использованием C++ 11. И это в основном принимает C++ 11 объектов, таких как std::thread, std::condition_variable, std::mutex, std::shared_ptr и т. Д. В целом, эта структура имеет три основных компонента: работа, рабочий и оптимизация, ну, похоже, это настоящая фабрика. Когда пользователь конструирует свою бизнес-модель на сервере, ему просто нужно рассмотреть данные и их процессор. После того, как модель будет установлена, пользователю нужно только построить унаследованный объект класса данных и класс унаследованного рабочего класса.C++ 11 shared_ptr, использующий в многопотоках

Например:

class Data : public job {}; 
class Processsor : public worker {}; 

Когда сервер получить данные, он только новый объект данных через auto data = std::make_shared<Data>() в источнике данных обратного вызова нити и вызвать обтекание. job_dispatch для передачи процессора и данных в другой поток. Конечно, пользователю не нужно думать, чтобы освободить память. Оптимизация. job_dispatch основном делает ниже вещей:

void evd_thread_pool::job_dispatch(std::shared_ptr<evd_thread_job> job) { 
    auto task = std::make_shared<evd_task_wrap>(job); 
    task->worker = streamline.worker; 
    // worker has been registered in streamline first of all 
    { 
     std::unique_lock<std::mutex> lck(streamline.mutex); 
     streamline.task_list.push_back(std::move(task)); 
    } 
    streamline.cv.notify_all(); 
} 

evd_task_wrap используется в job_dispatch определяется как:

struct evd_task_wrap { 
    std::shared_ptr<evd_thread_job> order; 
    std::shared_ptr<evd_thread_processor> worker; 
    evd_task_wrap(std::shared_ptr<evd_thread_job>& o) 
    :order(o) {} 
}; 

Наконец task_wrap будет послан в обрабатывающей нить через task_list, который является объектом std::list. А обработка нити в основном делают вещи, как:

void evd_factory_impl::thread_proc() { 
    std::shared_ptr<evd_task_wrap> wrap = nullptr; 
    while (true) { 
     { 
      std::unique_lock<std::mutex> lck(streamline.mutex); 
      if (streamline.task_list.empty()) 
       streamline.cv.wait(lck, 
       [&]()->bool{return !streamline.task_list.empty();}); 
      wrap = std::move(streamline.task_list.front()); 
      streamline.task_list.pop_front(); 
     } 
     if (-1 == wrap->order->get_type()) 
      break; 
     wrap->worker->process_task(wrap->order); 
     wrap.reset(); 
    } 
} 

Но я не знаю, почему этот процесс часто сбой в функции thread_proc. И подсказка coredump о том, что иногда обертка является пустой ошибкой shared_ptr или сегмента, произошла в _Sp_counted_ptr_inplace::_M_dispose, которая вызывается в wrap.reset(). И я предположил, что shared_ptr имеет синхронную проблему потока в этом сценарии, в то время как я знаю, что блок управления в shared_ptr является безопасным потоком. И, конечно же, shared_ptr в job_dispatch и thread_proc - это другой объект shared_ptr, хотя они указывают на то же хранилище. Кто-нибудь имеет более конкретное предложение о том, как решить эту проблему? Или, если существует подобный легкому каркас с автоматическим управлением памятью с использованием C++ 11


примере process_task, такие как: void log_handle::process_task(std::shared_ptr<crx::evd_thread_job> job) { auto j = std::dynamic_pointer_cast<log_job>(job); j->log->Printf(0, j->print_str.c_str()); write(STDOUT_FILENO, j->print_str.c_str(), j->print_str.size()); } class log_factory { public: log_factory(const std::string& name); virtual ~log_factory(); void print_ts(const char *format, ...) { //here dispatch the job char log_buf[4096] = {0}; va_list args; va_start(args, format); vsprintf(log_buf, format, args); va_end(args); auto job = std::make_shared<log_job>(log_buf, &m_log); m_log_th.job_dispatch(job); } public: E15_Log m_log; std::shared_ptr<log_handle> m_log_handle; crx::evd_thread_pool m_log_th; };

+1

В последнее время я думаю о высокоэффективной многопоточной инфраструктуре с использованием событий с использованием C++ 11. Это нормально для учебных целей, иначе я буду препятствовать вам в этом, используйте ASIO. – Arunmu

+1

@Arunmu Это настолько огромно, что мне нужен только легкий фейерверк. Когда я хочу удовлетворить некоторые конкретные требования, я могу исправить источник, чтобы встретить их быстро. Но Asio не может так быстро реагировать. –

+0

Это не лучший способ подумать.В итоге вы получите больше времени на отладку, исправив свою собственную инфраструктуру, чем используя ASIO, который, как я полагаю, займет около 1 дня, чтобы интегрировать и работать с хорошей производительностью. – Arunmu

ответ

-1

Я обнаружил проблему в коде, который может или не может быть связанные с:

Вы используете notify_all из вашего условия. Это пробудит ВСЕ потоки от сна. Это нормально, если вы оберните wait в цикле в то время, как:

while (streamline.task_list.empty()) 
    streamline.cv.wait(lck, [&]()->bool{return !streamline.task_list.empty();}); 

Но так как вы используете if все потоки покидают wait. Если вы отправляете один продукт и имеете несколько потребительских потоков, все, кроме одного потока, вызывают wrap = std::move(streamline.task_list.front());, тогда как список задач пуст и вызывает UB.

+0

Спасибо за консультанта, но я думаю, что это не ключевой момент аварии. Вот источник моей структуры: https://github.com/raine0524/kbase/blob/ ведущий/KBase/evd_thread_impl.cpp. И job_dispatch & thread_proc в нижней части страницы –

+0

На самом деле task_list & cv уникален в каждом потоке пула. И evd_task_wrap будет создан std :: make_shared для возврата обратно в список задач, которому принадлежит только один поток. –

+1

Он используя вариант 'wait()', который принимает предикат, который определяет условие ожидания. Это эквивалентно внешнему виду, поэтому он ему не нужен. – Matthias247

Смежные вопросы