2015-03-23 2 views
1

Я хотел бы знать, какой был бы лучший механизм для реализации нескольких сценариев Producer - single Consumer, где мне нужно постоянно обновлять текущее количество необработанных запросов.Потребитель-производитель с высоким разрешением

Моя первая мысль была использовать ConcurrentLinkedQueue:

public class SomeQueueAbstraction { 

    private Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>(); 
    private int size; 

    public void add(Object request) { 
     SomeObject object = convertIncomingRequest(request); 
     concurrentQueue.add(object); 
     size++; 

    } 

    public SomeObject getHead() { 
     SomeObject object = concurrentQueue.poll(); 
     size--; 
    } 

    // other methods 

Проблема с этим состоит в том, что я должен явно синхронизировать add и size ++, а также на poll и size--, чтобы всегда иметь точные size, что делает ConccurentLinkedQueue бессмысленно для начала.

Каким будет наилучший способ достижения максимально возможной производительности при сохранении согласованности данных?

Вместо этого я использую ArrayDequeue и явно синхронизирую, или есть лучший способ достичь этого?

Существует своего родом подобный вопрос/ответ здесь:

java.util.ConcurrentLinkedQueue

, где обсуждаются, как композиционные операции на ConcurrentLinkedQueue, естественно, не атомный, но нет прямого ответа, что это лучший вариант для данного сценарий.

Примечание: Я вычисляю размер явно, потому что временная сложность для встроенного метода .size() - это O (n).

Note2: Я также обеспокоен тем, что метод getSize(), который я явно не написал, добавит еще больше конфликтов. Его можно было бы назвать относительно часто.

Я ищу наиболее эффективный способ обработки нескольких производителей - один потребитель с частыми вызовами getSize().

Альтернативное предложение: Если в структуре SomeObject был элемент, то я мог бы получить текущий размер из ConcurrentLinkedQueue.poll(), и только блокирование должно выполняться внутри механизма для создания такого идентификатора. Теперь добавить и получить можно правильно использовать без дополнительной блокировки. Как это будет в качестве альтернативы?

+2

Имеет ли размер() метод всегда должен быть _exactly_ правильно?(Подсказка: что произойдет, если поток A вызывает size() и возвращает точно правильный ответ, но тогда, когда поток A принимает какое-то решение, основанное на размере, какой-то другой поток B пробирается и меняет размер?) –

+0

Поскольку вы пытаются сохранить количество необработанных запросов на низком уровне, почему вас беспокоит размер O (n)? Как часто вам нужно знать размер? – NamshubWriter

ответ

1

Таким образом, необходимо сообщить текущее количество необработанных запросов. И это часто запрашивается, что действительно делает ConcurrentLinkedQueue.size() непригодным.

Это можно сделать с помощью AtomicInteger: он быстрый и всегда как можно ближе к текущему количеству необработанных запросов.

Вот пример, отметят некоторые небольшие обновления для того, чтобы зарегистрированного размера является точным:

import java.util.Queue; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.atomic.AtomicInteger; 

public class SomeQueueAbstraction { 

    private final Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>(); 
    private final AtomicInteger size = new AtomicInteger(); 

    public boolean add(Object request) { 

     SomeObject object = convertIncomingRequest(request); 
     if (concurrentQueue.add(object)) { 
      size.incrementAndGet(); 
      return true; 
     } 
     return false; 
    } 

    public SomeObject remove() { 

     SomeObject object = concurrentQueue.poll(); 
     if (object != null) { 
      size.decrementAndGet(); 
     } 
     return object; 
    } 

    public int getSize() { return size.get(); } 

    private SomeObject convertIncomingRequest(Object request) { 
     return new SomeObject(getSize()); 
    } 

    class SomeObject { 
     int id; 
     SomeObject(int id) { this.id = id; } 
    } 
} 
+0

Итак, это было бы очень быстро из-за реализации Atomic, но было бы небольшим шансом не обладать абсолютно точным значением в каждый данный момент? Это может быть даже достаточно хорошим для сообщения – John

+0

@ user3360241 Да. Представьте 1 элемент в очереди и 2 потока, добавляющий элемент и 1 поток, удаляющий элемент _ в то же время_. В течение короткого времени размер находится где-то между 0 и 3. Но атомная часть целого будет гарантировать, что за этот короткий период отчетный размер не будет меньше 0 и не более 3. После того, как потоки будут выполнены, размер отчета будет равен 2. – vanOekel

1

Вы можете использовать явную блокировку, что означает, что вам, вероятно, не понадобится параллельная очередь.

public class SomeQueueAbstraction { 

    private Queue<SomeObject> queue = new LinkedList<>(); 
    private volatile int size; 
    private Object lock = new Object(); 

    public void add(Object request) { 
     SomeObject object = convertIncomingRequest(request); 
     synchronized(lock) { 
      queue.add(object); 
      size++; 
     } 
    } 

    public SomeObject getHead() { 
     SomeObject object = null; 
     synchronized(lock) { 
      object = queue.poll(); 
      size--; 
     } 
     return object; 
    } 

    public int getSize() { 
     synchronized(lock) { 
      return size; 
     } 
    } 

    // other methods 
} 

Таким образом, добавление/удаление элементов в/из очереди и обновление size будет сделано безопасно.

+0

Это то же самое, что я предложил в качестве альтернативы, только с ArrayDequeue вместо LinkedList. Есть ли преимущество использования LinkedList для ArrayDeque, который должен быть быстрее? Меня беспокоит такое большое спорение, особенно учитывая, как часто можно было бы получить getSize. – John

+0

Моя плохая, эта часть меня привлекла. Дело в том, что вы хотите избежать ситуации, когда Thread A добавляет в очередь, Thread B читает 'size', а THEN Thread A обновляет' size'. Это позволяет избежать таких ситуаций. Кроме того, я думаю, что лучше сделать 'size' переменной' volatile'. (см. мое новое редактирование) – jadhachem

+0

Если вы делаете размер volatile, getSize строго не нужно синхронизировать. Как и в случае, если он изменчив, он является избыточным. – Necreaux

Смежные вопросы