Прежде всего, чтобы избежать вопроса о вопросе как дубликата ребятами, которые не любят читать до конца, я прочитал вопрос Producer-Consumer Logging service with Unreliable way to shutdown . Но он не полностью отвечает на вопрос, и ответ противоречит учебному тексту.Почему состояние гонки в LogWriter приводит к блокировке производителя? [Параллелизм на практике]
В книге предоставляется следующий код:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private static final int CAPACITY = 1000;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
private class LoggerThread extends Thread {
private final PrintWriter writer;
public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // autoflush
}
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}
Теперь мы должны понять, как остановить этот процесс. Мы должны остановить регистрацию, но не должны пропускать уже отправленные сообщения.
Автор исследования подход:
public void log(String msg) throws InterruptedException {
if(!shutdownRequested)
queue.put(msg);
else
throw new IllegalArgumentException("logger is shut down");
}
и прокомментировать это так:
Другой подход к закрытию LogWriter будет установить «завершение запрошенной» флаг для предотвращения дальнейших сообщений от того, , как показано в листинге 7.14. Затем потребитель мог слить в очередь при уведомлении о том, что было запрошено выключение, , выписывая любые ожидающие сообщения и разблокируя любых заблокированных производителей в журнале. Однако этот подход имеет условия гонки, которые делают его ненадежным. Реализация журнала - это последовательность контрольных действий: производители могут заметить, что служба еще не была отключена , но все еще очереди сообщений после выключения, опять же с риском, что производитель может быть заблокирован в журнале и никогда разблокировать. Есть трюки, которые уменьшают вероятность этого (например, если покупатель ждет несколько секунд, прежде чем объявить остывание очереди), но не меняют фундаментальную проблему, а просто вероятность , что это приведет к сбою.
Эта фраза достаточно сложна для меня.
Я понимаю, что
if(!shutdownRequested)
queue.put(msg);
не является атомарным и сообщение может быть добавлено в очередь после shutdowning. Да, это не очень точно, но я не вижу проблемы. очередь просто будет истощаться, и когда очередь будет пустой, мы можем остановить LoggerThread. Особенно Я не понимаю, почему производители могут быть заблокированы.
Автор не предоставил полный код, поэтому я не могу понять все детали. Я считаю, что эта книга была прочитана большинством сообщества, и этот пример содержит подробное объяснение.
Пожалуйста, объясните с полным примером кода.
Кажется довольно надуманным ожидать 1000 потоков, ожидающих между двумя одинаковыми строками кода. Хотя я понимаю, что это по-прежнему технически неверно, если это теоретически возможно. – shmosel
Мне интересно узнать, можете ли вы продемонстрировать, как процесс останова фактически истощает очередь * и всех заблокированных производителей *. Из моего тестирования 'drainTo()' только сливает уже поставленные в очередь элементы. – shmosel
@shmosel Согласен. Это немного странно, но я думаю, вы неправильно поняли мой ответ. 1000 потоков не ждут 'put'. Это 1001-й поток, который ждет. Для 'put' для блокирования вам нужно 1000 элементов в очереди и блок на 1001-й попытке« поместить »новый элемент. Все это при обеспечении того, чтобы потребитель не занимал места для 1001-го элемента, который может быть возможен только в том случае, если потоки 1001 попытаются поместить новый элемент в очередь ** после того, как ** потребитель закончил слить очередь и теперь выходит из цикл while. Я не вижу другого способа объяснить этот параграф! Вы? – CKing