2012-02-22 6 views
2

Это классическая проблема c/p, когда некоторые потоки создают данные, а другие - данные. Как производитель, так и потребители используют буфер размером в const. Если буфер пуст, потребители должны ждать, и если он заполнен, то продюсер должен ждать. Я использую семафоры, чтобы отслеживать полные или пустые очереди. Производитель собирается уменьшать семафор свободных пятен, добавлять значения и сгенерировать заполненные слоты семафором. Поэтому я пытаюсь реализовать программу, которая получает некоторые числа из функции генератора, а затем выводит среднее число чисел. Рассматривая это как проблему производителя-потребителя, я пытаюсь сэкономить время на выполнение программы. Функция generateNumber вызывает некоторую задержку в процессе, поэтому я хочу создать несколько потоков, которые генерируют числа, и помещать их в очередь. Затем «основной поток», который выполняет основную функцию, должен читать из очереди и находить сумму, а затем среднюю. Так вот, что я до сих пор:потребитель/производитель в C++

#include <cstdio> 
#include <cstdlib> 
#include <time.h> 
#include "Thread.h" 
#include <queue> 

int generateNumber() { 
    int delayms = rand()/(float) RAND_MAX * 400.f + 200; 
    int result = rand()/(float) RAND_MAX * 20; 
    struct timespec ts; 
    ts.tv_sec = 0; 
    ts.tv_nsec = delayms * 1000000; 
    nanosleep(&ts, NULL); 
    return result; } 


struct threadarg { 
    Semaphore filled(0); 
    Semaphore empty(n); 
    std::queue<int> q; }; 


void* threadfunc(void *arg) { 
    threadarg *targp = (threadarg *) arg; 
    threadarg &targ = *targp; 
    while (targ.empty.value() != 0) { 
     int val = generateNumber(); 
     targ.empty.dec(); 
     q.push_back(val); 
     targ.filled.inc(); } 
} 
int main(int argc, char **argv) { 
    Thread consumer, producer; 
    // read the command line arguments 
    if (argc != 2) { 
     printf("usage: %s [nums to average]\n", argv[0]); 
     exit(1); } 
    int n = atoi(argv[1]); 
    // Seed random number generator 
    srand(time(NULL)); 
} 

Я немного запутался сейчас, потому что я не знаю, как создать несколько потоков производителей, которые генерируют число (если д не полностью), а потребитель читает из очередь (то есть, если q не пусто). Я не уверен, что добавить в основной смысл. также в разделе «Thread.h» вы можете создать поток, мьютекс или семафор. В потоке есть методы .run (threadFunc, arg), .join() и т. Д. Мьютекс можно заблокировать или разблокировать. Семафорные методы были использованы в моем коде.

+1

Привет, Дэн, вы не приняли ни одного из ответов, которые были даны вам. Просьба дать некоторый стимул сообществу ответить на ваши вопросы. –

+0

Мне очень жаль, что я даже не догадывался, что это вариант до сих пор! Я принял ответы на все вопросы, которые я ранее задавал. –

+0

Так спасибо за ответы. Тем не менее, это не столько код, с которым я борюсь, я просто не уверен, где определить, что особенно с потребителями. –

ответ

3

Ваша очередь не синхронизирована, поэтому несколько продюсеров могут одновременно звонить push_back или в то же время потребитель звонит pop_front ... это сломается.

Простой подход к выполнению этой работы - использовать потокобезопасную очередь, которая может быть оберткой вокруг std::queue, которую вы уже имеете, плюс мьютекс.

Вы можете начать с добавления мьютекса и блокировки/разблокировки вокруг каждого вызова вперед std::queue - для одного потребителя, который должен быть достаточным, для нескольких потребителей вы должны сливаться front() и pop_front() в один Синхронизировано вызов.

Чтобы потребитель блокировал, пока очередь пуста, вы можете добавить переменную условия в свою оболочку.

Этого должно быть достаточно, чтобы вы могли найти ответ онлайн - пример кода ниже.


template <typename T> class SynchronizedQueue 
{ 
    std::queue<T> queue_; 
    std::mutex mutex_; 
    std::condition_variable condvar_; 

    typedef std::lock_guard<std::mutex> lock; 
    typedef std::unique_lock<std::mutex> ulock; 

public: 
    void push(T const &val) 
    { 
     lock l(mutex_); // prevents multiple pushes corrupting queue_ 
     bool wake = queue_.empty(); // we may need to wake consumer 
     queue_.push(val); 
     if (wake) condvar_.notify_one(); 
    } 

    T pop() 
    { 
     ulock u(mutex_); 
     while (queue_.empty()) 
      condvar_.wait(u); 
     // now queue_ is non-empty and we still have the lock 
     T retval = queue_.front(); 
     queue_.pop(); 
     return retval; 
    } 
}; 

std::mutex Заменить др с тем, что примитивы ваш "Thread.h" дает вам.

+0

Вы можете указать пример кода? –

+0

также что вы подразумеваете под второй до последней строкой о потребительском блоке? –

+1

Предполагая, что ваш потребитель будет вызывать 'pop()' для получения следующего результата: если очередь пуста, это должно блокироваться до тех пор, пока производитель ничего не добавит, а затем вернет его; образец код приход. – Useless

1

Что бы я сделал это:

  • сделать класс данных, который скрывает вашу очередь
  • Создать поточно-методы доступа для сохранения части данных к ц и удаление части данных из q (я бы использовал один мьютекс или критический раздел для аксессуаров)
  • Обращайтесь к случаю, когда у потребителя нет данных для работы (сон)
  • Обращайтесь к случаю, когда q становится слишком полные, и производители должны замедлить
  • Пусть потоки идут волей-неволей добавлять и удалять, поскольку они производят/потребляют

Кроме того, помните, чтобы добавить сон в каждом и каждом потоке, иначе вы будете привязывать процессор и не дает планировщик потоков хорошее место для переключения контекстов и совместного использования процессора с другими потоками/процессами. Вам это не нужно, но это хорошая практика.

+0

(Upvote, чтобы получить меня более 5k!) = D – Kieveli

+0

Я проголосую, но ваше предложение не проясняет эту проблему. –

+0

Собственно, изменение архитектуры, как это, по сути, устраняет проблему. – Kieveli

0

При управлении общим состоянием, подобным этому, вам нужна переменная условия и мьютекс. Основной образцом является функцией вдоль линий:

ScopedLock l(theMutex); 
while (!conditionMet) { 
    theCondition.wait(theMutex); 
} 
doWhatever(); 
theCondition.notify(); 

В вашем случае, я бы, вероятно, сделать переменные состояния и мьютекс членов класса, реализующих очередь. Для того, чтобы написать, то conditionMet будет !queue.full(), так что вы бы в конечном итоге с чем-то вроде :

ScopedLock l(queue.myMutex); 
while (queue.full()) { 
    queue.myCondition.wait(); 
} 
queue.insert(whatever); 
queue.myCondition.notify(); 

и читать:

ScopedLock l(queue.myMutex); 
while (queue.empty()) { 
    queue.myCondition.wait(); 
} 
results = queue.extract(); 
queue.myCondition.notify(); 
return results; 

В зависимости от интерфейса резьбы, может быть два notify функции: уведомить один (который просыпается один поток) и уведомить все (который просыпает все ожидающие потоки); в этом случае вам понадобится уведомить обо всех (или вам понадобятся две переменные условия, одна для пробела - , а другая - для чтения, причем каждая функция ждет один, , но уведомит об этом другое).

0

Защитить доступ к очереди с помощью мьютекса, который должен быть таким. Ограниченная очередь производителей-потребителей «Computer Science 101» требует двух семафоров (для управления счетчиком free/empty и для продюсеров/потребителей для ожидания, как вы уже это делаете), и одного mutex/futex/criticalSection для защиты очереди ,

Я не вижу, как замена семафоров и мьютексов с помощью condvars - отличная помощь. В чем смысл? Как вы реализуете ограниченную очередь производителей-потребителей с condvars, которая работает на всех платформах с несколькими производителями/потребителями?