2013-06-02 1 views
1

У меня есть небольшая нить пример бассейна для использования узнать:циркулярно связанный список, показывающий странное поведение с PTHREAD

Он принимает на работу структуры, которые представляют собой простой расчет (+, -, *, /) из двух значений. Теперь рабочие потоки вытаскивают эти рабочие структуры из очереди и выполняют их.

Так много для теории. Теперь проблема заключается в том, что когда я хочу использовать вспомогательную функцию submit_work(), как-то очередь в разрешении рабочего потока в рабочем потоке не работает (выведенные данные недействительны, а когда я использую QUEUE_REMOVE, это вызывает ошибку сегментации).

Как это может быть, просто используя вспомогательную функцию, вся концепция больше не работает?

pool.c

Я имею в виду толкая данные вручную отлично работает ...

#include <stdio.h> 
#include <unistd.h> 
#include <pthread.h> 
#include "../queue/queue.h" 

#define MAX_THREADS 1 

pthread_t threads[MAX_THREADS]; 

pthread_cond_t cond; 

pthread_mutex_t mutex; 

/** 
* QUEUE is a "void * arr[2]". 
* The Queue itself is similar to the circularly linked list in linux 
*/ 
QUEUE queue; 

struct work_s { 
    int a; 
    int b; 
    int type; 
    QUEUE node; 
}; 

void * worker(); 
void submit_work(int a, int b, int type); 

int main() { 
    QUEUE_INIT(&queue); 

    pthread_cond_init(&cond, NULL); 
    pthread_mutex_init(&mutex, NULL); 

    struct work_s work[2]; 

    /* 5 + 7 */ 
    work[0].a = 5; 
    work[0].b = 7; 
    work[0].type = 1; 

    /* 3 x 3 */ 
    work[1].a = 3; 
    work[1].b = 3; 
    work[1].type = 3; 

    /* initialize there queue nodes */ 
    QUEUE_INIT(&work[0].node); 
    QUEUE_INIT(&work[1].node); 

    /* insert both tasks into the work queue */ 
    QUEUE_INSERT_TAIL(&queue, &work[0].node); 
    QUEUE_INSERT_TAIL(&queue, &work[1].node); 

    /* this does actually the same as above but causes a segmentation fault. */ 
    submit_work(5, 6, 3); 

    for (int i = 0; i < MAX_THREADS; i++) 
     pthread_create(&threads[i], NULL, worker, NULL); 
    for (int i = 0; i < MAX_THREADS; i++) 
     pthread_join(threads[i], NULL); 
    for (int i = 0; i < MAX_THREADS; i++) 
     pthread_detach(threads[i]); 

    pthread_mutex_destroy(&mutex); 
    pthread_cond_destroy(&cond); 

    return 0; 
} 

void submit_work(int a, int b, int type) { 
    struct work_s work; 

    work.a = a; 
    work.b = b; 
    work.type = type; 

    pthread_mutex_lock(&mutex); 

    QUEUE_INIT(&work.node); 
    QUEUE_INSERT_TAIL(&queue, &work.node); 

    pthread_mutex_unlock(&mutex); 

    pthread_cond_signal(&cond); 
} 

void * worker() { 
    /* a pointer to a queue node */ 
    QUEUE * q; 

    int result; 

    struct work_s * work; 

    /* infinite loop */ 
    for (;;) { 
     while (QUEUE_EMPTY(&queue)) { 
      pthread_cond_wait(&cond, &mutex); 
     } 

     pthread_mutex_lock(&mutex); 

     q = QUEUE_HEAD(&queue); 

     /* HERE THE SEGMENTSTION FAULT OCCURS when using submit_work */ 
     QUEUE_REMOVE(q); 

     pthread_mutex_unlock(&mutex); 

     /* set the work pointer to the work struct we have pulled from queue */ 
     work = QUEUE_DATA(q, struct work_s, node); 

     /* PRINTS INCORRECT DATA on submit_work() */ 
     printf("received work type %d with a %d and b %d \n", work->a, work->b, work->type); 

     if (work->type == 0) { 
      break; 
     } 

     switch (work->type) { 
      case 1: 
       result = work->a + work->b; 
       printf("%d + %d = %d\n", work->a, work->b, result); 
       break; 
      case 2: 
       result = work->a - work->b; 
       printf("%d - %d = %d\n", work->a, work->b, result); 
       break; 
      case 3: 
       result = work->a * work->b; 
       printf("%d * %d = %d\n", work->a, work->b, result); 
       break; 
      case 4: 
       result = work->a/work->b; 
       printf("%d/%d = %d\n", work->a, work->b, result); 
       break; 
     } 
    } 


    pthread_exit(NULL); 
} 

queue.h

/* Copyright (c) 2013, Ben Noordhuis <[email protected]> 
* 
* Permission to use, copy, modify, and/or distribute this software for any 
* purpose with or without fee is hereby granted, provided that the above 
* copyright notice and this permission notice appear in all copies. 
* 
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
*/ 

#ifndef QUEUE_H_ 
#define QUEUE_H_ 

typedef void *QUEUE[2]; 

/* Private macros. */ 
#define QUEUE_NEXT(q)  ((*(q))[0]) 
#define QUEUE_PREV(q)  ((*(q))[1]) 
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT((QUEUE *) QUEUE_PREV(q))) 
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV((QUEUE *) QUEUE_NEXT(q))) 

/* Public macros. */ 
#define QUEUE_DATA(ptr, type, field)           \ 
    ((type *) ((char *) (ptr) - ((long) &((type *) 0)->field))) 

#define QUEUE_FOREACH(q, h)             \ 
    for ((q) = (*(h))[0]; (q) != (h); (q) = (*(q))[0]) 

#define QUEUE_EMPTY(q)              \ 
    (QUEUE_NEXT(q) == (q)) 

#define QUEUE_HEAD(q)               \ 
    (QUEUE_NEXT(q)) 

#define QUEUE_INIT(q)               \ 
    do {                  \ 
    QUEUE_NEXT(q) = (q);              \ 
    QUEUE_PREV(q) = (q);              \ 
    }                   \ 
    while (0) 

#define QUEUE_ADD(h, n)              \ 
    do {                  \ 
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n);          \ 
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h);          \ 
    QUEUE_PREV(h) = QUEUE_PREV(n);           \ 
    QUEUE_PREV_NEXT(h) = (h);             \ 
    }                   \ 
    while (0) 

#define QUEUE_SPLIT(h, q, n)             \ 
    do {                  \ 
    QUEUE_PREV(n) = QUEUE_PREV(h);           \ 
    QUEUE_PREV_NEXT(n) = (n);             \ 
    QUEUE_NEXT(n) = (q);              \ 
    QUEUE_PREV(h) = QUEUE_PREV(q);           \ 
    QUEUE_PREV_NEXT(h) = (h);             \ 
    QUEUE_PREV(q) = (n);              \ 
    }                   \ 
    while (0) 

#define QUEUE_INSERT_HEAD(h, q)            \ 
    do {                  \ 
    QUEUE_NEXT(q) = QUEUE_NEXT(h);           \ 
    QUEUE_PREV(q) = (h);              \ 
    QUEUE_NEXT_PREV(q) = (q);             \ 
    QUEUE_NEXT(h) = (q);              \ 
    }                   \ 
    while (0) 

#define QUEUE_INSERT_TAIL(h, q)            \ 
    do {                  \ 
    QUEUE_NEXT(q) = (h);              \ 
    QUEUE_PREV(q) = QUEUE_PREV(h);           \ 
    QUEUE_PREV_NEXT(q) = (q);             \ 
    QUEUE_PREV(h) = (q);              \ 
    }                   \ 
    while (0) 

#define QUEUE_REMOVE(q)              \ 
    do {                  \ 
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q);          \ 
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q);          \ 
    }                   \ 
    while (0) 

#endif /* QUEUE_H_ */ 

Если я должен также скопировать ОЧЕРЕДЬ .h оставить комментарий. Иначе вы нашли это here

Бодо

+0

Не связано с вашей проблемой, но вы не должны иметь цикл, который вызывает 'pthread_detach (threads [i]);' после того, как вы присоединились к этим потокам. Присоединение и отсоединение являются взаимоисключающими. –

+0

@MichaelBurr I, хотя pthread_join будет блокироваться до тех пор, пока потоки не закончатся? Куда порекомендовать его разместить? – bodokaiser

+0

'pthread_join()' блокируется до тех пор, пока поток не будет завершен; однако, как только вы присоединились к потоку, вы не можете называть 'pthread_detach()' на нем (и нет никаких причин). Поэтому моя рекомендация - просто избавиться от цикла отсоединения. –

ответ

3

Проблема заключается в том, что вы вставляете локальную переменную submit_work() на очереди (подпрограммы очереди не сделать копию объекта они очереди). Таким образом, как только submit_work() вернется, данные в структуре, которая была поставлена ​​в очередь, перестали быть действительными (и, вероятно, изменят указатели ссылок на мусор).

Это не происходит для первых двух элементов, находящихся в очереди, поскольку их срок службы по-прежнему действителен, пока функции потока выполняются с main() блоков в вызовах pthread_join().

Вы должны быть размещены в очереди, которые динамически распределяются, поэтому их время жизни проходит мимо lfe функции, которая их ставит в очередь. Конечно, тогда вам также необходимо иметь функции, которые деактивируют элементы, освобождая память. Конечно, это означает, что вы также должны динамически распределять элементы, помещенные непосредственно в main().

+0

Большое спасибо за этот ответ !! Я бы никогда не обнаружил, что это будет проблемой. – bodokaiser

+0

Я собираюсь дать вам щедрость, как в 23 часа ... – bodokaiser

2

Вы выполнить этот цикл в worker() без проведения мьютекс:

while (QUEUE_EMPTY(&queue)) { 
    pthread_cond_wait(&cond, &mutex); 
} 

pthread_cond_wait() можно назвать только держа семафор, который вы передаете его - это будет также возвратитесь с заблокированным мьютексом, поэтому вы не должны сразу звонить pthread_mutex_lock(&mutex) после этого цикла - он уже заблокирован.

Так что бит кода должен выглядеть примерно так:

pthread_mutex_lock(&mutex); 
while (QUEUE_EMPTY(&queue)) { 
    pthread_cond_wait(&cond, &mutex); 
} 

// don't call pthread_mutex_lock(&mutex) here... 
+0

Я изменил это, но проблема все еще остается. Я пытаюсь загрузить это где-нибудь, где вы можете запустить его самостоятельно. wait – bodokaiser

+0

Сделайте cd для потоков, а затем вы можете запустить make pool && ./pool.o https://c9.io/bodokaiser/libuv-internals – bodokaiser

+0

Возможно, вы можете отправить 'queue.h' в вопрос - я могу ' t действительно делает что-то большее, чем читать его на cloud9. Интересно, связана ли ваша проблема с потоком или просто старой ошибкой в ​​обработке очереди. Возможно, вы можете взять весь код, связанный с потоком, из своей программы и иметь 'main()' просто вызвать 'worker()' после 'submit_work()' и посмотреть, получится ли вы аналогичный сбой. –

Смежные вопросы