2010-03-03 3 views
2

(Вкратце: main() WaitForSingleObject висит в программе ниже).ReleaseSemaphore не освобождает семафор

Я пытаюсь написать фрагмент кода, который отправляет потоки и ждет их завершения до его возобновления. Вместо того, чтобы создавать потоки каждый раз, что дорого, я их спал. Основной поток создает потоки X в состоянии CREATE_SUSPENDED.

Синхронизация выполняется с помощью семафора с X как MaximumCount. Счётчик семафора сбрасывается до нуля, и потоки отправляются. The threds выполняют какую-то глупую петлю и вызывают ReleaseSemaphore, прежде чем они отправятся спать. Затем основной поток использует WaitForSingleObject X раз, чтобы убедиться, что каждый поток завершил свою работу и спал. Затем он петляет и делает все это снова.

Время от времени программа не выходит. Когда я клюю программу, я вижу, что WaitForSingleObject зависает. Это означает, что поток ReleaseSemaphore в потоке не работал. Ничего не напечатано, так что, мол, ничто не пошло не так.

Может быть две нити не должно называть ReleaseSemaphore в то же самое время, но это было бы свести на нет цели семафоров ...

я просто не обращал внимание на это ...

Других решений для синхронизирующие потоки с благодарностью принимаются!

#define TRY 100 
#define LOOP 100 

HANDLE *ids; 
HANDLE semaphore; 

DWORD WINAPI Count(__in LPVOID lpParameter) 
{ 
float x = 1.0f; 
while(1) 
{ 
    for (int i=1 ; i<LOOP ; i++) 
    x = sqrt((float)i*x); 
    while (ReleaseSemaphore(semaphore,1,NULL) == FALSE) 
    printf(" ReleaseSemaphore error : %d ", GetLastError()); 
    SuspendThread(ids[(int) lpParameter]); 
} 
return (DWORD)(int)x; 
} 

int main() 
{ 
SYSTEM_INFO sysinfo; 
GetSystemInfo(&sysinfo); 
int numCPU = sysinfo.dwNumberOfProcessors; 

semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL); 
ids = new HANDLE[numCPU]; 

for (int j=0 ; j<numCPU ; j++) 
    ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL); 

for (int j=0 ; j<TRY ; j++) 
{ 
    for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 
    for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 
    ReleaseSemaphore(semaphore,numCPU,NULL); 
} 
CloseHandle(semaphore); 
printf("Done\n"); 
getc(stdin); 
} 

ответ

3

проблема происходит в следующих случаях:

основной поток возобновляет рабочие потоки:

for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 

рабочие потоки делают свою работу и освободить семафор:

for (int i=1 ; i<LOOP ; i++) 
    x = sqrt((float)i*x); 
    while (ReleaseSemaphore(semaphore,1,NULL) == FALSE) 

главного поток ждет всех рабочих потоков и сбрасывает семафор:

for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 
    ReleaseSemaphore(semaphore,numCPU,NULL); 

основной поток переходит в следующий раунд, пытаясь возобновить рабочие потоки (обратите внимание, что рабочие потоки не событие приостановлены себя еще! это где начинаются проблемы ... вы пытаетесь возобновить потоки, которые не обязательно еще приостановленных):

for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 

наконец рабочие потоки приостановить себя (хотя они уже должны начать следующий раунд):

SuspendThread(ids[(int) lpParameter]); 

и основной поток ждет навсегда, так как все рабочие подвешены прямо сейчас:

for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 

вот ссылка, которая показывает, как правильно решить проблемы производитель/потребитель:

http://en.wikipedia.org/wiki/Producer-consumer_problem

также я думаю, что critical sections гораздо быстрее, чем семафоры и мьютексы. их также легче понять в большинстве случаев (imo).

+0

Ницца теория/описание. Помещение основной в спящий режим (1), прежде чем возобновление потоков, похоже, решает проблему, но тогда производительность не ушла. По крайней мере, это подтверждает теорию: основные потоки возобновления, которые еще не спали \ o / – Gabriel

3

Я не понимаю код, но синхронизация потоков определенно плохая. Вы предполагаете, что потоки будут вызывать SuspendThread() в определенном порядке. A преуспевший вызов WaitForSingleObject() не сообщает вам , который вызвал поток ReleaseSemaphore(). Таким образом, вы вызываете ReleaseThread() в потоке, который не был приостановлен. Это быстро блокирует программу.

Другое плохое предположение заключается в том, что поток, уже вызванный SuspendThread после возврата WFSO. Обычно да, не всегда. Поток может быть предварительно упущен сразу после вызова RS. Вы снова вызовите ReleaseThread() в потоке, который не был приостановлен. Обычно это занимает около дня или около того, чтобы затормозить вашу программу.

И я думаю, что есть один вызов ReleaseSemaphore слишком много. Попытка не обмануть его, без сомнения.

Вы не можете управлять потоками с помощью функции Suspend/ReleaseThread(), не пытайтесь.

+0

Ну я не берем на себя заказ на SuspendThread. Самый внутренний цикл фактически возобновляет поток в произвольном порядке, но затем я вызываю WaitForSingleObject numCPU раз, это ждет numCPU ReleaseSemaphore(), который может произойти в любом порядке. – Gabriel

+0

@Gabriel: Я не думаю, что вы решите это, пока не увидите, что вызов ReleaseThread в потоке, который не заблокирован, является вашей основной проблемой. –

+0

Я не понял ваш второй абзац. Теперь я понял, это та же идея, что и stmax ». Спасибо за объяснение! – Gabriel

4

Вместо использования семафора (по крайней мере, непосредственно) или с явным явным образом просыпать поток, чтобы выполнить какую-либо работу, я всегда использовал потокобезопасную очередь. Когда main хочет, чтобы рабочий поток делал что-то, он подталкивает описание задания, которое должно выполняться в очередь. Работник нити каждый раз сделать работу, а затем попытаться выскочить другую работу из очереди, и в конечном итоге приостановлен до тех пор, пока это задание в очереди для них сделать:

код очереди выглядит следующим образом:

#ifndef QUEUE_H_INCLUDED 
#define QUEUE_H_INCLUDED 

#include <windows.h> 

template<class T, unsigned max = 256> 
class queue { 
    HANDLE space_avail; // at least one slot empty 
    HANDLE data_avail; // at least one slot full 
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos 

    T buffer[max]; 
    long in_pos, out_pos; 
public: 
    queue() : in_pos(0), out_pos(0) { 
     space_avail = CreateSemaphore(NULL, max, max, NULL); 
     data_avail = CreateSemaphore(NULL, 0, max, NULL); 
     InitializeCriticalSection(&mutex); 
    } 

    void push(T data) { 
     WaitForSingleObject(space_avail, INFINITE);  
     EnterCriticalSection(&mutex); 
     buffer[in_pos] = data; 
     in_pos = (in_pos + 1) % max; 
     LeaveCriticalSection(&mutex); 
     ReleaseSemaphore(data_avail, 1, NULL); 
    } 

    T pop() { 
     WaitForSingleObject(data_avail,INFINITE); 
     EnterCriticalSection(&mutex); 
     T retval = buffer[out_pos]; 
     out_pos = (out_pos + 1) % max; 
     LeaveCriticalSection(&mutex); 
     ReleaseSemaphore(space_avail, 1, NULL); 
     return retval; 
    } 

    ~queue() { 
     DeleteCriticalSection(&mutex); 
     CloseHandle(data_avail); 
     CloseHandle(space_avail); 
    } 
}; 

#endif 

И грубый эквивалент вашего кода в используемых им потоках выглядит примерно так. Я не выяснил, что именно делает ваша функция потока, но это было что-то с суммированием квадратных корней, и, по-видимому, вас больше интересует синхронизация потоков, чем то, что на самом деле делают потоки.

Редактировать: (на основе комментария): Если вам нужно main(), чтобы дождаться завершения некоторых задач, выполните еще одну работу, а затем назначьте больше задач, лучше всего справиться с этим, помещая событие (например) в каждой задачи, и ваша функция потока задает события. Пересмотренный код, чтобы сделать это будет выглядеть следующим образом (обратите внимание, что код очереди не влияет):

#include "queue.hpp" 

#include <iostream> 
#include <process.h> 
#include <math.h> 
#include <vector> 

struct task { 
    int val; 
    HANDLE e; 

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { } 
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {} 
}; 

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p); 

    task t; 
    while (-1 != (t=q.pop()).val) { 
     std::cout << t.val << "\n"; 
     SetEvent(t.e); 
    } 
} 

int main() { 
    queue<task> jobs; 

    enum { thread_count = 4 }; 
    enum { task_count = 10 }; 

    std::vector<HANDLE> threads; 
    std::vector<HANDLE> events; 

    std::cout << "Creating thread pool" << std::endl; 
    for (int t=0; t<thread_count; ++t) 
     threads.push_back((HANDLE)_beginthread(process, 0, &jobs)); 
    std::cout << "Thread pool Waiting" << std::endl; 

    std::cout << "First round of tasks" << std::endl; 

    for (int i=0; i<task_count; ++i) { 
     task t(i+1); 
     events.push_back(t.e); 
     jobs.push(t); 
    } 

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE); 

    events.clear(); 

    std::cout << "Second round of tasks" << std::endl; 

    for (int i=0; i<task_count; ++i) { 
     task t(i+20); 
     events.push_back(t.e); 
     jobs.push(t); 
    } 

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE); 
    events.clear(); 

    for (int j=0; j<thread_count; ++j) 
     jobs.push(-1); 

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE); 

    return 0; 
} 
+0

Насколько я понимаю ваш код, синхронизация происходит в последнем WaitForMultipleObjects, когда вы ждете, пока все потоки умрут. Я не могу позволить себе позволить им умереть. Код находится в бесконечном цикле, который будет нуждаться в потоках, чтобы поспать некоторое время, а затем еще раз записать больше заданий. Мне нужно убедиться, что все задания были выполнены потоками перед основными поступлениями, а другие - до того, как они вернутся к этим потокам и дадут им больше заданий. Я не могу воссоздать новые потоки, это слишком медленно, даже с _beginthreads(). – Gabriel

+0

Хорошее решение. Тем не менее, одно наблюдение заключается в том, что вся РУЧКА из потоков и событий будет протекать. Они должны быть явно очищены с помощью CloseHandle. –

0

Проблема в том, что вы ждёте чаще, чем вы сигнализируете.

Цикл for (int j=0 ; j<TRY ; j++) ждет восемь раз для семафора, а четыре потока будут сигнализировать только один раз, и сам цикл сигнализирует об этом один раз. В первый раз через цикл это не проблема, потому что семафор получает начальный счет из четырех. Второе и каждое последующее время вы ожидаете слишком большого количества сигналов. Это смягчается тем фактом, что на первых четырех ожиданиях вы ограничиваете время и не пытаетесь повторить ошибку. Поэтому иногда это может работать, и иногда ваше ожидание будет зависать.

Я думаю, что следующие (непроверенные) изменения помогут.

Инициализировать семафор нулевого отсчета:

semaphore = CreateSemaphore(NULL, 0, numCPU, NULL); 

избавиться от ожидания в нити цикла возобновления (т.е.удалить следующее):

if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
     printf("Timed out !!!\n"); 

Удалите посторонний сигнал от конца цикла Ьги (т.е. удалить следующее):

ReleaseSemaphore(semaphore,numCPU,NULL); 
0

Вот практическое решение.

Я хотел, чтобы моя основная программа использовала потоки (затем использовала более одного ядра), чтобы выполнять задания и ждать завершения всех потоков, прежде чем возобновлять и делать другие вещи. Я не хотел, чтобы потоки умирали и создавали новые, потому что это медленно. В моем вопросе я пытался это сделать, приостановив потоки, что казалось естественным. Но, как отметил дворянин, «вы можете управлять нарезкой с помощью Suspend/ReleaseThread()».

Решение включает в себя семафоры, как тот, который я использовал для управления потоками. На самом деле для управления основной нитью используется еще один семафор. Теперь у меня есть один семафор в потоке, чтобы управлять потоками и одним семафором для управления основным.

Вот решение:

#include <windows.h> 
#include <stdio.h> 
#include <math.h> 
#include <process.h> 

#define TRY 500000 
#define LOOP 100 

HANDLE *ids; 
HANDLE *semaphores; 
HANDLE allThreadsSemaphore; 

DWORD WINAPI Count(__in LPVOID lpParameter) 
{ 
    float x = 1.0f;   
    while(1) 
    { 
     WaitForSingleObject(semaphores[(int)lpParameter],INFINITE); 
     for (int i=1 ; i<LOOP ; i++) 
      x = sqrt((float)i*x+rand()); 
     ReleaseSemaphore(allThreadsSemaphore,1,NULL); 
    } 
    return (DWORD)(int)x; 
} 

int main() 
{ 
    SYSTEM_INFO sysinfo; 
    GetSystemInfo(&sysinfo); 
    int numCPU = sysinfo.dwNumberOfProcessors; 

    ids = new HANDLE[numCPU]; 
    semaphores = new HANDLE[numCPU]; 

    for (int j=0 ; j<numCPU ; j++) 
    { 
     ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, NULL, NULL); 
     // Threads blocked until main releases them one by one 
     semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL); 
    } 
    // Blocks main until threads finish 
    allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL); 

    for (int j=0 ; j<TRY ; j++) 
    { 
     for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs 
      ReleaseSemaphore(semaphores[i],1,NULL); 
     for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish 
      WaitForSingleObject(allThreadsSemaphore,INFINITE); 
    } 
    for (int j=0 ; j<numCPU ; j++) 
     CloseHandle(semaphores[j]); 
    CloseHandle(allThreadsSemaphore); 
    printf("Done\n"); 
    getc(stdin); 
}