2012-01-17 5 views
2

У меня есть простое приложение с потоком «менеджер», в котором появляются десять простых «рабочих» потоков. Я хочу, чтобы все «рабочие» потоки блокировались при одной и той же переменной условия (то есть: condvar), и я хочу вручную сигнализировать все десять потоков, чтобы просыпаться одновременно с вызовом pthread_cond_broadcast.Синхронизация потоков для вызова pthread_cond_broadcast

В случае моего приложения, потоки могут страдать от ошибки и заканчиваться раньше, так что возможно, что не все десять потоков попадают в точку синхронизации.

Одним простым механизмом является создание pthread_barrier_t и все десяти потоков, вызывающих pthread_barrier_wait, и когда все десять потоков завершат этот вызов, они могут продолжить выполнение. Однако для этого потребуется, чтобы потоки могли изменять количество потоков, которые барьер требует разблокировать. Я не знаю, можно ли это безопасно изменить.

Кроме того, я хочу, чтобы все неподвижные потоки не запускались автоматически, как если бы они были с барьером, я хочу вручную запустить их с помощью вызова pthread_cond_broadcast. Как я могу гарантировать, что все потоки, которые все еще живы (в идеале десять) заблокированы на конвале, прежде чем я сделал широковещательный вызов?

Спасибо!

+0

Я действительно не понимаю вашу проблему. Чтобы разбудить всех, кто ждет переменную cond, вы просто используете 'pthread_cond_broacast'. Это разбудит всех, кто все еще там. Вы хотите убедиться, что все на самом деле ждут переменную cond, или какова ваша проблема? –

+0

Это правильно, я хочу, чтобы все ждали. Проблема в том, что некоторые, возможно, уже умерли, поэтому я хочу, чтобы все те, кто все еще жив, ожидали. – DevNull

ответ

1

Ниже показан один из способов сделать это, используя переменную условия и несколько других переменных; хотя могут быть и лучшие способы. Комментарии должны показать, как это работает. Вам, конечно, придется модифицировать вещи в соответствии с вашей реальной ситуацией; например, там могут быть петли вовлечены и т.д.

int activeThreads = 0; /* number of threads currently going */ 
int waitingThreads = 0; /* number of threads waiting on the condition */ 
int readyFlag = 0;  /* flag to tell the threads to proceed when signaled */ 
pthread_cond_t cond; /* condition to wait on/signal */ 
pthread_mutex_t mtx;  /* mutex for the above */ 

pthread_cond_t condWaiting; /* EDIT: additional condition variable to signal 
           * when each thread starts waiting */ 

void *threadFunc(void *arg) 
{ 
    /* Edit: Rather than incrementing 'activeThreads' here, it should be done 
    * in the main thread when each thread is created (to avoid a race) */ 

    /* ...do stuff... */ 

    /* When the threads should wait, do this (they wait for 'readyFlag' to be 
    * set, but also adjust the waiting thread count so the main thread can 
    * determine whether to broadcast) */ 
    pthread_mutex_lock(&mtx); 
    if (readyFlag == 0) { 
     waitingThreads++; 
     do { 
     pthread_cond_signal(&condWaiting); /* EDIT: signal the main thread when 
              * a thread begins waiting */ 
     pthread_cond_wait(&cond,&mtx); 
     } while (readyFlag == 0); 
     waitingThreads--; 
    } 
    pthread_mutex_unlock(&mtx); 

    /* ...more stuff... */ 

    /* When threads terminate, they decrement the active thread count */ 
    pthread_mutex_lock(&mtx); 
    activeThreads--; 
    pthread_cond_signal(&condWaiting); /* EDIT: also signal the main thread 
             * when a thread exits to make it 
             * recheck the waiting thread count if 
             * waiting for all threads to wait */ 
    pthread_mutex_unlock(&mtx); 

    return NULL; 
} 

int main(int argc, char *argv[]) 
{ 
    /* Edit: Showing some code to initialize the mutex, condition variable(s), 
    * etc., and create some threads -- modify as needed */ 
    pthread_mutex_init(&mtx,NULL); 
    pthread_cond_init(&cond,NULL); 
    pthread_cond_init(&condWaiting,NULL); /* EDIT: if main thread should block 
             * until all threads are waiting */ 
    activeThreads = waitingThreads = readyFlag = 0; 

    /* Edit: Increment 'activeThreads' here rather than in the thread function, 
    * to avoid a race (if the main thread started waiting for the others 
    * when not all had incremented the count yet, the main thread might end 
    * up waiting for fewer threads to be ready -- though it's unlikely */ 
    #define NUM_THREADS 10 
    pthread_t workers[NUM_THREADS]; 
    for (int i = 0; i < NUM_THREADS; i++) { 
    /* should use appropriate thread attributes */ 
    if (pthread_create(&workers[i],NULL,threadFunc,NULL) == 0) 
     activeThreads++; 
    } 

    /* ...do stuff... */ 

    /* Set 'readyFlag' and do condition broadcast IF all threads are waiting, 
    * or just carry on if they aren't */ 
    pthread_mutex_lock(&mtx); 
    if ((activeThreads != 0) && (activeThreads == waitingThreads)) { 
     readyFlag = 1; 
     pthread_cond_broadcast(&cond); 
    } 
    pthread_mutex_unlock(&mtx); 

    /* EDIT: OR.. to wait until all threads are waiting and then broadcast, do 
    * this instead: */ 
    pthread_mutex_lock(&mtx); 
    while (waitingThreads < activeThreads) { /* wait on 'condWaiting' until all 
               * active threads are waiting */ 
     pthread_cond_wait(&condWaiting,&mtx); 
    } 
    if (waitingThreads != 0) { 
     readyFlag = 1; 
     pthread_cond_broadcast(&cond); 
    } 
    pthread_mutex_unlock(&mtx); 

    /* ...more stuff... */ 

    /* If needed, you can clear the flag when NO threads are waiting.. */ 
    pthread_mutex_lock(&mtx); 
    if (waitingThreads == 0) 
     readyFlag = 0; 
    pthread_mutex_unlock(&mtx); 

    /* ...even more stuff... */ 

    return 0; 
} 

Я хотел бы добавить, однако, что я не вижу, когда бы хороший повод, чтобы сделать это, а не только защищает ресурсы в более прямой путь.

EDIT: Добавлены некоторые вещи в код, показывая вторую переменную условия, используемую, чтобы основной поток ожидал, что все рабочие будут готовы. Измененные части отмечены «EDIT:» в комментариях и могут быть опущены, если не нужны. Я также исправил условие гонки, перемещая приращение activeThreads из функции потока и показал инициализацию мьютекса и т. Д. (Без обработки ошибок).

+1

Мне кажется, вам действительно нужна вторая переменная условия, чтобы поток main() '(или 'master') ожидал, когда рабочие потоки станут готовыми. В противном случае, если рабочие потоки не готовы, когда 'main()' получает, где он будет устанавливать 'readyFlag = 1', тогда это условие никогда не будет сигнализироваться, и рабочие потоки никогда не будут продолжены. –

+0

@MichaelBurr Я считал это, но нужно ли это, зависит от того, должен ли 'main()' ждать или просто продолжать с другой работой и опросом периодически. Я могу редактировать его в ... – Dmitri

1

Вообще говоря, вы должны просто установить переменную условия (и связанный флаг), когда работа готова к работе - обычно нет необходимости ждать, пока потоки будут блокироваться при условии var. Если они «опоздали», они просто заметят, что флаг уже установлен и не блокирует блокировку.

Но если вам действительно нужно подождать, пока все рабочие потоки не будут в точке, где они зацикливаются на условии var, вы можете использовать комбинацию переменных условия - одну, которая отслеживает, сколько потоков «готова к работе» ', а другой запускают их, чтобы начать работу. Некоторый peudo код:

// manager thread thread 

pthread_cond_t pseudo_barrier; 
pthread_cond_t pseudo_barrier_complete_cond; 
pthread_mutex_t pseudo_barrier_mux; 
int pseudo_barrier_counter = NUM_THREADS; 
int pseudo_barrier_complete_flag = 0; 

void thread_manager(void) 
{ 
    pthread_cond_init(&pseudo_barrier, NULL); 
    pthread_cond_init(&pseudo_barrier_complete_cond, NULL); 
    pthread_mutex_init(&pseudo_barrier_mux, NULL); 


    for (int i = 0 ; i < NUM_THREADS; ++i) { 
     pthread_create(/*... */); 
    } 

    // wait for threads to 'stage' 
    pthread_mutex_lock(&pseudo_barrier_mux); 
    while (pseudo_barrier_counter != 0) { 
     pthread_cond_wait(&pseudo_barrier, &pseudo_barrier_mux); 
    } 
    pthread_mutex_unlock(&pseudo_barrier_mux); 


    // at this point, all threads have either bailed out or are waiting to go 
    // let 'em rip 

    pthread_mutex_lock(&pseudo_barrier_mux); 
    pseudo_barrier_complete_flag = 1; 
    pthread_mutex_unlock(&pseudo_barrier_mux); 
    pthread_cond_broadcast(&pseudo_barrier_complete_cond); 

    // do whatever else the manager thread needs to do... 
} 


// worker threads 
void* worker_thread(void* context) 
{ 
    int error_result = 0; 

    // whatever initialization... 
    // if this thread is going to bail out due to an error, it needs to 
    // set the `error_result` value appropriately and still drop into the 
    // following code 

    // let the manager know that this thread is waiting (or isn't going to participate) 
    pthread_mutex_lock(&pseudo_barrier_mux); 
    --pseudo_barrier_counter; 

    if (pseudo_barrier_counter == 0) { 
     // all other threads are accounted for, let the manager know we're ready 
     pthread_cond_signal(&pseudo_barrier); 
    } 

    // if this thread isn't going to contine because of some error, it's already 
    // accounted for that fact in the `my_barrier_count`, so we can return here 
    // without preventing the pseudo-barrier from being met. 
    if (some_error_occurred) { 
     pthread_mutex_lock(&pseudo_barrier_mux); 
     return NULL; 
    } 

    // NOTE: we're still holding pseudo_barrier_mux, so the master thread is still 
    // blocked, even if we've signaled it - it'll jhave to wait until this 
    // thread is blocking on `pseudo_barrier_complete_cond` 

    while (!pseudo_barrier_complete_flag) { 
     pthread_cond_wait(&pseudo_barrier_complete_cond, &pseudo_barrier_mux); 
    } 
    pthread_mutex_unlock(&pseudo_barrier_mux); 


    // do the work... 
} 

Конечно, псевдо-кода, представленная должна быть очищена для любого реального использования (включая обработку ошибок), вероятно, упаковку все вспомогательные переменные состояния, семафор и флаги в структуру

Смежные вопросы