У меня есть программа, которая имеет несколько объектов типа PublisherTask и SubscriberTask. Данный абонент может быть подписан на одного или нескольких издателей.
Чтобы описать мою проблему, я буду размещать код ....Возможно ли peek() в элементах Java ConcurrentLinkedQueue
abstract class Publication {
// some published information
}
class ConcretePublicationA extends Publication {
}
class ConcretePublicationB extends Publication {
}
abstract class Subscription {
private final long id;
private final Subscriber s;
// PLUS some other members relating to the subscription
protected Subscription(long id, Subscriber s){
this.id = id;
this.s =s;
}
public Subscriber getSubscriber() {
return this.s;
}
}
class ConcreteSubscriptionA extends Subscription {
protected ConcreteSubscriptionA(long id, Subscriber s) {
super(id, s);
// TODO Auto-generated constructor stub
}
}
class ConcreteSubscriptionB extends Subscription {
protected ConcreteSubscriptionB(long id, Subscriber s) {
super(id, s);
// TODO Auto-generated constructor stub
}
}
interface Subscriber {
public void update(Publication pub);
}
interface Publisher {
public Subscription subscribe(Subscriber subscriber);
}
abstract class PublisherTask implements Runnable, Publisher {
private final ConcurrentHashMap<Long, Subscription> subscribers =
new ConcurrentHashMap<Long, Subscription>();
Long subscriptionId = 0L;
@Override
public void run() {
/*obviously this is a different variable in a real program*/
boolean some_condition = true;
while(some_condition) {
// do some work
Publication pub = /* new ConcretePublication(....) */ null;
for (Subscription s : subscribers.values()) {
s.getSubscriber().update(pub);
}
}
}
@Override
public Subscription subscribe(Subscriber subscriber) {
Subscription sub;
synchronized(subscriptionId) {
/* the lines below are in a function in the sub-class,
* but for brevity I'm showing them here
*/
sub = new ConcreteSubscriptionA(++subscriptionId, subscriber);
subscribers.put(subscriptionId, sub);
}
return sub ;
}
}
abstract class SubscriberTask implements Runnable, Subscriber {
protected ConcurrentLinkedQueue<Publication> newPublications =
new ConcurrentLinkedQueue<Publication>();
@Override
public void run() {
/*obviously this is a different variable in a real program*/
boolean some_condition = true;
while(some_condition) {
// do some work
Publication pub = newPublications.peek();
/* the lines below are in a function in the sub-class,
* but for brevity I'm showing them here
*/
{
if (pub instanceof ConcretePublicationA) {
// Do something with the published data
} else if (pub instanceof ConcretePublicationB) {
// Do something with the published data
}
}
}
}
@Override
public void update(Publication pub) {
/* My question relates to this method:
* Bascially to avoid memory issues I would like existing
* unprocessed publications **Of Tth Same Type As The New One**
* to be discarded
*/
Publication existing = null;
do {
//This won't work coz peek() only looks at the head of the queue
existing = newPublications.peek();
if ((existing != null) && (existing.getClass().equals(pub))) {
newPublications.remove(existing);
}
} while (existing != null);
newPublications.add(pub);
}
Итак, теперь, когда вы имели возможность тщательно изучить мой код. Я хотел бы задать следующий вопрос:
В приведенном выше методе обновления можно просмотреть все элементы в ConcurrentLinkedQueue и удалить те из данного типа?
Также, пожалуйста, сообщите мне, если вы думаете, что улучшения могут быть внесены в классы и как они взаимодействуют друг с другом.
Спасибо