2013-05-19 4 views
1

Я пытаюсь выполнить некоторую выборочную реализацию блокирующей очереди с массивом с фиксированной длиной массивов байтов. Я не удаляю опрошенные элементы, поэтому я скорректировал метод put для возврата массива байтов, чтобы он мог быть написан напрямую (поток производителя использует MappedByteBuffer для записи непосредственно в этот массив байтов). Я добавил метод «commitPut()», чтобы просто увеличить счетчики и установить массивы «lengths». (если несколько потоков будут писать, это могут быть проблемы параллелизма, но я знаю, что пишет только один поток).Ошибка блокировки очереди блокировки

Ниже приведено то, что у меня есть. Он работает, если я отлаживаю шаг за шагом, но если я «запустил», похоже, что он сталкивается с некоторыми проблемами с блокировкой. Я скопировал, разделил и скорректировал код ArrayBlockingQueue. Может ли кто-нибудь с лучшими знаниями посмотреть класс и рассказать мне, что я делаю неправильно, или как это сделать лучше (например, написать directy для буферизации и задать длину массива и счетчиков на том же шаге)?

public class ByteArrayBlockingQueue { 

    private final int[] lens; // array to valid lengths 
    private final byte[][] items; // array of byte arrays 

    private int takeIndex = 0; 
    private int putIndex = 0; 
    private int count = 0; 

    public volatile int polledLen = 0; // lenght of last polled byte array 

    private final ReentrantLock lock; 
    private final Condition notEmpty; 
    private final Condition notFull; 

    final int inc(int i) { 
     return (++i == items.length)? 0 : i; 
    } 

    public ByteArrayBlockingQueue(int capacity, int size, boolean fair) { 
     if (capacity <= 0) 
      throw new IllegalArgumentException(); 
     this.items = new byte[capacity][size]; 
     this.lens = new int[capacity]; 
     lock = new ReentrantLock(fair); 
     notEmpty = lock.newCondition(); 
     notFull = lock.newCondition(); 
    } 

    public byte[] put() throws InterruptedException { 
     final byte[][] items = this.items; 
     final ReentrantLock lock = this.lock; 
     lock.lockInterruptibly(); 
     try { 
      try { 
       while (count == items.length) 
        notFull.await(); 

      } catch (InterruptedException ie) { 
       notFull.signal(); // propagate to non-interrupted thread 
       throw ie; 
      } 
      //insert(e, len); 
      return items[putIndex]; 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public void commitPut(int lenBuf) throws InterruptedException { 
     final ReentrantLock lock = this.lock; 
     lock.lockInterruptibly(); 
     try { 
      lens[putIndex] = lenBuf; 
      putIndex = inc(putIndex); 
      ++count; 
      notEmpty.signal(); 
     } finally { 
      lock.unlock(); 
     } 
    } 

    public byte[] poll() { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (count == 0) 
       return null; 
      final byte[][] items = this.items; 
      final int[] lens = this.lens; 
      byte[] e = items[takeIndex]; 
      this.polledLen = lens[takeIndex]; 
      //items[takeIndex] = null; 
      takeIndex = inc(takeIndex); 
      --count; 
      notFull.signal(); 
      return e; 

     } finally { 
      lock.unlock(); 
     } 
    } 
} 
+1

Можете ли вы описать «проблемы с блокировкой», с которыми вы сталкиваетесь? – jop

+0

Кажется, что «потоки производителя» блокируют «потребительскую нить». Я не совсем получаю эти блокировки, хотя они кажутся мне полезными (почти тот же принцип, что и другие встроенные блокирующие очереди). Если я пройду через отладчик, установив точки останова в «poll» и «put» - он работает. Я вижу, что растровые изображения (байты в массиве байтов) декодируются должным образом. Как только я «забегаю», он терпит неудачу (байты не могут быть декодированы). Если я попытаюсь отладить значение опроса, то наблюдатель выражений будет терпеть неудачу с некоторой ошибкой «дочернего обновления», поэтому я предполагаю, что это проблема блокировки. – hpet

ответ

0

Если очередь оборачивается вокруг, то возможно, что массивы байтов получить повторно и перезаписаны перед тем, считанных потребителями. Короче говоря, вам нужно будет иметь метод commitGet, чтобы убедиться, что производители ждут потребителей, прежде чем перезаписывать массив новыми данными.

Тем не менее, мое предложение состоит в том, что вы полагаетесь на java.util.concurrent.BlockingQueue, имея вторую очередь, чтобы вернуть их от потребителей к производителю, а также по адресу java.nio.ByteByffer, чтобы отслеживать длины. Производитель должен был бы сделать следующим образом:

ByteBuffer buffer = bufferQueue.poll(); // instead of your put() 
buffer.put(source);      // fill buffer from source MappedByteBuffer 
buffer.flip();       // set length to the amount written 
dataQueue.offer(buffer);    // instead of commitPut() 

потребитель:

ByteBuffer buffer = dataQueue.poll(); // instead of your get() 
buffer.get(...);      // use data 
buffer.clear();       // reset length    
bufferQueue.offer(buffer);    // this is the missing commitGet() 

Вы должны сначала вставить capacity элементы в freeQueue. Обратите внимание, однако, что это все равно будет копировать данные один раз из буфера source во временные буферы в очередях, как уже делал ваш оригинальный код.

Если вы действительно не хотите копировать данные (и убедитесь, что источник не изменился, пока все потребители не прочитали его!), Ваш лучший вариант - использовать одну блокирующую очередь и вставить буферы, полученные с помощью ByteBuffer.slice(). ваш исходный буфер для каждого фрагмента данных, который будет передан потребителям. Затем они будут собраны в мусор, но должны иметь гораздо меньше памяти, чем сами байтовые массивы.

+0

спасибо за помощь. Почему я пытаюсь повторно использовать байтовые массивы - до того, как я использовал простой массив ArrayBlockingQeueu с растровыми изображениями. У меня есть продюсер, который заполняет растровые изображения qeueu и consumer (drawer). Это работало, по крайней мере, я думал, пока я не попробовал это на каком-нибудь «забавном» низкобюджетном Android-планшете, где GC все время пинает за самые маленькие ассигнования (всего я использую 10 МБ из 48 разрешенных, поэтому даже не закрываю чтобы понять, почему GC настолько агрессивен на этом планшете). По этой причине я решил попробовать с фиксированным выделенным байтовым массивом, повторно использовать его и не давать запуск GC. – hpet

+0

Я вижу. Использование второй очереди для возврата пустых массивов должно сделать это безопасно. – jop

+0

Я все еще пытаюсь обернуть голову вокруг вашей концепции двух очередей ... Не могли бы вы объяснить это еще раз?Если у меня есть «читатель» производителя, который хочет читать байты из MappedByteBuffer непосредственно в очередь «читателей» (чтобы избежать копирования байтов) и ждать, когда очередь заполнена, и потребительский «ящик», который потребляет байты из очереди читателей. Поскольку байтовые массивы фиксированной длины (для размещения самого большого растрового изображения), мне все равно нужно отслеживать «длины байтов» для каждого слота очереди. – hpet

Смежные вопросы