2013-08-30 3 views
3

У меня есть несколько потоков, которые нужно добавить в одну очередь. Эта очередь разделяется на несколько переменных, так эффективно, я вызываю функцию, которая добавляется к каждой переменной в некоторой позиции i. От http://www.youtube.com/watch?v=8sgDgXUUJ68&feature=c4-overview-vl&list=PLBB24CFB073F1048E Мне показали, что для каждого метода добавляются блокировки, как показано здесь. http://www.caveofprogramming.com/java/java-multiple-locks/. Не представляется эффективным создание блокировки для 100 000 объектов в массиве. В java, допустим, у меня есть объект, который управляет большой очередью объектов. Как правильно синхронизировать addToQueue, не жертвуя производительностью, синхронизируя метод, просто позицию в floatQueue и intQueue, к которой я добавляю?Несколько потоков в java, добавляющих в очередь

class QueueManager { 
    private float[] floatQueue; 
    private int[] intQueue; 

    private int currentQueueSize; 
    private int maxQueueSize; 

    QueueManager(int sizeOfQueue) { 
     intQueue = new int[sizeOfQueue]; 
     floatQueue = new float[sizeOfQueue]; 

     currentQueueSize = 0; 
     maxQueueSize = sizeOfQueue; 
    } 

    public boolean addToQueue(int a, float b) { 
     if (currentQueueSize == maxQueueSize) { 
      return false; 
     } 
     else{ 
      intQueue[currentQueueSize] = a; 
      floatQueue[currentQueueSize] = b; 

      currentQueueSize++; 

      return true; 
     } 
    } 
} 
+0

Один из способов, по которым я хотел бы добавить экземпляр менеджера очередей в каждый поток и изменить переменные на статические изменчивые массивы. Есть лучший способ сделать это? – user1876508

+0

Похоже, что вы хотите использовать метод BlockingQueue с использованием метода предложения? –

+0

В качестве побочного примечания, к сожалению, нет реализации блокировки свободной очереди в java –

ответ

2

java.util.concurrent.ConcurrentLinkedQueue неблокирующая, блокировка свободной реализации очереди. Вместо блокировки используются команды сравнения и замены.

+0

Какова скорость? – user1876508

+1

В моей системе, поддерживающей до четырех параллельных потоков, требуется немного меньше 2,5 секунд, чтобы добавить миллион элементов за одну очередь в одну очередь. У меня есть эталонная программа, доступная здесь, если вы хотите проверить ее сами: https://gist.github.com/kyledewey/6399477 –

0

Вы лучше использовать Blocking Queue, чем изобретать колесо. В случае, если вы делаете это для практики, вы можете использовать два замка один для , поставив и один из , получая. Для этого обратитесь к источнику очереди блокировки.

Просто побочное примечание о том, как получить параллельное право, сложно, поэтому не используйте свою очередь в приложении развертывания и скорее полагайтесь на реализацию Java.

0

Чтобы сохранить параллельные массивы «в синхронизации», вам нужна синхронизация. Это единственный способ сохранить атомарность модификаций обоих массивов и связанного с ними индекса.

Вместо параллельных массивов, вы можете рассмотреть неизменный объект кортежа, с final полями для вашего int и float, а затем добавить те к BlockingQueue реализации, как ArrayBlockingQueue.

Если есть много оттока (частое создание и отбрасывание объектов), synchronized, вероятно, будет работать лучше, чем кортежи. Вам нужно проконсультировать их, чтобы быть уверенными.

0

Одно из возможных решений, которое я нашел, - использовать AtomicIntegers, который похож на блокировку, но работает на гораздо более низком уровне, чем синхронизированный. Вот обновленная копия моего кода с использованием AtomicInteger. Обратите внимание, что его все еще нужно тестировать, но теоретически это должно быть достаточно.

import java.util.concurrent.atomic.AtomicInteger; 

class ThreadSafeQueueManager { 
    private float[] floatQueue; 
    private int[] intQueue; 

    private AtomicInteger currentQueueSize; 
    private int maxQueueSize; 

    QueueManager(int sizeOfQueue) { 
     intQueue = new int[sizeOfQueue]; 
     floatQueue = new float[sizeOfQueue]; 

     currentQueueSize = new AtomicInteger(0); 
     maxQueueSize = sizeOfQueue; 
    } 

    public boolean addToQueue(int a, float b) { 
     if (currentQueueSize.get() == maxQueueSize) { 
      return false; 
     } 
     else{ 
      try { 
       // Subtract one so that the 0th position can be used 
       int position = currentQueueSize.incrementAndGet() - 1; 
       intQueue[position] = a; 
       floatQueue[positions] = b; 

       return true; 
      } 
      catch (ArrayIndexOutOfBoundsException e) { return false;} 
     } 
    } 
} 

Кроме того, для справки, это стоит читать

https://www.ibm.com/developerworks/java/library/j-jtp11234/
http://www.ibm.com/developerworks/library/j-jtp04186/

+1

Прошу прощения, но это хороший пример того, почему это трудно. Предполагая, что код изменяется настолько, что он компилируется, у вас все еще будет гонка данных в 'addToQueue (int, float)' между проверкой текущего размера очереди и добавлением данных в очередь. – msandiford

+0

Я не уверен, верно ли это. Аналогичную проблему можно найти здесь http://stackoverflow.com/questions/10077937/using-atomicinteger-as-a-static-shared-counter, но я просто хочу, чтобы каждый вызов addToQueue занимал только одну позицию, поскольку эта позиция в floatQueue и intQueue зависят от incrementAndGet(), почему возникает проблема? Эти записи не обязательно должны быть последовательными, единственным требованием является то, что одна и та же позиция в этих массивах записывается только один раз на рабочую нагрузку. – user1876508

+0

Итак, если я закрою блок else с помощью try catch, тогда я в безопасности? – user1876508

Смежные вопросы