2015-03-30 3 views
27

У меня есть некоторые проблемы с пониманием того, как subscribeOn/observOn работает в RxJava. Я создал простое приложение с наблюдаемым, которое испускает имена планет солнечной системы, выполняет некоторые сопоставления, фильтрует и печатает результаты.RxJava Наблюдение за вызывающей/подписной нитью

Как я понимаю, планирование работы в фоновом потоке осуществляется с помощью оператора subscribeOn (и, похоже, оно работает нормально).

Наблюдение за фоновым потоком также отлично работает с observeOn.

Но у меня возникли проблемы с пониманием, как наблюдать за вызовом потока (будь то основной поток или любой другой). Это легко сделать на Android с оператором AndroidSchedulers.mainThread(), но я не знаю, как добиться этого в чистой java.

Вот мой код:

public class Main { 

    public static void main(String[] args) throws InterruptedException { 

     ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); 

     System.out.println("Main thread: " + getCurrentThreadInfo()); 

     Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton")) 
       .map(in -> { 
        System.out.println("map on: " + getCurrentThreadInfo()); 
        return in.toUpperCase(); 
       }) 
       .filter(in -> { 
        System.out.println("filter on: " + getCurrentThreadInfo()); 
        return in.contains("A"); 
       }) 
       .subscribeOn(Schedulers.from(executor)); 

     for (int i = 0; i < 5; i++) { 
      Thread thread = new Thread("Thread-" + i) { 
       @Override 
       public void run() { 
        stringObservable 
          .buffer(5) 
          .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo())); 
       } 
      }; 
      thread.start(); 
     } 

    } 

    private static String getCurrentThreadInfo() { 
     return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")"; 
    } 
} 

Наблюдаемые в созданном и работа подписался на одну из трех ниток от исполнителя. Это работает так, как ожидалось. Но как наблюдать результаты по тем динамически созданным потокам в цикле? Есть ли способ создания Планировщика из текущего потока?

Кроме того, я узнал, что после запуска этого кода он никогда не заканчивается, и я не знаю почему? :(

+1

Основная тема в Android не * обязательно * вызывающая нить. Это важное различие здесь. Это больше похоже на EDT в Java Swing. –

+0

Конечно, он не всегда вызывает поток, но обычно он используется как поток, по которому должны быть доставлены результаты. Я должен был быть более точным. –

ответ

62

Чтобы ответить на ваш вопрос, позвольте мне начать с самого начала, это позволяет другим людям понять, что вы уже знаете.

Планировщики

Планировщики играют ту же роль, как Исполнителей для Java. Кратко - они принимают решение о том, какие действия потока выполняются.

Обычно наблюдаемый и операторы выполняются в текущем потоке. Иногда вы можете передать планировщик наблюдаемому или оператор в качестве параметра (например, Observable.timer()).

Дополнительно RxJava предоставляет 2 оператора указать Планировщик:

  • subscribeOn - указать планировщик, на котором Наблюдаемые будет работать
  • observeOn - указать планировщик, на котором наблюдатель будет наблюдать этот Observable

Чтобы понять их быстро, я использую код примера:

На всех образцах я буду использовать вспомогательные креа teObservable, который излучает название нити, на которой Наблюдаемые работает:

public static Observable<String> createObservable(){ 
     return Observable.create((Subscriber<? super String> subscriber) -> { 
       subscriber.onNext(Thread.currentThread().getName()); 
       subscriber.onCompleted(); 
      } 
     ); 
    } 

Без планировщиков:

createObservable().subscribe(message -> { 
     System.out.println("Case 1 Observer thread " + message); 
     System.out.println("Case 1 Observable thread " + Thread.currentThread().getName()); 
    }); 
    //will print: 
    //Case 1 Observer thread main 
    //Case 1 Observable thread main 

SubscribeOn:

createObservable() 
      .subscribeOn(Schedulers.newThread()) 
      .subscribe(message -> { 
       System.out.println("Case 2 Observer thread " + message); 
       System.out.println("Case 2 Observable thread " + Thread.currentThread().getName()); 
      }); 
      //will print: 
      //Case 2 Observer thread RxNewThreadScheduler-1 
      //Case 2 Observable thread RxNewThreadScheduler-1 

SubscribeOn и ObserveOn:

reateObservable() 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(Schedulers.newThread()) 
      .subscribe(message -> { 
       System.out.println("Case 3 Observer thread " + message); 
       System.out.println("Case 3 Observable thread " + Thread.currentThread().getName()); 
      }); 
      //will print: 
      //Case 3 Observer thread RxNewThreadScheduler-2 
      //Case 3 Observable thread RxNewThreadScheduler-1 

ObserveOn :

createObservable() 
      .observeOn(Schedulers.newThread()) 
      .subscribe(message -> { 
       System.out.println("Case 4 Observer thread " + message); 
       System.out.println("Case 4 Observable thread " + Thread.currentThread().getName()); 
      }); 
      //will print: 
      //Case 4 Observer thread main 
      //Case 4 Observable thread RxNewThreadScheduler-1 

Ответ:

AndroidSchedulers.mainThread() возвращает Sheduler, которые делегаты работать в MessageQueue, связанный с основным потоком.
Для этого он использует android.os.Looper.getMainLooper() и android.os.Handler.

Другими словами, если вы хотите указать конкретный поток, вы должны предоставить средства для планирования и выполнения задач в потоке.

Под ним может использоваться любой тип MQ для хранения задач и логики, которые петли Quee и выполняют задачи.

В java у нас есть Executor, который предназначен для выполнения таких задач. RxJava может легко создавать Планировщик от такого Исполнителя.

Ниже приведен пример, который показывает, как вы можете наблюдать за основной нитью (не особенно полезно, но показывать все необходимые детали).

public class RunCurrentThread implements Executor { 

private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>(); 

public static void main(String[] args) throws InterruptedException { 
    RunCurrentThread sample = new RunCurrentThread(); 
    sample.observerOnMain(); 
    sample.runLoop(); 
} 

private void observerOnMain() { 
    createObservable() 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(Schedulers.from(this)) 
      .subscribe(message -> { 
       System.out.println("Observer thread " + message); 
       System.out.println("Observable thread " + Thread.currentThread().getName()); 
      }); 
    ; 
} 

public Observable<String> createObservable() { 
    return Observable.create((Subscriber<? super String> subscriber) -> { 
       subscriber.onNext(Thread.currentThread().getName()); 
       subscriber.onCompleted(); 
      } 
    ); 
} 

private void runLoop() throws InterruptedException { 
    while(!Thread.interrupted()){ 
     tasks.take().run(); 
    } 
} 

@Override 
public void execute(Runnable command) { 
    tasks.add(command); 
} 

}

И последний вопрос, почему ваш код не прекращается:

ThreadPoolExecutor использует не Deamon темы, созданные defult, таким образом, ваша программа не заканчивается, пока они не существуют. Чтобы закрыть потоки, вы должны использовать метод shutdown.

+1

в примере «SubscribeOn и ObserveOn:», вы не можете получать результаты, возможно, из-за использования newThread в 'subscribeOn'. Есть ли удобный способ блокировки на нем? использование метода 'toBlocking' не позволяет использовать метод 'subscribe'. – freakman

+1

Вы можете решить проблему по-разному, проще всего использовать Thread.sleep (1000) –

+6

Не ваши примеры назад? «Сообщение», получаемое подписчиком, должно быть именем потока Observable, а «Thread.currentThread(). GetName()» - это имя потока Observer. –

0

Ниже приведен упрощенный пример, обновленный для RxJava 2. Это та же концепция, что и ответ Марека: Исполнитель, который добавляет исполняемые файлы к BlockingQueue, потребляемому в потоке вызывающего.

public class ThreadTest { 

    @Test 
    public void test() throws InterruptedException { 

     final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>(); 

     System.out.println("Caller thread: " + Thread.currentThread().getName()); 

     Observable.fromCallable(new Callable<Integer>() { 
      @Override 
      public Integer call() throws Exception { 
       System.out.println("Observable thread: " + Thread.currentThread().getName()); 
       return 1; 
      } 
     }) 
      .subscribeOn(Schedulers.io()) 
      .observeOn(Schedulers.from(new Executor() { 
       @Override 
       public void execute(@NonNull Runnable runnable) { 
        tasks.add(runnable); 
       } 
      })) 
      .subscribe(new Consumer<Integer>() { 
       @Override 
       public void accept(@NonNull Integer integer) throws Exception { 
        System.out.println("Observer thread: " + Thread.currentThread().getName()); 
       } 
      }); 
     tasks.take().run(); 
    } 

} 

// Output: 
// Caller thread main 
// Observable thread RxCachedThreadScheduler-1 
// Observer thread main 
Смежные вопросы