2009-09-15 3 views
80

Мой вопрос относится к this question. В ситуациях, когда я использую очередь для связи между потоками производителя и потребителя, люди обычно рекомендуют использовать LinkedBlockingQueue или ConcurrentLinkedQueue?LinkedBlockingQueue vs ConcurrentLinkedQueue

Каковы преимущества/недостатки использования одного над другим?

Основное отличие, которое я вижу с точки зрения API, состоит в том, что LinkedBlockingQueue может быть дополнительно ограничено.

ответ

83

Для производителя/потребительской нити я не уверен, что ConcurrentLinkedQueue - даже разумный вариант - он не реализует BlockingQueue, что является фундаментальным интерфейсом для очередей производителей/потребителей IMO. Вам нужно будет позвонить poll(), немного подождать, если вы ничего не нашли, а затем снова опросите и т. Д. ... приводя к задержкам при поступлении нового элемента и неэффективности, когда он пуст (из-за неохотного пробуждения от сна).

Из документов для BlockingQueue:

BlockingQueue реализации предназначены для использования в первую очередь для производителей-потребителей очередей

Я знаю, что это не строго говорят, что только блокирование очереди должны используется для очередей производителей-потребителей, но даже в этом случае ...

+3

Спасибо Jon - я не заметил. Итак, где/почему вы используете ConcurrentLinkedQueue? – Adamski

+18

Когда вам нужно получить доступ к очереди из большого количества потоков, но вам не нужно «ждать» на ней. –

+1

«ConcurrentLinkedQueue» также полезен, если ваш поток проверяет несколько очередей. Например, на сервере с несколькими арендаторами. Исходя из соображений изоляции, вы не используете одну блокирующую очередь и дискриминатор арендатора. – LateralFractal

0

Другим решением (которое недостаточно масштабируется) является канал рандеву: java.u til.concurrent SynchronousQueue

1

Если ваша очередь не расширяется и содержит только один поток производителя/потребителя. Вы можете использовать блокированную очередь (вам не нужно блокировать доступ к данным).

12

LinkedBlockingQueue блокирует потребителя или производителя, когда очередь пуста или заполнена, а соответствующая потребительская/потоковая передача - спать. Но эта функция блокировки поставляется с затратами: каждая операция ввода или принятия блокируется между производителями или потребителями (если их много), поэтому в сценариях со многими производителями/потребителями операция может быть медленнее.

ConcurrentLinkedQueue не использует блокировки, но CAS, на своих операциях ввода/вывода, потенциально уменьшающих конкуренцию со многими потоками производителей и потребителей. Но, будучи структурой данных «без ожидания», ConcurrentLinkedQueue не будет блокироваться при пустом значении, что означает, что потребитель должен будет обрабатывать значения take(), возвращающие значения null, например, «ожидание ожидания», при этом потребительский поток потребляет процессор.

Так какой из них «лучше» зависит от количества потребительских потоков, от скорости, которую они потребляют/производят, и т. Д. Для каждого сценария необходим эталон.

Один конкретный случай использования, где ConcurrentLinkedQueue явно лучше, когда производители первого производить что-то и закончить свою работу, размещая работу в очереди и только после того, как потребители начинает потреблять, зная, что они будут делать, когда очередь пусто. (здесь нет параллелизма между производителем-потребителем, но только между производителем-производителем и потребителем-потребителем)

+0

один сомнения здесь. Как вы говорите, потребитель ждет, когда очередь пуста. Как долго она ждет. Кто уведомит об этом, чтобы не ждать? – Brinal

+0

@brindal Единственный способ подождать, о котором я знаю, находится в цикле. Это серьезная проблема, на которую здесь не было уделено особого внимания. Простое выполнение цикла, ожидающего данных, использует много процессорного времени. Вы узнаете это, когда ваши поклонники начнут вращаться. Единственное средство - поспать в петле. Таким образом, это проблема в системе с непоследовательным потоком данных. Возможно, я неправильно понимаю ответ АлександруНедельку, но сама операционная система является параллельной системой, которая была бы крайне неэффективной, если бы она была полна неблокирующих циклов событий. – orodbhen

38

Этот вопрос заслуживает лучшего ответа.

Java ConcurrentLinkedQueue основан на знаменитом algorithm by Maged M. Michael and Michael L. Scott для non-blocking lock-free очередей.

«Non-blocking» как термин здесь для конкурирующего ресурса (наша очередь) означает, что независимо от того, что делает планировщик платформы, например, прерывание потока, или если поток, о котором идет речь, слишком медленный, другие конфликтующие темы потому что тот же ресурс все равно сможет прогрессировать. Например, если блокировка задействована, поток, удерживающий блокировку, может быть прерван, и все потоки, ожидающие блокировки, будут заблокированы. Встроенные блокировки (ключевое слово synchronized) в Java также могут иметь суровое наказание за производительность - например, когда участвует biased locking, и у вас есть конфликт, или после того, как VM решает «раздуть» блокировку после периода отсрочки отжима и блокировать конфликтующие потоки ... поэтому во многих контекстах (сценариях с низким/средним соперничеством) выполнение сравнений и наборов по атомным ссылкам может быть намного более эффективным, и это именно то, что делают многие неблокирующие структуры данных.

Java ConcurrentLinkedQueue не только не блокирует, но обладает прекрасным свойством, которое производитель не поддерживает с потребителем. В одном сценарии производителя/единого потребителя (SPSC) это действительно означает, что говорить об этом не будет. В сценарии множественного производителя/единого потребителя потребитель не будет бороться с производителями. У этой очереди есть соперничество, когда несколько продюсеров пытаются offer(), но это параллелизм по определению. Это в основном общая цель и эффективная неблокирующая очередь.

Как для него не быть BlockingQueue, ну, блокируя нить, чтобы ждать в очереди, является причудливо ужасным способом проектирования параллельных систем. Не. Если вы не можете понять, как использовать ConcurrentLinkedQueue в сценарии «потребитель/производитель», просто переключитесь на абстракции более высокого уровня, например, на хорошую актерскую структуру.

+4

В последнем абзаце, почему вы говорите, что ожидание очереди - ужасный способ разработки параллельных систем? Если у нас есть нить-группа с 10 потоками, которые выполняют задачи из заданий, что не так с блокировкой, когда задача имеет менее 10 задач? – Pacerier

+9

@AlexandruNedelcu Вы не можете сделать резкое заявление вроде «причудливо ужасного», где очень часто те самые рамки актера, которые вы говорите, используют использование threadpools, которые вы сами * BlockingQueue *. Если вам нужна высокоактивная система, и вы знаете, как справляться с противодавлением (что-то, что блокирует очереди), чем неблокирование явно превосходит. Но ... часто блокирование IO и блокирующих очередей часто может выполнять неблокирование, особенно если у вас есть длинные запущенные задачи, которые связаны с IO и не могут быть делятся на n '. –

+0

@AdamGent - У фреймворков-актеров есть реализация почтовых ящиков на основе блокирующих очередей, но на мой взгляд это ошибка, потому что блокирование не работает над асинхронными границами и, следовательно, работает только в демонстрационных версиях. Для меня это было источником разочарования, так как, например, понятие Акки о переполнении состоит в том, чтобы блокировать, а не отбрасывать сообщения, до версии 2.4, которая еще не вышла. Тем не менее я не верю, что существуют прецеденты, для которых блокирующие очереди могут быть выше. Вы также объединяете две вещи, которые не должны быть объединены. Я не говорил о блокировании ввода-вывода. –

1

FYI, попробуйте это --- Вид Hacky, не очень научно, но, возможно, интересно:

import java.util.ArrayList; 
import java.util.List; 
import java.util.Queue; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.function.Supplier; 

class QueueDemo { 
    private static final int NUM_NODES = 1000000; 
    private static final int NUM_TRIALS = 10; 
    private static final int NUM_THREADS = 8; 
    private static final Object ANY_OBJECT = new Object(); 

    public static void main(String[] args) { 
     Queue<Object> qN = new ConcurrentLinkedQueue<>(); 
     Queue<Object> qB = new LinkedBlockingQueue<>(); 
     Queue<Object> qA = new ArrayBlockingQueue<>(NUM_NODES+1); 

     for (int i=0 ; i<NUM_TRIALS ; i++) { 
      doOneTrial(qN, "non-blocking"); 
      doOneTrial(qB, " blocking"); 
      doOneTrial(qA, "  Array"); 
     } 
    } 

    private static void doOneTrial(final Queue<Object> q, String name) { 
     List<CompletableFuture<Integer>> futures = new ArrayList<>(NUM_THREADS); 
     CountDownLatch startSignal = new CountDownLatch(1); 
     CountDownLatch doneSignal = new CountDownLatch(NUM_THREADS); 

     fillQueue(q); 
     for (int i=0 ; i<NUM_THREADS ; i++) { 
      futures.add(CompletableFuture.supplyAsync(new Supplier<Integer>() { 
       public Integer get() { 
        int count = 0; 
        wait4(startSignal); 
        while (q.poll() != null) count++; 
        doneSignal.countDown(); 
        return count; 
       } 
      })); 
     } 

     long startTime = System.currentTimeMillis(); 
     startSignal.countDown(); 
     wait4(doneSignal); 
     long endTime = System.currentTimeMillis(); 

     int count = 0; 
     for (CompletableFuture<Integer> future : futures) { 
      count += future.join(); 
     } 
     if (count == NUM_NODES) { 
      System.out.println(name + ", " + Long.toString(endTime-startTime)); 
     } else { 
      System.out.println("Aieeeeeegh!"); 
      System.exit(1); 
     } 
    } 

    private static void fillQueue(Queue<Object> q) { 
     for (int i=0 ; i<NUM_NODES ; i++) { 
      q.add(ANY_OBJECT); 
     } 
    } 

    private static void wait4(CountDownLatch latch) { 
     try { 
      latch.await(); 
     } catch (InterruptedException ex) { 
      throw new RuntimeException(ex); 
     } 
    } 
} 
Смежные вопросы