Одно простое и надежное решение этих проблем с производителем-потребителем - это один и тот же список сообщений, защищенных мьютексом. Использование C99 и pthreads:
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <stdarg.h>
#include <stdio.h>
#include <errno.h>
struct message {
struct message *next;
/* Payload is irrelevant, here just as an example: */
size_t size;
char data[];
};
typedef struct {
pthread_mutex_t lock;
pthread_cond_t more;
struct message *newest;
struct message *oldest;
} queue;
#define QUEUE_INITIALIZER { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL, NULL }
/* Nonblocking variant */
struct message *try_dequeue(queue *const q);
/* Blocking variants */
int enqueue(queue *const q, struct message *const m);
struct message *dequeue(queue *const q);
/* Suggested interface for queueing a new message */
int queue_printf(queue *const q, const char *const format, ...);
Реализация прост.
- Односвязный список
struct message
имеет самые старые сообщения с новыми сообщениями, прилагаемыми к концу.
- Пустая очередь имеет
newest == NULL
и oldest == NULL
.
- Все операции с очередью (
enqueue()
, dequeue()
, try_dequeue()
) принимают очередь lock
мьютексы перед рассмотрением указателей. (Чтобы уменьшить конфликт при интенсивном использовании, держите время блокировки как можно короче, другими словами, сначала полностью создайте сообщение, прежде чем делать блокировку.)
- Блокирующий вызов по вызову может ждать нового сообщения, ожидая на
more
(если очередь пуста).
- Когда первое сообщение находится в очереди, на него указывают как
newest
, так и oldest
.
- При вводе первого сообщения сигнализируется переменная состояния
more
, в случае, если существует очередь блокировки, ожидающая нового сообщения.
- Включить дополнительные наборы сообщений
newest->next
, чтобы указать на новое сообщение, а затем устанавливает newest
, чтобы указать на новое сообщение.
- Dequeuing отделяет член
oldest
от списка, обновляя oldest
, чтобы указать на oldest->next
. Если oldest
станет NULL
(тогда оба newest
и oldest
указали на то же сообщение, это единственное сообщение в очереди), также newest
установлен в NULL
, так как очередь становится пустой.
- Заблокировать сообщение может только сбой при блокировке mutex
lock
(и это обычно происходит только в случае сбоя, если библиотека C обнаруживает тупиковую ситуацию), или если у вас есть проверки, которые отмечают, что структура очереди находится в противоречивом состоянии (например, один, но не оба, от newest
и oldest
- это NULL
, например). Логика в вышеупомянутом прототипе заключается в том, что он возвращает 0
, если успех и код ошибки errno в противном случае (EINVAL
, EDEADLK
). Я также хотел бы установить errno
на этот код ошибки тоже для симметрии с dequeueing.
- Отказ от сообщения может быть сбой по тем же причинам, что и enqueuing, плюс также, когда очередь пуста (
EWOULDBLOCK
/EAGAIN
). В этих случаях функция может вернуть NULL
с набором errno
.
Как вы можете видеть, обе очереди и dequeue являются операциями O (1), т. Е. Принимают постоянное время; нет необходимости проходить весь список в любой точке.
Операция enqueue/dequeue может en/dequeue более одного сообщения сразу, просто повторяя вышеизложенное. Однако это довольно редко приходится делать на практике. (Для устранения очереди основная причина заключается в том, что если вы захватываете более одного сообщения за раз, и у вас есть ошибка или ошибка с одним сообщением, вам приходится иметь дело с ошибкой и еще необработанными, но перегруженными сообщениями, склонная Легче делать вещи по одному Кроме того, если заказ сообщений не является критическим, вы всегда можете иметь больше чем один потребитель работает параллельно, если они из очереди сообщений по одному)
Дополнительных примечания...:
Если вы полагаетесь на стандарт C99, вы можете использовать тот же код для любых типов структур, которые начинаются с struct message *next;
. По правилам C99 такие структуры совместимы (для этой общей начальной части), и это единственная часть, к которой обращаются операции очереди.
Другими словами, если у вас есть несколько типов сообщений, каждый хранится в своей собственной очереди, вам нужно только одну реализацию enqueue()
/dequeue()
/try_dequeue()
для всех различных типов сообщений, до тех пор, как структуры сообщений начинаются с struct message *next;
.
Для безопасности типа, вам нужно тривиальные функции оболочки:
static inline int enqueue_yourtype(yourtype_queue *const q, struct yourtype_message *const m)
{
return enqueue((queue *const)q, (struct message *const)m);
}
static inline struct yourtype_message *dequeue_yourtype(yourtype_queue *const q)
{
return dequeue((queue *const)q);
}
static inline struct yourtype_message *try_dequeue_yourtype(yourtype_queue *const q)
{
return try_dequeue((queue *const)q);
}
, что, когда это определено в заголовочном файле, не должны фактически генерировать любые накладные расходы - на самом деле, не должен генерировать какой-либо дополнительный код вообще, если вы по какой-либо причине не берете адрес по одному (в этом случае одна неинлайн-версия должна быть выбрана в каждом блоке компиляции с адресом одного). Тем не менее, они обеспечивают проверку типов во время компиляции, что часто полезно.
«* возможно использовать очередь FIFO, доступную для всех потоков *« Да. – alk
"* ... mutexes [...] переменные, но они здесь не так полезны здесь *" уверены, что они, а именно, для защиты буфера FIFO-потока потока от параллельного доступа. – alk
Пожалуйста, смотрите 'man queue' и/или https://en.wikipedia.org/wiki/Queue_%28abstract_data_type%29#Queue_implementation – alk