2009-12-30 3 views
0

У меня есть программа, которая имеет несколько объектов типа PublisherTask и SubscriberTask. Данный абонент может быть подписан на одного или нескольких издателей.
Чтобы описать мою проблему, я буду размещать код ....Возможно ли peek() в элементах Java ConcurrentLinkedQueue

abstract class Publication { 
    // some published information 
} 

class ConcretePublicationA extends Publication { 

} 

class ConcretePublicationB extends Publication { 

} 

abstract class Subscription { 
    private final long id; 
    private final Subscriber s; 
    // PLUS some other members relating to the subscription 

    protected Subscription(long id, Subscriber s){ 
     this.id = id; 
     this.s =s; 
    } 

    public Subscriber getSubscriber() { 
     return this.s; 
    } 


} 

class ConcreteSubscriptionA extends Subscription { 

    protected ConcreteSubscriptionA(long id, Subscriber s) { 
     super(id, s); 
     // TODO Auto-generated constructor stub 
    } 

} 

class ConcreteSubscriptionB extends Subscription { 

    protected ConcreteSubscriptionB(long id, Subscriber s) { 
     super(id, s); 
     // TODO Auto-generated constructor stub 
    } 

} 

interface Subscriber { 
    public void update(Publication pub); 
} 

interface Publisher { 
    public Subscription subscribe(Subscriber subscriber); 
} 

abstract class PublisherTask implements Runnable, Publisher { 
    private final ConcurrentHashMap<Long, Subscription> subscribers = 
     new ConcurrentHashMap<Long, Subscription>(); 
    Long subscriptionId = 0L; 

    @Override 
    public void run() { 
     /*obviously this is a different variable in a real program*/ 
     boolean some_condition = true; 

     while(some_condition) { 
      // do some work 
      Publication pub = /* new ConcretePublication(....) */ null; 

      for (Subscription s : subscribers.values()) { 
       s.getSubscriber().update(pub); 
      } 
     } 

    } 

    @Override 
    public Subscription subscribe(Subscriber subscriber) { 
     Subscription sub; 

     synchronized(subscriptionId) { 
      /* the lines below are in a function in the sub-class, 
      * but for brevity I'm showing them here 
      */ 
      sub = new ConcreteSubscriptionA(++subscriptionId, subscriber); 
        subscribers.put(subscriptionId, sub); 
     } 
     return sub ; 
    } 

} 


abstract class SubscriberTask implements Runnable, Subscriber { 

    protected ConcurrentLinkedQueue<Publication> newPublications = 
     new ConcurrentLinkedQueue<Publication>(); 

    @Override 
    public void run() { 
     /*obviously this is a different variable in a real program*/ 
     boolean some_condition = true; 

     while(some_condition) { 
      // do some work 
      Publication pub = newPublications.peek(); 

     /* the lines below are in a function in the sub-class, 
     * but for brevity I'm showing them here 
     */ 
     { 
      if (pub instanceof ConcretePublicationA) { 
       // Do something with the published data 
      } else if (pub instanceof ConcretePublicationB) { 
       // Do something with the published data 
      } 
     } 
     } 
    } 

    @Override 
    public void update(Publication pub) { 

     /* My question relates to this method: 
      * Bascially to avoid memory issues I would like existing 
      * unprocessed publications **Of Tth Same Type As The New One** 
      * to be discarded 
      */ 
     Publication existing = null; 

     do { 
      //This won't work coz peek() only looks at the head of the queue 
        existing = newPublications.peek(); 

      if ((existing != null) && (existing.getClass().equals(pub))) { 
       newPublications.remove(existing); 
      } 
     } while (existing != null); 
     newPublications.add(pub); 
    } 

Итак, теперь, когда вы имели возможность тщательно изучить мой код. Я хотел бы задать следующий вопрос:
В приведенном выше методе обновления можно просмотреть все элементы в ConcurrentLinkedQueue и удалить те из данного типа?
Также, пожалуйста, сообщите мне, если вы думаете, что улучшения могут быть внесены в классы и как они взаимодействуют друг с другом.
Спасибо

ответ

0

Я решил эту проблему (или аналогичную) раньше. Способ, которым я решил это, - эффективно встраивать счетчик в каждый новый объект Publication (для того, что вы делаете, вам понадобится счетчик для каждого типа класса). Когда вы читаете объекты публикации вне очереди, если счетчик внутри потребляемого объекта меньше текущего значения, то отбрасывается, поскольку в очереди находится более поздний.

Однако вы должны быть осторожны с этим, что не собираетесь голодать ваши публикации процессора, поскольку, если вы будете добавлять новые объекты публикации достаточно быстро, вы никогда не будете обрабатывать их, так как все они будут отмечены как (я знал, что добавляю очень медленно, как каждый раз каждые 10 минут, и мои задачи занимают пару минут каждый, чтобы работать, при этом старые задачи бесполезны, если за ним стояла новая).

Ваше решение выглядит, если вы добавляете A1, A2, B1, A3, A4, B2, а затем, когда вы добавляете B2, вы не удалите B1, так как A1 находится во главе очереди и не имеет тот же тип, который не похоже на то, что ваш комментарий подразумевает, что вы хотите.

Вы могли бы реализовать свою идею, поставив некоторый вспомогательный код, как:

class ValidCounter { 
    private Map<Class, AtomicLong> counters = new ConcurrentHashMap<Class, AtomicLong>(); 
    public int getAtomicLongFor(Class clz) { 
     AtomicLong ans = counters.get(clz); 
     if (ans == null) { 
      counters.put(clz, new AtomicLong(0)); 
      return 0; 
     } 
     return ans.get(); 
    } 
} 

Для использования при добавлении вы могли бы сделать в конструкторе

long myValidCounter = validCounterInstance.getAtomicLongFor(getClass()).incrementAndGet(); 

Для использования при проверке, если по-прежнему должно обрабатывать вы могли бы использование

long currValidCounter = validCounterInstance.getAtomicLongFor(getClass()).get(); 
if (pub.getValidCounter() >= currValidCounter) { 
    // still valid so process 
} else { 
    // superceeded, so ignore 
} 

Помните, что проблема голода - возможно, вы захотите добавить некоторые вещь, которая заботится о том, сколько или как долго вы отбрасываете вещи и просто берете старую в любом случае.

Я бы также предложил вам не использовать peek вообще и просто использовать опрос, или вы, скорее всего, получите проблемы с потоками (элементы обрабатываются несколько раз), если у вас несколько пользователей.

1

Да, можно просмотреть все элементы ConcurrentLinkedQueue с помощью итератора.

Iterator<Publication> itr = newPublications.iterator(); 
while (itr.hasNext()) { 
    Publication existing = itr.next(); 
    if (existing.getClass().equals(pub)) { 
     itr.remove(); 
    } 
} 

Поскольку итератор, возвращаемый ConcurrentLinkedQueue гарантирует траверс элементы, которые существовали при строительстве итератора, и может (но не гарантируется) отражать любые изменения, проведенные после строительства, вы можете внешне заблокировать вокруг:

newPublications 

в общем, хотя это, кажется, не очень эффективным, так альтернативное решение в целом, чтобы сократить дублирующие публикации должны быть исследованы.

Смежные вопросы