2012-06-07 3 views
5

У меня есть многопоточное приложение, в котором есть один поток производителей и несколько потоков потребителей. Данные хранятся в общей потоковой безопасной коллекции и сбрасываются в базу данных, когда в буфере имеется достаточное количество данных.Буферизация данных в многопоточном приложении Java

Из Javadocs -

BlockingQueue<E> 

очередь, что дополнительно поддерживает операции, которые ожидают очереди, чтобы стать непустыми при получении элемента, и ждать пространства, чтобы стать доступными в очереди при хранении элемента ,

take() 

Извлекает и удаляет головку этой очереди, ожидая при необходимости, пока элемент не станет доступным.

Мои вопросы -

  1. Есть еще одна коллекция, которая имеет метод E [принимать] (Int N)? то есть блокировка очереди ожидает, пока элемент не будет доступен. Я хочу , что он должен подождать, пока не будет доступно 100 или 200 элементов.
  2. В качестве альтернативы, есть ли другой способ, который я мог бы использовать для решения проблемы без опроса?
+0

Должны ли элементы распределяться поровну каждому потребителю, или если первый потребитель к методу принятия получает первые элементы 'n', второй потребитель - следующие элементы' n' и т. Д.? – SimonC

+0

Это действительно то, что вы хотите сделать?Это может привести к почти произвольной большой задержке между производимыми данными и их сбросом в базу данных, если темпы производства будут замедляться за пределами того, что вы в конечном итоге настраиваете. Если вам действительно нужно сделать эту буферизацию, то ваша логика должна, вероятно, быть более похожей на «Подождите, пока у меня не будет элементов N или X ms» – DRMacIver

+0

Почему вы хотите подождать? Почему бы просто не использовать 'drain()'? Я бы написал все имеющиеся у вас данные до некоторого максимума, и я бы предпочел не терять данные. –

ответ

2

Я думаю, что единственный способ либо продлить некоторую реализацию BlockingQueue, либо создать какой-либо полезный метод, используя take:

public <E> void take(BlockingQueue<E> queue, List<E> to, int max) 
     throws InterruptedException { 

    for (int i = 0; i < max; i++) 
     to.add(queue.take()); 
} 
+0

На самом деле, ваш подход намного более здравый, чем мой, предполагая, что только один будет один потребитель. – Zarkonnen

+1

Этот подход не имеет отношения к InterruptedException вообще, потому что вы теряете любые элементы, которые были приняты, если прерваны. На самом деле нужно добавить элементы в переданный в коллекцию, если он не будет обрабатывать само прерывание, или поймать и вернуть те элементы, которые до сих пор удалены, если они этого захотят. – DRMacIver

+0

О, хорошая точка +1 на комментарий. Обновлено! – dacwe

1

Я не уверен, есть ли подобный класс в стандартной библиотеке, которая имеет take(int n) метод типа, но вы должны быть в состоянии обернуть по умолчанию BlockingQueue, чтобы добавить эту функцию, не слишком много хлопот, не вы думать?

Альтернативный сценарий должен был инициировать действие, в котором вы помещаете элементы в коллекцию, где установленный вами порог запускает промывку.

2

Метод drainTo не совсем то, что вы ищете, но будет ли он служить вашей цели?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection, целое)

EDIT

Вы могли бы реализовать несколько более производительную партия блокирует takemin используя комбинацию take и drainTo:

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException 
{ 
    int drained = 0; 
    do 
    { 
    if (queue.size() > 0) 
     drained += queue.drainTo(list, min - drained); 
    else 
    { 
     list.add(queue.take()); 
     drained++; 
    } 
    } 
    while (drained < min); 
} 
+1

, как указано в другом ответе (который теперь, по-видимому, удален), это то, что он хочет делать. – posdef

+0

Я обновил ответ, указав, что он не решает точный вопрос. Иногда ОП не знает альтернативных решений, поэтому всегда стоит задаться вопросом. – SimonC

+0

Не могу утверждать, что :) – posdef

1

Таким образом, это должна быть потоковая очередь, которая позволяет блокировать принятие произвольного количества элементов. Больше глаз, чтобы проверить код резьбы, будет правильным.

package mybq; 

import java.util.ArrayList; 
import java.util.LinkedList; 
import java.util.List; 

public class ChunkyBlockingQueue<T> { 
    protected final LinkedList<T> q = new LinkedList<T>(); 
    protected final Object lock = new Object(); 

    public void add(T t) { 
     synchronized (lock) { 
      q.add(t); 
      lock.notifyAll(); 
     } 
    } 

    public List<T> take(int numElements) { 
     synchronized (lock) { 
      while (q.size() < numElements) { 
       try { 
        lock.wait(); 
       } catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
      } 
      ArrayList<T> l = new ArrayList<T>(numElements); 
      l.addAll(q.subList(0, numElements)); 
      q.subList(0, numElements).clear(); 
      return l; 
     } 
    } 
} 
+1

'notifyAll' в' add' здесь немного расточительно. Он должен быть единственным «уведомлять», а затем «принимать» может снова вызвать «оповещение», если осталось еще много элементов, когда это будет сделано. – SimonC

Смежные вопросы