2016-02-22 2 views
3

Я использую следующий, чтобы получить ExecutorService для подписок:RXJava - нить приоритет

public static ExecutorService getThreadPoolExecutorService(int threads) 
{ 
    int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors(); 
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
      NUMBER_OF_CORES * 2, 
      NUMBER_OF_CORES * 2, 
      60L, 
      TimeUnit.SECONDS, 
      new LinkedBlockingQueue<Runnable>() 
    ); 
    return Executors.newFixedThreadPool(threads, threadPoolExecutor.getThreadFactory()); 
} 

И использовать его как следующее:

public static Scheduler getBackgroundSchedular() 
{ 
    if (mBackgroundExecutorService == null) 
     mBackgroundExecutorService = ThreadUtil.getThreadPoolExecutorService(4); 
    return Schedulers.from(mBackgroundExecutorService); 
} 

И я использую этот Schedular для моих наблюдаемых, что должна бежать в фоновом режиме.

Вопрос

Как использовать PriorityBlockingQueue для RXJava? Обычно я использую некоторые специальные runnables, которые реализуют функцию сравнения и используют соответствующую функцию сравнения для PriorityBlockingQueue и заменяют LinkedBlockingQueue в примере выше на PriorityBlockingQueue, но как это сделать с помощью RXJava-наблюдаемых?

ответ

2

Планировщики в RxJava являются ортогональными к потоку данных и не знают о них вообще. Все, что они делают, это выполнить экземпляр Runnable, и все Runnables, созданные операторами, рождаются равными. Таким образом, не имеет смысла использовать очереди приоритетов с Schedulers.

Кроме того, потоки данных последовательны и, если нет какой-либо границы, такой как обмен нитями, они сохраняют свой порядок. Если вы не соберете события и не сделаете заказ вручную, есть вероятность, что оператор с приоритетной очередью будет переупорядочивать данные, как вы ожидаете.

Редактировать

Немного нетрадиционный, но вы могли бы также заложить PriorityBlockingQueue через субъект и некоторые операторы, так что вы можете кормить последовательность с задачами:

PriorityBlockingQueue<Task> q = ... 

Subject<Integer, Integer> subject = PublishSubject.<Integer>create().toSerialized(); 

subject 
.map(v -> q.poll()) 
.doOnNext(v -> v.execute()) 
.subscribe(); 

q.offer(new Task(...)); 
subject.onNext(1); 
+0

Рассмотрим следующий пример: У меня есть один который загружает все папки и содержимое с SD-карты. Я уже показываю папки, как только я знаю их имя, наблюдаемый продолжит работу в фоновом режиме. Когда я нажимаю на разгруженную папку, я создаю вторую наблюдаемую, которая загружает только одно содержимое папок => Я хочу, чтобы этот поток имел более высокий приоритет, чем поток основных наблюдаемых. Отмена подписки на основной наблюдаемый является опцией, но мне все равно нужны данные от нее, поэтому отказ от подписки - это не самый лучший вариант ... – prom85

+0

Что не так с использованием планировщика ввода-вывода для запуска этого единственного каталога и разрешения на работу в ОС? – akarnokd

+0

Я вижу значительные улучшения скорости, когда я сначала отменяю подписку на основные наблюдаемые ... Я хотел проверить, имеет ли более высокий приоритет для наблюдаемой одной папки одинаковый эффект ... – prom85

Смежные вопросы