2011-12-21 4 views
6

имеет проблемы с межпоточной связью и «решает» ее, используя «фиктивные сообщения» повсюду. Это плохая идея? Каковы возможные решения?Связь между потоками между производителями и потребителями

Пример Проблема у меня есть.

Основная тема начинает поток для обработки и вставки записей в базу данных. Основной поток читает возможно огромный файл и помещает одну запись (объект) за другой в блокирующий разряд. обрабатывает поток, читает из очереди и работает.

Как сказать «обрабатывая поток» для остановки? Очередь может быть пуста, но работа не выполняется, и основной поток не работает ни при завершении обработки потока, ни при его прерывании.

Так обработка резьбы делает

while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) { 
    MyObject object= queue.poll(100, TimeUnit.MILLISECONDS); 
    if (object != null) { 
     String data = object.getData(); 
     if (data.equals("END")) { 
      break; 
     } 
    // do work 
    } 
} 
// clean-up 
synchronized queue) { 
    queue.notifyAll(); 
} 
return; 

и основной поток

// ...start processing thread... 
while(reader.hasNext(){ 
    // ...read whole file and put data in queue... 
} 
MyObject dummy = new MyObject(); 
dummy.setData("END"); 
queue.put(dummy); 
//Note: empty queue here means work is done 
while (queue.size() > 0) { 
    synchronized (queue) { 
     queue.wait(500); // over-cautios locking prevention i guess 
    } 
} 

Обратите внимание, что вставка должна быть в той же транзакции, и транзакция не может быть обработан по основной нити.

Что было бы лучшим способом сделать это? (Я учусь и не хочу начинать «делать это неправильно»)

ответ

4

Эти фиктивные сообщения действительны. Это называется «яд». Что-то, что производитель посылает потребителю, чтобы остановить его.

Другая возможность - вызвать Thread.interrupt() где-нибудь в основном потоке и поймать и обработать InterruptedException соответственно в рабочем потоке.

+0

Ok. Как я могу использовать это, если тип переданных объектов не имеет поля String, которое можно легко использовать для этого? Что, если какой-либо текст может быть достоверным и, следовательно, может равняться яду? Использовать GUID? –

+1

В идеале вы должны изменить класс MyObject и добавить туда метод isPoison(). Как этот метод определяет, является ли объект ядом или нет, зависит от вас, может быть простой вещи, такой как сравнение, если сообщение пустое, пустое или если сообщение было явно создано как яд. Идея должна была бы реорганизовать MyObject как интерфейс с двумя реализациями, MessageObject, который всегда возвращает false в методе isPoison() и PoisonMessage, которые всегда возвращают true. Но это глубоко зависит от того, что именно вы делаете, и лучшие решения могут существовать с учетом вашего реального контекста. –

2

«разрешил» его, используя «фиктивные сообщения» повсюду. Это плохая идея ? Каковы возможные решения?

Неплохая идея, это называется «Ядовитые таблетки» и является разумным способом остановить службу на основе потоков.

Но это работает только тогда, когда известно количество производителей и потребителей.

В коде, который вы отправили, есть два потока, один из них - «основной поток», который производит данные, другой - «обрабатывающий поток», который потребляет данные, «ядовитые таблетки» хорошо работают для этого обстоятельства.

Но если у вас есть другие производители, как потребитель узнает, когда остановиться (только когда все производители отправляют «Ядовитые таблетки»), вам нужно точно знать количество всех производителей и проверить количество «ядовитых таблеток» у потребителя, если оно равно числу производителей, что означает, что все производители перестали работать, а затем потребитель остановился.

В «основной теме» вам нужно поймать InterruptedException, так как в противном случае «основная нить» может не установить «Ядовитую таблетку». Вы можете сделать это, как показано ниже,

... 
try { 
    // do normal processing 
} catch (InterruptedException e) { /* fall through */ } 
finally { 
    MyObject dummy = new MyObject(); 
    dummy.setData("END"); 
    ... 
} 
... 

Кроме того, вы можете попробовать использовать ExecutorService, чтобы решить все ваши проблемы.

(Это работает, когда вам просто нужно сделать некоторые работы, а затем останавливается, когда все закончены)

void doWorks(Set<String> works, long timeout, TimeUnit unit) 
    throws InterruptedException { 
    ExecutorService exec = Executors.newCachedThreadPool(); 
    try { 
     for (final String work : works) 
      exec.execute(new Runnable() { 
        public void run() { 
         ... 
        } 
       }); 
    } finally { 
     exec.shutdown(); 
     exec.awaitTermination(timeout, unit); 
    } 
} 

Я учусь и не хотят, чтобы начать «делать это неправильный путь»

Возможно, вам потребуется прочитать книгу: Java Concurrency in Practice. Поверьте мне, это самое лучшее.

+0

Я знаю об услуге исполнителя, но чтение данных и обработка (проверка) и вставка не могут быть в одной и той же задаче или методе, потому что все они должны выполняться в одной транзакции базы данных. Если вы поймете, что я имею в виду. –

+0

ОК, я понимаю, что «ExecutorService» - это только моя общая рекомендация, используйте его или нет в соответствии с вашими требованиями. Но будьте осторожны, чтобы поймать все исключения, если некоторые исключения не пойманы, вы можете потерять возможность установить «Ядовитые таблетки». –

0

Что вы могли бы сделать (что я сделал в недавнем проекте) - это обернуть очередь, а затем добавить метод 'isOpen()'.

class ClosableQ<T> { 

    boolean isOpen = true; 

    private LinkedBlockingQueue<T> lbq = new LinkedBlockingQueue<T>(); 

    public void put(T someObject) { 
     if (isOpen) { 
     lbq.put(someObject); 
     } 
    } 

    public T get() { 
     if (isOpen) { 
     return lbq.get(0); 
     } 
    } 

    public boolean isOpen() { 
     return isOpen; 
    } 

    public void open() { 
     isOpen = true; 
    } 

    public void close() { 
     isOpen = false; 
    } 
} 

Так ваш автор нить становится чем-то вроде:

while (reader.hasNext()) { 
    // read the file and put it into the queue 
    dataQ.put(someObject); 
} 
// now we're done 
dataQ.close(); 

и читателя резьбы:

while (dataQ.isOpen) { 
    someObject = dataQ.get(); 
} 

Конечно, вы можете расширить список, но вместо этого, что дает пользователю уровень доступ вам может не понравиться. И вам нужно добавить некоторые параллельные вещи в этот код, например AtomicBoolean.

+0

Я передаю очередь методу. Чтение вашей идеи заставило меня сделать следующее. Метод также может принимать AtomicBoolean keepProcessing. Затем он может работать до тех пор, пока queue.size> 0 || keepProcessing.get() –

Смежные вопросы