2017-02-21 1 views
3

Прежде всего, чтобы избежать вопроса о вопросе как дубликата ребятами, которые не любят читать до конца, я прочитал вопрос Producer-Consumer Logging service with Unreliable way to shutdown . Но он не полностью отвечает на вопрос, и ответ противоречит учебному тексту.Почему состояние гонки в LogWriter приводит к блокировке производителя? [Параллелизм на практике]

В книге предоставляется следующий код:

public class LogWriter { 
    private final BlockingQueue<String> queue; 
    private final LoggerThread logger; 
    private static final int CAPACITY = 1000; 

    public LogWriter(Writer writer) { 
     this.queue = new LinkedBlockingQueue<String>(CAPACITY); 
     this.logger = new LoggerThread(writer); 
    } 

    public void start() { 
     logger.start(); 
    } 

    public void log(String msg) throws InterruptedException { 
     queue.put(msg); 
    } 

    private class LoggerThread extends Thread { 
     private final PrintWriter writer; 

     public LoggerThread(Writer writer) { 
      this.writer = new PrintWriter(writer, true); // autoflush 
     } 

     public void run() { 
      try { 
       while (true) 
        writer.println(queue.take()); 
      } catch (InterruptedException ignored) { 
      } finally { 
       writer.close(); 
      } 
     } 
    } 
} 

Теперь мы должны понять, как остановить этот процесс. Мы должны остановить регистрацию, но не должны пропускать уже отправленные сообщения.

Автор исследования подход:

public void log(String msg) throws InterruptedException { 
    if(!shutdownRequested) 
      queue.put(msg); 
    else 
      throw new IllegalArgumentException("logger is shut down"); 
} 

и прокомментировать это так:

Другой подход к закрытию LogWriter будет установить «завершение запрошенной» флаг для предотвращения дальнейших сообщений от того, , как показано в листинге 7.14. Затем потребитель мог слить в очередь при уведомлении о том, что было запрошено выключение, , выписывая любые ожидающие сообщения и разблокируя любых заблокированных производителей в журнале. Однако этот подход имеет условия гонки, которые делают его ненадежным. Реализация журнала - это последовательность контрольных действий: производители могут заметить, что служба еще не была отключена , но все еще очереди сообщений после выключения, опять же с риском, что производитель может быть заблокирован в журнале и никогда разблокировать. Есть трюки, которые уменьшают вероятность этого (например, если покупатель ждет несколько секунд, прежде чем объявить остывание очереди), но не меняют фундаментальную проблему, а просто вероятность , что это приведет к сбою.

Эта фраза достаточно сложна для меня.

Я понимаю, что

if(!shutdownRequested) 
      queue.put(msg); 

не является атомарным и сообщение может быть добавлено в очередь после shutdowning. Да, это не очень точно, но я не вижу проблемы. очередь просто будет истощаться, и когда очередь будет пустой, мы можем остановить LoggerThread. Особенно Я не понимаю, почему производители могут быть заблокированы.

Автор не предоставил полный код, поэтому я не могу понять все детали. Я считаю, что эта книга была прочитана большинством сообщества, и этот пример содержит подробное объяснение.

Пожалуйста, объясните с полным примером кода.

ответ

3

Первое, что нужно понять, заключается в том, что при запросе на остановку продюсеру необходимо прекратить принимать любые запросы, а потребитель (LoggerThread в этом случае) должен разрядить очередь. Код, который вы представляете в вопросе, демонстрирует только одну сторону истории; производитель отклоняет любые дальнейшие запросы, когда shutdownRequested является true.После этого примера, автор переходит сказать:

Потребитель может затем слейте очереди на уведомления, что отключения было предложено, выписывая все ожидающие сообщения и разблокирования любых производителей заблокированных в журнале

Прежде всего, queue.take в LoggerThread, как показано в вашем вопросе, будет бесконечно блокировать новые сообщения, которые будут доступны в очереди; однако, если мы хотим выключить LoggerThread (изящно), нам нужно убедиться, что код завершения в LoggerThread получает шанс выполнить, когда shutdownRequested является истинным, а не бесконечно блокируется queue.take.

Когда автор говорит о том, что потребитель может стечь очередь, то, что он означает, что LogWritter может проверить shutdownRequested и если это правда, это может вызвать не блокирующий drainTo метод стечь текущее содержимое очереди в отдельной коллекции вместо вызова queue.take (или вместо этого вызывается аналогичный неблокирующий метод). Альтернативный вариант, если shutdownRequested является ложным, LogWriter может позвонить по телефону queue.take, как обычно.

Реальная проблема с этим подходом заключается в том, как реализуется метод log (называемый производителями). Поскольку он не является атомарным, возможно, что несколько потоков могут пропустить установку shutdownRequested в true. Что произойдет, если количество потоков, которые пропустят это обновление, больше, чем CAPACITY из queue. Давайте снова рассмотрим метод log. (Добавлено фигурные скобки для удобства объяснения):

public void log(String msg) throws InterruptedException { 
    if(!shutdownRequested) {//A. 1001 threads see shutdownRequested as false and pass the if condition. 

      //B. At this point, shutdownRequested is set to true by client code 
      //C. Meanwhile, the LoggerThread which is the consumer sees that shutdownRequested is true and calls 
      //queue.drainTo to drain all existing messages in the queue instead of `queue.take`. 
      //D. Producers insert the new message into the queue.  
      queue.put(msg);//Step E 
    } else 
      throw new IllegalArgumentException("logger is shut down"); 
    } 
} 

Как показано на стадии Х, это возможно для нескольких потоков производителей для вызова put в то время как LoggerThread закончил сливы очереди и вышел из W. Не должно быть никаких проблем до 1000th нить вызывает put. Реальная проблема заключается в том, что поток 1001th вызывает put. Он будет блокироваться, так как емкость очереди составляет только 1000, а LoggerThread может больше не оставаться в живых или не подписаться на метод queue.take.

+0

Кажется довольно надуманным ожидать 1000 потоков, ожидающих между двумя одинаковыми строками кода. Хотя я понимаю, что это по-прежнему технически неверно, если это теоретически возможно. – shmosel

+0

Мне интересно узнать, можете ли вы продемонстрировать, как процесс останова фактически истощает очередь * и всех заблокированных производителей *. Из моего тестирования 'drainTo()' только сливает уже поставленные в очередь элементы. – shmosel

+0

@shmosel Согласен. Это немного странно, но я думаю, вы неправильно поняли мой ответ. 1000 потоков не ждут 'put'. Это 1001-й поток, который ждет. Для 'put' для блокирования вам нужно 1000 элементов в очереди и блок на 1001-й попытке« поместить »новый элемент. Все это при обеспечении того, чтобы потребитель не занимал места для 1001-го элемента, который может быть возможен только в том случае, если потоки 1001 попытаются поместить новый элемент в очередь ** после того, как ** потребитель закончил слить очередь и теперь выходит из цикл while. Я не вижу другого способа объяснить этот параграф! Вы? – CKing

Смежные вопросы