2015-08-20 4 views
0

В программе C некоторые потоки (pthread1, pthread2, ...) генерируют сообщение, а полученные сообщения обрабатываются дополнительным потоком (pthreadprint), который их печатает.Связь между двумя pthreads

Сообщения могут «накапливаться» перед обработкой: более одного сообщения могут стекаться в определенное время в качестве ввода в pthreadprint, в зависимости от времени, необходимого для печати pthreadprint. Я, очевидно, не знаю максимального количества сообщений, которые могут ждать обработки в худшем сценарии.

Что это может быть лучший способ отправить это сообщение (от pthread1, pthread2, ... что «производят» их) до pthreadprint, который их печатает? Они должны не только передаваться, но и «накапливаться». Я знаю о мьютексах и переменных условий, но они не представляют очереди: возможно ли использовать очередь FIFO, доступную для всех потоков?

+2

«* возможно использовать очередь FIFO, доступную для всех потоков *« Да. – alk

+1

"* ... mutexes [...] переменные, но они здесь не так полезны здесь *" уверены, что они, а именно, для защиты буфера FIFO-потока потока от параллельного доступа. – alk

+2

Пожалуйста, смотрите 'man queue' и/или https://en.wikipedia.org/wiki/Queue_%28abstract_data_type%29#Queue_implementation – alk

ответ

0

Это, кажется, типичный первое назначение года на «производитель-потребитель» проблемы в межпроцессного коммуникаций (может быть, в классе OS?)

Что вам нужно, это способ передачи информации от процесса к другому процессу , Есть несколько способов сделать это, тем проще быть «общей памятью», то есть частью памяти, доступной всем процессам.

С моей точки зрения, я предлагаю реализовать очередь. Затем весь ваш процесс должен иметь доступ к этой очереди, но ТОЛЬКО, когда это разрешено (т. Е. Результат из мьютекса). Вы также можете увеличить очередь, добавив функциональность, которая проверяет размер (например, int isQueueFull(void);, учитывая, что каждый узел очереди также должен иметь индекс, например struct{ queueNode* nextNode; unsigned int index; void* data;}).

Если у вас все это в том же начальном процессе, а затем вы запускаете потоки из него, то все ваши «подпроцессы» будут иметь доступ к тому же пространству памяти, что и родительский процесс.

1

Одно простое и надежное решение этих проблем с производителем-потребителем - это один и тот же список сообщений, защищенных мьютексом. Использование C99 и pthreads:

#define _POSIX_C_SOURCE 200809L 
#include <stdlib.h> 
#include <stdarg.h> 
#include <stdio.h> 
#include <errno.h> 

struct message { 
    struct message *next; 
    /* Payload is irrelevant, here just as an example: */ 
    size_t   size; 
    char   data[]; 
}; 

typedef struct { 
    pthread_mutex_t lock; 
    pthread_cond_t more; 
    struct message *newest; 
    struct message *oldest; 
} queue; 
#define QUEUE_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL } 

/* Nonblocking variant */ 
struct message *try_dequeue(queue *const q); 

/* Blocking variants */ 
int enqueue(queue *const q, struct message *const m); 
struct message *dequeue(queue *const q); 

/* Suggested interface for queueing a new message */ 
int queue_printf(queue *const q, const char *const format, ...); 

Реализация прост.

  • Односвязный список struct message имеет самые старые сообщения с новыми сообщениями, прилагаемыми к концу.
  • Пустая очередь имеет newest == NULL и oldest == NULL.
  • Все операции с очередью (enqueue(), dequeue(), try_dequeue()) принимают очередь lock мьютексы перед рассмотрением указателей. (Чтобы уменьшить конфликт при интенсивном использовании, держите время блокировки как можно короче, другими словами, сначала полностью создайте сообщение, прежде чем делать блокировку.)
  • Блокирующий вызов по вызову может ждать нового сообщения, ожидая на more (если очередь пуста).
  • Когда первое сообщение находится в очереди, на него указывают как newest, так и oldest.
  • При вводе первого сообщения сигнализируется переменная состояния more, в случае, если существует очередь блокировки, ожидающая нового сообщения.
  • Включить дополнительные наборы сообщений newest->next, чтобы указать на новое сообщение, а затем устанавливает newest, чтобы указать на новое сообщение.
  • Dequeuing отделяет член oldest от списка, обновляя oldest, чтобы указать на oldest->next. Если oldest станет NULL (тогда оба newest и oldest указали на то же сообщение, это единственное сообщение в очереди), также newest установлен в NULL, так как очередь становится пустой.
  • Заблокировать сообщение может только сбой при блокировке mutex lock (и это обычно происходит только в случае сбоя, если библиотека C обнаруживает тупиковую ситуацию), или если у вас есть проверки, которые отмечают, что структура очереди находится в противоречивом состоянии (например, один, но не оба, от newest и oldest - это NULL, например). Логика в вышеупомянутом прототипе заключается в том, что он возвращает 0, если успех и код ошибки errno в противном случае (EINVAL, EDEADLK). Я также хотел бы установить errno на этот код ошибки тоже для симметрии с dequeueing.
  • Отказ от сообщения может быть сбой по тем же причинам, что и enqueuing, плюс также, когда очередь пуста (EWOULDBLOCK/EAGAIN). В этих случаях функция может вернуть NULL с набором errno.

Как вы можете видеть, обе очереди и dequeue являются операциями O (1), т. Е. Принимают постоянное время; нет необходимости проходить весь список в любой точке.

Операция enqueue/dequeue может en/dequeue более одного сообщения сразу, просто повторяя вышеизложенное. Однако это довольно редко приходится делать на практике. (Для устранения очереди основная причина заключается в том, что если вы захватываете более одного сообщения за раз, и у вас есть ошибка или ошибка с одним сообщением, вам приходится иметь дело с ошибкой и еще необработанными, но перегруженными сообщениями, склонная Легче делать вещи по одному Кроме того, если заказ сообщений не является критическим, вы всегда можете иметь больше чем один потребитель работает параллельно, если они из очереди сообщений по одному)


Дополнительных примечания...:

Если вы полагаетесь на стандарт C99, вы можете использовать тот же код для любых типов структур, которые начинаются с struct message *next;. По правилам C99 такие структуры совместимы (для этой общей начальной части), и это единственная часть, к которой обращаются операции очереди.

Другими словами, если у вас есть несколько типов сообщений, каждый хранится в своей собственной очереди, вам нужно только одну реализацию enqueue()/dequeue()/try_dequeue() для всех различных типов сообщений, до тех пор, как структуры сообщений начинаются с struct message *next;.

Для безопасности типа, вам нужно тривиальные функции оболочки:

static inline int enqueue_yourtype(yourtype_queue *const q, struct yourtype_message *const m) 
{ 
    return enqueue((queue *const)q, (struct message *const)m); 
} 

static inline struct yourtype_message *dequeue_yourtype(yourtype_queue *const q) 
{ 
    return dequeue((queue *const)q); 
} 

static inline struct yourtype_message *try_dequeue_yourtype(yourtype_queue *const q) 
{ 
    return try_dequeue((queue *const)q); 
} 

, что, когда это определено в заголовочном файле, не должны фактически генерировать любые накладные расходы - на самом деле, не должен генерировать какой-либо дополнительный код вообще, если вы по какой-либо причине не берете адрес по одному (в этом случае одна неинлайн-версия должна быть выбрана в каждом блоке компиляции с адресом одного). Тем не менее, они обеспечивают проверку типов во время компиляции, что часто полезно.

Смежные вопросы