2013-08-14 2 views
1

Большинство опубликованных реализаций подписки использует некоторую форму итерации для уведомления подписчиков либо с помощью цикла for/while, чтобы вызвать их методы прослушивания, либо путем размещения новой темы, представляющей интерес, в каждой из их блокирующих очередей ,Java-реализация Публикация подписки без использования итерации

Я хочу реализовать подписку на публикацию, которая скорее «смотрит» что-то, то есть местоположение, переменную или файл или что-то другое, а затем вытаскивает новую тему из ее сохраненного местоположения. Я хочу, чтобы все они были уведомлены «приблизительно» одновременно.

Это связано с тем, что предыдущие решения являются медленными, потому что у меня есть большое количество подписчиков и итерация через них требует времени.

Любые идеи?

+2

Если проблема блокирует изменяющийся поток, возможно, вы запустите цикл уведомлений в фоновом потоке. – kiheru

+0

Взгляните на Акку (или рамки других актеров). – kaos

+0

Проблема - это итерация, а не блокировка. Спасибо, что предложили Акку. Это учебное упражнение, поэтому я хотел бы реализовать его без библиотек. –

ответ

1

Вы можете сделать что-то вроде этого:

public class Producer { 
    public static Object publishedObject; 
    public static ConcurrentLinkedDeque<Object> previousObjects; 
    private int size; 

    public publishObject(Object object) { 
     previousObjects.offerLast(publishedObject); 
     publishedObject = object; 
     size++; 
     if(size > 50) { 
      previousObjects.removeFirst(); 
     } 
    } 
} 

public class Consumer implements Runnable { 
    private Object lastConsumedObject; 
    public void run() { 
     while(true) { 
      if(Producer.publishedObject == lastConsumedObject) { 
       sleep(500); 
      } else { 
       Iterator<Object> iterator = Producer.descendingIterator(); 
       Object next = iterator.next(); 
       Object newLastConsumedObject = next; 
       do { 
        this.process(next); 
       } while(iterator.hasNext() && 
        (next = iterator.next()) != lastConsumedObject); 
       lastConsumedObject = newLastConsumedObject; 
      } 
     } 
    } 
} 

Идея заключается в том, что Producer имеет «опубликовать местоположение», что Consumers может опрашивать, и он также хранит ранее опубликованные объекты в LinkedList в случае, если Consumer не замечает этого. Потребитель проверяет местоположение публикации и спит, если изменений не было, иначе он обрабатывает опубликованный объект и итерации через ранее опубликованные объекты, если он пропустил событие публикации.

Ожидание в полузабытии может привести к гибели вашего выступления здесь. Альтернативой является наличие двух классов потребителей: SinkConsumer и RelayConsumer. У вас будет один RelayConsumer за аппаратную нить (например, 8 RelayConsumers, если у вас четыре ядра и гиперпоточность). Publisher публикует сообщение RelayConsumers, который затем публикует сообщение на номерOfSinkConsumers/numberOfRelayConsumers SinkConsumers.

+0

Спасибо за это. Второй альтернативный 2-этап - это то, что у меня есть сейчас, но его все еще довольно медленно - отсюда и myquest. Проблема с вашим общедоступным publishObject заключается в том, что это приведет к большому количеству промахов в кеше, если все пользователи просыпаются одновременно. Если потребители спят случайно, что может также привести к условиям гонки ... Я думал, что это очень распространенная проблема, и будет шаблон/решение того, что сделали люди –

+0

@Farouk Alhassan Я не думаю, что у вас есть гонка условие, пока производитель добавляет новые объекты в список предыдущих объектов до того, как он обновит опубликованное местоположение. Кроме того, если вы замените LinkedList на ConcurrentLinkedDeque, то вы не столкнетесь с каким-либо ConcurrentModificationExceptions –

+0

, второй вариант выглядит как лучшее, что я могу сделать. Техас –

Смежные вопросы