2014-01-31 3 views
2

Я ищу коллекцию java, которая поддерживает блокировку read() s на предикате. Я написал простую версию, но похоже, что это уже было придумано?Продюсер-потребитель с предикатом

Например:

interface PredicateConsumerCollection<T> { 

    public void put(T t); 

    @Nullable 
    public T get(Predicate<T> p, long millis) throws InterruptedException; 
} 

put() подает свой аргумент к ожидающему потребителю с соответствующим предикатом, или тайниками его в магазине. A get() немедленно возвращается, если подходящий номер T уже находится в магазине или блокируется до тех пор, пока не будет выбрано подходящее значение() или не истечет время ожидания. Потребители конкурируют, но справедливость в моем случае не является критичной.

Кто-нибудь знает о такой коллекции?

ответ

1

Нет немедленного класса, который может решить вашу проблему, но комбинация ConcurrentHashMap и BlockingQueue может быть решением.

хэш-карта определяется как:

final ConcurrentHashMap<Predicate, LinkedBlockingQueue<Result>> lookup; 

Нанесенные необходимо обеспечить, чтобы для каждого предиката очереди добавляется к карте, это можно сделать поточно-с помощью putIfAbsent.

Если у вас есть фиксированный набор Предикаты, вы можете просто предварительно заполнить список, то потребитель может просто вызвать lookup.get(Predicate).take()

Если сумма Предикаты неизвестна/слишком много, вам нужно написать выжидательную/уведомлять о реализации для Потребителей в случае, если Предикат еще не внесен в список по своему усмотрению.

+0

это работает только, если каждый предикат описывает отчетливый набор 'Result'. Если у вас есть, например, два предиката «x <3» и «x <5», то в результате наборы результатов перекрываются, и вам нужно сохранить «Результат» в двух очередях, если «Result.x = 1». – Ralf

+0

Хорошее предложение и спасибо! Но, к сожалению, в моем случае предикаты не известны заранее. – tariksbl

0

Мне также нужно что-то очень похожее для тестирования того, что определенное асинхронное сообщение JMS получено в течение определенного таймаута. Оказывается, ваш вопрос относительно легко реализовать, используя базовый wait/notify, как описано в Oracle tutorials. Идея заключается в том, чтобы синхронизировать методы put и query и позволить методу запроса ждать. Метод put вызывает notifyAll для пробуждения любых ожидающих потоков в методе запроса. Затем метод запроса должен проверить соответствие совпадения. Самое сложное - получить тайм-аут прямо из-за пробуждения, когда предикат не соответствует и из-за возможных «побочных пробуждений». Я нашел this stackoverflow post, который дает ответ.

Вот реализация я придумал:

import java.util.ArrayList; 
import java.util.List; 

// import net.jcip.annotations.GuardedBy; 

import com.google.common.base.Predicate; 
import com.google.common.collect.Iterables; 

public class PredicateConsumerCollectionImpl<T> implements 
     PredicateConsumerCollection<T> { 

    // @GuardedBy("this") 
    private List<T> elements = new ArrayList<>(); 

    @Override 
    public synchronized void put(T t) { 
     elements.add(t); 
     notifyAll(); 
    } 

     @Override 
public synchronized T query(Predicate<T> p, long millis) 
     throws InterruptedException { 
    T match = null; 
    long nanosOfOneMilli = 1000000L; 
    long endTime = System.nanoTime() + millis * nanosOfOneMilli; 
    while ((match = Iterables.find(elements, p, null)) == null) { 
     long sleepTime = endTime - System.nanoTime(); 
     if (sleepTime <= 0) { 
      return null; 
     } 
     wait(sleepTime/nanosOfOneMilli, 
       (int) (sleepTime % nanosOfOneMilli)); 
    } 
    return match; 
} 

    synchronized boolean contains(T t) { 
     return elements.contains(t); 
    } 
} 

А вот тест JUnit, который доказывает, что код работает как задумано:

import static org.junit.Assert.assertEquals; 
import static org.junit.Assert.assertFalse; 
import static org.junit.Assert.assertTrue; 
import static org.junit.Assert.fail; 

import org.junit.Before; 
import org.junit.Test; 

import com.google.common.base.Predicate; 

/** 
* Unit test for the {@link PredicateConsumerCollection} implementation. 
* 
* <p> 
* The tests act as consumers waiting for the test Producer to put a certain 
* String. 
*/ 
public class PredicateConsumerCollectionTest { 

    private static class Producer implements Runnable { 

     private PredicateConsumerCollection<String> collection; 

     public Producer(PredicateConsumerCollection<String> collection) { 
      this.collection = collection; 
      collection.put("Initial"); 
     } 

     @Override 
     public void run() { 
      try { 
       int millis = 50; 
       collection.put("Hello"); 
       Thread.sleep(millis); 
       collection.put("I"); 
       Thread.sleep(millis); 
       collection.put("am"); 
       Thread.sleep(millis); 
       collection.put("done"); 
       Thread.sleep(millis); 
       collection.put("so"); 
       Thread.sleep(millis); 
       collection.put("goodbye!"); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
       fail("Unexpected InterruptedException"); 
      } 
     } 

    } 

    private PredicateConsumerCollectionImpl<String> collection; 
    private Producer producer; 

    @Before 
    public void setup() { 
     collection = new PredicateConsumerCollectionImpl<>(); 
     producer = new Producer(collection); 
    } 

    @Test(timeout = 2000) 
    public void wait_for_done() throws InterruptedException { 
     assertTrue(collection.contains("Initial")); 
     assertFalse(collection.contains("Hello")); 

     Thread producerThread = new Thread(producer); 
     producerThread.start(); 

     String result = collection.query(new Predicate<String>() { 
      @Override 
      public boolean apply(String s) { 
       return "done".equals(s); 
      } 
     }, 1000); 
     assertEquals("done", result); 
     assertTrue(collection.contains("Hello")); 
     assertTrue(collection.contains("done")); 

     assertTrue(producerThread.isAlive()); 
     assertFalse(collection.contains("goodbye!")); 

     producerThread.join(); 

     assertTrue(collection.contains("goodbye!")); 
    } 

    @Test(timeout = 2000) 
    public void wait_for_done_immediately_happens() throws InterruptedException { 
     Thread producerThread = new Thread(producer); 
     producerThread.start(); 

     String result = collection.query(new Predicate<String>() { 
      @Override 
      public boolean apply(String s) { 
       return "Initial".equals(s); 
      } 
     }, 1000); 
     assertEquals("Initial", result); 
     assertFalse(collection.contains("I")); 

     producerThread.join(); 

     assertTrue(collection.contains("goodbye!")); 
    } 

    @Test(timeout = 2000) 
    public void wait_for_done_never_happens() throws InterruptedException { 
     Thread producerThread = new Thread(producer); 
     producerThread.start(); 

     assertTrue(producerThread.isAlive()); 

     String result = collection.query(new Predicate<String>() { 
      @Override 
      public boolean apply(String s) { 
       return "DONE".equals(s); 
      } 
     }, 1000); 

     assertEquals(null, result); 
     assertFalse(producerThread.isAlive()); 
     assertTrue(collection.contains("goodbye!")); 
    } 

} 
+0

Спасибо. Мой был похож, я использовал Guava 'SettableFuture' (обещание), чтобы избежать цикла ожидания ожидания. – tariksbl

+0

@tariksbl интересно, добавив, что шаблонный шаблон, похоже, дублирует то, что уже должно быть в некоторой утилите параллелизма. У вас есть реализация для того же интерфейса, который проходит один и тот же модульный тест? Я не вижу, как SettableFuture может сделать код проще. Должен ли метод put затем называть его, когда приходит элемент, для которого соответствует предикат? Это означает, что вам нужно поддерживать коллекцию будущих и предикатов для каждого выполняемого в настоящее время запроса. –

+0

Я не могу опубликовать код, но да, мой impl поддерживает коллекцию официантов, по которой проходит каждый 'put()'. – tariksbl

Смежные вопросы