2015-11-24 2 views
0

У меня возникло странное поведение с использованием ArrayBlockingQueue, который я использую для связи между определенными шагами в приложении Java.Странное поведение arrayBlockingQueue с элементами массива

Я использую 1 статический ArrayBlockingQueue, как инициализируется так:

protected static BlockingQueue<long[]> commandQueue; 

Вслед за конструктором, который имеет это как один из его линий:

commandQueue = new ArrayBlockingQueue<long[]>(amountOfThreads*4); 

Где amountOfThreads дается в качестве аргумента конструктора ,

Я тогда производитель, который создает массив long[2] дает ему некоторые значения, а затем предлагает его в очередь, то я изменить одно из значений массива непосредственно после него и предложить его еще раз в очереди:

long[] temp = new long[2]; 
temp[0] = currentThread().getId(); 
temp[1] = gyrAddress;//Address of an i2c sensor 
CommunicationThread.commandQueue.offer(temp);//CommunicationThread is where the commandqueue is located 
temp[1] = axlAddress;//Change the address to a different sensor 
CommunicationThread.commandQueue.offer(temp); 

Потребитель затем берет эти данные и открывает подключение i2c к определенному датчику, получает некоторые данные от упомянутого датчика и передает данные обратно с использованием другой очереди. На данный момент я установил, что потребитель просто потребляет голову и печатает данные.

long[] command = commandQueue.take();//This will hold the program until there is at least 1 command in the queue 
if (command.length!=2){ 
    throw new ArrayIndexOutOfBoundsException("The command given is of incorrect format"); 
}else{ 
    System.out.println("The thread with thread id " + command[0] + " has given the command to get data from address " +Long.toHexString(command[1])); 
} 

Теперь для тестирования у меня есть продюсер нить с этими адресами (byte) 0x34, (byte)0x44 Если все идет правильно, мой вывод должен быть:

The thread with thread id 14 has given the command to get data from address 44 
The thread with thread id 14 has given the command to get data from address 34 

Однако я получаю:

The thread with thread id 14 has given the command to get data from address 34 
The thread with thread id 14 has given the command to get data from address 34 

Какой бы означает, что он отправляет массив temp после его изменения.

Вещи, которые я сделал, чтобы исправить: Я пробовал спать, если бы я добавил 150 мс сна, тогда ответ правильный. Однако этот метод совершенно очевидно влияет на производительность ... Поскольку метод возвращает предложение истинный Я попробовал следующий фрагмент кода

boolean tempBool = false; 
while(!tempBool){ 
    tempBool = CommunicationThread.commandQueue.offer(temp); 
    System.out.println(tempBool); 
} 

, которая печатает правду. Это не повлияло.

Я пытался печати temp[1] после этого в то время как петли и в этот момент он является правильное значение. (Он печатает 44 однако потребитель получает 34)

Что, скорее всего, это дело является проблемой syncronisation, однако я подумал, что целью объекта BlockingQueue было бы решение этого.

Любая помощь или предложение относительно работы этого BlockingQueue были бы весьма полезны.Позвольте мне напомнить о том, что это мой первый опыт работы с очередями между потоками в java и окончательная программа будет работать на малине pi, используя библиотеку pi4j для связи с датчиками.

+0

Я с подозрением, что у вас есть статические переменные, которая инициализируется в конструктор. Статические переменные не принадлежат экземпляру, поэтому они не должны инициализироваться при построении экземпляра; значение будет перезаписано при следующем создании экземпляра. –

+0

Вы изменяете массивы, которые совместно используются потоками без какой-либо синхронизации. Не. И что еще хуже, вы, кажется, всегда сохраняете один и тот же массив в очереди. Это не может работать. Вместо использования массивов из 2 длин, создайте ** неизменный ** класс, содержащий два длинных поля, и сохраните их в очереди. Или, по крайней мере, не изменяйте массив, как только он находится в очереди. –

+0

@AndyTurner Я делаю это, так как мне нужно установить размер в зависимости от количества потоков, я создаю только один экземпляр из этого класса, поэтому это не имеет большого значения. – Ylva

ответ

0

Поскольку вы спрашивали о том, как BlockingQueue работает точно, начнем с того, что:

Очередь блокировки - это очередь, которая блокируется, когда вы пытаетесь удалить ее из очереди, когда очередь пуста, или когда вы пытаетесь вставить в нее объекты, пока очередь уже заполнена , Поток, пытающийся удалить из пустой очереди, блокируется до тех пор, пока какой-либо другой поток не вставит элемент в очередь.

Эти блокирующие очереди предотвращают разные потоки от чтения/записи в очередь, пока это еще не возможно, потому что оно либо пустое, либо полное.

Как Энди Тернер и JB Nizet уже пояснялось, переменные статически разделяют в памяти. Это означает, что когда ваш поток, который читает очередь, находит ссылку (A.K.A. указатель) на эту переменную (в памяти) и использует этот указатель в следующем коде. Однако, прежде чем ему удастся прочитать эти данные, вы уже изменили эту переменную, как правило, в приложениях, не связанных с потоком, это не будет проблемой, поскольку только один поток попытается прочитать из памяти, и он всегда будет выполняться в хронологическом порядке. Способ обойти это состоит в том, чтобы создать новую переменную/массив (который присваивает себя новой памяти) с переменными данными каждый раз, когда вы добавляете запись в очередь, таким образом вы убедитесь, что вы не перезаписываете переменную в памяти до он обрабатывается другим потоком. Простой способ сделать это:

long[] tempGyr = new long[2]; 
tempGyr[0] = currentThread().getId(); 
tempGyr[1] = gyrAddress; 
CommunicationThread.commandQueue.offer(tempGyr);//CommunicationThread is where the commandqueue is located 

long[] tempAxl = new long[2]; 
tempAxl[0] = currentThread().getId(); 
tempAxl[1] = axlAddress; 
CommunicationThread.commandQueue.offer(tempAxl); 

Надеется, что это объясняет тему, если нет: не стесняйтесь задавать дополнительные вопросы :)

Смежные вопросы