2015-12-21 4 views
1

Я в затруднении. У меня есть BlockingDequeBlockingDeque не разблокируется после того, как элемент вставлен в очередь

private class Consumer extends Thread { 

    @Override 
    public void run() { 
     try { 
      while (!Thread.currentThread().isInterrupted()) { 
       if (connection.isReady()) { 
        final Item item = queue.takeFirst(); 
        try { 
         ListenableFuture<Result> listenableFuture = connection.submitItem(item); 
         Futures.addCallback(listenableFuture, new FutureCallBackImpl<Result>(item)); 
        } catch (RejectedExecutionException e) { 
         LOGGER.debug("Slow down submission of tasks we have a queue full in connection"); 
         queue.addFirst(item); 

        } 
       } 
      } 
     } catch (InterruptedException e) { 
      LOGGER.debug("Interrupted. I will not propagate up because I own this thread"); 

     } 
    } 
} 

Этот код обычно блокируется на queue.takeFirst() когда никакие пункты не находятся в очереди. Тем не менее, он не разблокируется, как только я добавляю элемент, как ожидалось. Во время отладки я вижу элементы, находящиеся в queue, а также когда я останавливаю Tomcat I, сериализую queue. После запуска я де-сериализую очередь, и в этот момент queue.takeFirst() извлекает элемент (тот же, который ранее не извлекал), и отправляет его.

У кого-нибудь есть идеи?

EDIT

Чтобы подчеркнуть свою точку немного больше. Если я изменил queue.takeFirst() на queue.pollFirst() и немного изменил код, чтобы игнорировать проходы, которые дают nullitems, тогда код работает так, как ожидалось.

+1

Я могу себе представить, что (I), вы не добавляете в очереди или (б) вы не взяв из одной очереди или (iii) вы фактически забираете из очереди, но считаете, что это не так, или (iv) код, который принимает, никогда не выполняется ... – assylias

+0

Как я уже сказал в своем редактировании, если я изменил код на pollFirst, тогда оно работает. Я уверен, что (i), (ii) не применяются. Для (iii) отладчик должен остановиться в точке останова, которую я поставил внутри try. Снова для (iv) код выполняется, если я останавливаю и запускаю код с сериализацией/десериализацией, которую я объяснил. – idipous

+1

Без фрагмента кода, который воспроизводит вашу проблему, мы можем только догадываться ... Вы должны попытаться создать [mcve] (http://stackoverflow.com/help/mcve) – assylias

ответ

0

Возможно, ваш код не вводит if, потому что connection.isReady() возвращает false.

Проверьте его, чтобы он действительно прекратил ждать первого элемента в очереди.

+0

Во время отладки я вижу, что потоки блокируются в queue.takeFirst(). Также почему pollFirst() работает правильно в этом случае? – idipous

+0

Пока вы заблокированы на takeFirst в отладочной проверке размера очереди –

+0

У меня есть и размер увеличивается по мере того, как я отправляю элементы. Я даже вижу элементы в очереди, и я вижу их сериализованными, когда останавливаю tomcat.Также, как только я начинаю снова tomcat и десериализую очередь, они фактически отправляются через один и тот же код. – idipous

0

Итак, позвольте мне объяснить, почему этот кусок кода не будет работать, когда элементы добавляются в очередь в следующий раз (то есть после того, как из очереди все элементы первого)

Когда очередь пустеет в первый раз, вызов этого кода final Item item = queue.takeFirst(); будет вызывать InterruptedException, который попадает в код за пределами цикла while, поэтому код больше никогда не вернется к циклу while. Вставка первой попытки блокировки внутри цикла будет решить первую проблему.

Во-вторых, ему нужно было позвонить Thread.currentThread().interrupted() внутри блока catch, чтобы в следующий раз передать условие while, чтобы он был готов читать для добавления будущих элементов в очередь. Infact Я не понял причины для вызова этой строки кода while (!Thread.currentThread().isInterrupted()).

В настоящее время я пишу интересный блог для BlockingDeque (WIP), в скором времени вы найдете более подробную информацию в своем блоге http://singletonjava.blogspot.com

+0

Причина, по которой pollFirst() будет работать, потому что она никогда не выбрасывает InterruptedException, и код никогда не выходит из цикла while. –

+0

Спасибо за ваш вклад. К тому времени, когда прерванное исключение выбрано, я действительно намерен оставить цикл while. Кроме того, я не устанавливал 'Thread.currentThread(). Interrupt();' специально внутри catch. Я нашел ошибку, и я обновлю ответ, чтобы объяснить это, но я собираюсь сделать это немного, так как мне нужно также найти решение. – idipous

+0

Возможно, вам стоит попытаться объяснить более подробно то, что вы предназначенный для достижения этой части кода, что поможет другим быть более ориентированным на ваше мышление, а не на предположение. –

Смежные вопросы