2010-06-25 3 views
32

Возможно, это глупый вопрос, но я не могу найти очевидного ответа.Параллельная установка

Мне нужна параллельная очередь FIFO, которая содержит только уникальные значения. Попытка добавить значение, которое уже существует в очереди, просто игнорирует это значение. Что, если бы не безопасность потока, было бы тривиально. Есть ли структура данных на Java или, возможно, код snipit на interwebs, который проявляет это поведение?

+0

К сожалению, термин «очередь» неоднозначно, так как для некоторых читателей это неявно означает «очереди FIFO» в то время как для других он имеет более общее значение 'java.util.Queue', что в основном означает любую коллекцию, которая имеет * некоторую * концепцию« элемента head », независимо от того, является ли этот элемент первым или нет. Так! Что он? –

+0

FIFO, извините за упущение =) –

ответ

5

Если вам нужен лучший параллелизм, чем полная синхронизация, есть один способ, которым я это знаю, используя ConcurrentHashMap в качестве карты поддержки. Ниже приведен только эскиз.

public final class ConcurrentHashSet<E> extends ForwardingSet<E> 
    implements Set<E>, Queue<E> { 
    private enum Dummy { VALUE } 

    private final ConcurrentMap<E, Dummy> map; 

    ConcurrentHashSet(ConcurrentMap<E, Dummy> map) { 
    super(map.keySet()); 
    this.map = Preconditions.checkNotNull(map); 
    } 

    @Override public boolean add(E element) { 
    return map.put(element, Dummy.VALUE) == null; 
    } 

    @Override public boolean addAll(Collection<? extends E> newElements) { 
    // just the standard implementation 
    boolean modified = false; 
    for (E element : newElements) { 
     modified |= add(element); 
    } 
    return modified; 
    } 

    @Override public boolean offer(E element) { 
    return add(element); 
    } 

    @Override public E remove() { 
    E polled = poll(); 
    if (polled == null) { 
     throw new NoSuchElementException(); 
    } 
    return polled; 
    } 

    @Override public E poll() { 
    for (E element : this) { 
     // Not convinced that removing via iterator is viable (check this?) 
     if (map.remove(element) != null) { 
     return element; 
     } 
    } 
    return null; 
    } 

    @Override public E element() { 
    return iterator().next(); 
    } 

    @Override public E peek() { 
    Iterator<E> iterator = iterator(); 
    return iterator.hasNext() ? iterator.next() : null; 
    } 
} 

Все это не солнечный свет с таким подходом. У нас нет подходящего способа выбрать элемент head, отличный от использования карты поддержки entrySet().iterator().next(), в результате чего карта становится все более неуравновешенной с течением времени. Этот дисбаланс является проблемой как из-за больших коллизионных столкновений, так и для большей конкуренции.

Примечание: этот код использует Guava в нескольких местах.

+3

Как это сохранить порядок «очереди»? Порядок итераций зависит от реализации карты поддержки. – erickson

+1

Сохраните, в каком порядке? Я не вижу никаких требований относительно порядка, если не предполагается, что он хочет * FIFO * очередь ... Я добавил комментарий, чтобы спросить. –

+0

@NitsanWakart Вопрос, на который я отвечал, заключался в том, как получить «очередь». –

5

Существует не встроенная коллекция, которая делает это. Существует несколько параллельных реализаций Set, которые могут использоваться вместе с параллельным Queue.

Например, элемент добавляется в очередь только после того, как он был успешно добавлен в набор, и каждый элемент, удаленный из очереди, удаляется из набора. В этом случае содержимое очереди логически является тем, что находится в наборе, и очередь используется только для отслеживания заказа и обеспечения эффективных операций take() и poll(), найденных только на BlockingQueue.

+1

Одна из моих реализаций использовала LinkedHashSet, так что у меня была только одна структура данных и могла зависеть от порядка. Тем не менее, алгоритм ConcurrentQueue довольно немного сложнее, чем использование блокировок синхронизации, и мне было интересно, есть ли набор исполнителей, который имел дополнительное ограничение уникальности. –

+0

@Ambience - такой коллекции нет. Моя методика позволяет использовать параллельную «очередь» (либо «LinkedBlockingQueue», если вам нужно «take()» или «ConcurrentLinkedQueue», если вам нужно только 'poll()'), сохраняя порядок FIFO, добавляя 'Set'-like уникальность. – erickson

4

A java.util.concurrent.ConcurrentLinkedQueue доставит вам больше всего пути.

Оберните ConcurrentLinkedQueue своим собственным классом, который проверяет уникальность добавления. Ваш код должен быть потокобезопасным.

+1

После того, как он завернут, он больше не будет потокобезопасным, даже если он основан на 'ConcurrentLinkedQueue'. – finnw

+0

@finnw: Хорошо. Я обновил свой ответ. –

+1

Моя реализация первого прохода в значительной степени делает именно это, но меня беспокоит стоимость вызова .contains() в связанной очереди списка ссылок, а также синхронизация методов очереди полностью отрицает преимущества базового алгоритма ConcurrentLinkdQueue. –

4

Я бы использовал синхронизированный LinkedHashSet, пока не было достаточного обоснования для рассмотрения альтернатив. Основное преимущество, которое может предложить более параллельное решение, - это разделение блокировок.

Простейшим параллельным подходом будет ConcurrentHashMap (действующий как набор) и ConcurrentLinkedQueue. Порядок операций обеспечил бы необходимое ограничение. Предложение() сначала должно выполнить CHM # putIfAbsent() и, если успешно вставить в CLQ. Опрос() будет выведен из CLQ, а затем удалить его из CHM. Это означает, что мы рассматриваем запись в нашей очереди, если она находится на карте, а CLQ обеспечивает упорядочение. Затем производительность может быть скорректирована путем увеличения количества параллелизма карты. Если вы терпимы к дополнительной уверенности, тогда дешевый CHM # get() может действовать как разумное предварительное условие (но он может пострадать, будучи слегка устаревшим).

2

Что вы подразумеваете под параллельной очередью с помощью Set semantics? Если вы имеете в виду подлинно параллельную структуру (в отличие от потокобезопасной структуры), я бы утверждал, что вы просите пони.

Что происходит, например, если вы вызываете put(element) и обнаруживаете, что что-то уже есть, которое сразу же удаляется? Например, что это означает в вашем случае, если offer(element) || queue.contains(element) возвращает false?

Такие вещи часто нужно думать немного по-другому в параллельном мире, так как часто ничего не происходит, как кажется, если вы не остановите мир (заблокируйте его). В противном случае вы обычно смотрите на что-то в прошлом. Итак, что вы на самом деле пытаетесь сделать?

+1

Интересное наблюдение, если не ответ =) Вы ниже порога точки комментирования? –

+0

Я был :-) Извините. –

0

Возможно удлиниться ArrayBlockingQueue. Чтобы получить доступ к блокировке доступа к пакетам, мне пришлось поместить мой подкласс в один и тот же пакет. Предостережение: Я не проверял это.

package java.util.concurrent; 

import java.util.Collection; 
import java.util.concurrent.locks.ReentrantLock; 

public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> { 

    public DeDupingBlockingQueue(int capacity) { 
     super(capacity); 
    } 

    public DeDupingBlockingQueue(int capacity, boolean fair) { 
     super(capacity, fair); 
    } 

    public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { 
     super(capacity, fair, c); 
    } 

    @Override 
    public boolean add(E e) { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return false; 
      return super.add(e); 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public boolean offer(E e) { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return true; 
      return super.offer(e); 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public void put(E e) throws InterruptedException { 
     final ReentrantLock lock = this.lock; 
     lock.lockInterruptibly(); //Should this be lock.lock() instead? 
     try { 
      if (contains(e)) return; 
      super.put(e); //if it blocks, it does so without holding the lock. 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return true; 
      return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock. 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 
0

Простой ответ для очереди уникальных объектов могут быть следующие:

import java.util.concurrent.ConcurrentLinkedQueue; 

public class FinalQueue { 

    class Bin { 
     private int a; 
     private int b; 

     public Bin(int a, int b) { 
      this.a = a; 
      this.b = b; 
     } 

     @Override 
     public int hashCode() { 
      return a * b; 
     } 

     public String toString() { 
      return a + ":" + b; 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if (this == obj) 
       return true; 
      if (obj == null) 
       return false; 
      if (getClass() != obj.getClass()) 
       return false; 
      Bin other = (Bin) obj; 
      if ((a != other.a) || (b != other.b)) 
       return false; 
      return true; 
     } 
    } 

    private ConcurrentLinkedQueue<Bin> queue; 

    public FinalQueue() { 
     queue = new ConcurrentLinkedQueue<Bin>(); 
    } 

    public synchronized void enqueue(Bin ipAddress) { 
     if (!queue.contains(ipAddress)) 
      queue.add(ipAddress); 
    } 

    public Bin dequeue() { 
     return queue.poll(); 
    } 

    public String toString() { 
     return "" + queue; 
    } 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     FinalQueue queue = new FinalQueue(); 
     Bin a = queue.new Bin(2,6); 

     queue.enqueue(a); 
     queue.enqueue(queue.new Bin(13, 3)); 
     queue.enqueue(queue.new Bin(13, 3)); 
     queue.enqueue(queue.new Bin(14, 3)); 
     queue.enqueue(queue.new Bin(13, 9)); 
     queue.enqueue(queue.new Bin(18, 3)); 
     queue.enqueue(queue.new Bin(14, 7)); 
     Bin x= queue.dequeue(); 
     System.out.println(x.a); 
     System.out.println(queue.toString()); 
     System.out.println("Dequeue..." + queue.dequeue()); 
     System.out.println("Dequeue..." + queue.dequeue()); 
     System.out.println(queue.toString()); 
    } 
} 
Смежные вопросы