2015-08-05 27 views
1

Я реализую простые аналитические функции данных с помощью RXJava, где абонент темы асинхронно обрабатывает данные, опубликованные в теме, депонируя выход в Redis.Обработка RxJava-Aynchronous Stream

Когда сообщение получено, компонент Spring публикует его в Observable. Чтобы избежать блокировки представления, я использовал RxJava Async, чтобы сделать это асинхронно.

@Override 
public void onMessage(final TransactionalMessage message) { 
    Async.start(new Func0<Void>() { 
     @Override 
     public Void call() { 
      analyser.process(message); 
      return null; 
     } 
    }); 
} 

У меня есть два путаницы при реализации других обрабатывающих деталей; 1) создание асинхронного наблюдения с буферизацией 2) вычисление различных логик параллельно на основе типа сообщения в списке сообщений.

После долгих экспериментов я нашел два способа создания Async Observable и не уверен, какой из них является правильным и лучшим подходом.

Путь один,

private static final class Analyzer { 

private Subscriber<? super TransactionalMessage> subscriber; 

public Analyzer() { 
    OnSubscribe<TransactionalMessage> f = subscriber -> this.subscriber = subscriber; 
    Observable.create(f).observeOn(Schedulers.computation()) 
      .buffer(5, TimeUnit.SECONDS, 5, Schedulers.io()) 
      .skipWhile((list) -> list == null || list.isEmpty()) 
      .subscribe(t -> compute(t)); 
} 
public void process(TransactionalMessage message) { 
     subscriber.onNext(message); 
    } 

}

Way два

private static final class Analyser { 

private PublishSubject<TransactionalMessage> subject; 

public Analyser() { 
    subject = PublishSubject.create(); 
    Observable<List<TransactionalMessage>> observable = subject 
      .buffer(5, TimeUnit.SECONDS, 5, Schedulers.io()) 
      .observeOn(Schedulers.computation()); 
    observable.subscribe(new Observer<List<TransactionalMessage>>() { 

    @Override 
    public void onCompleted() { 
     log.debug("[Analyser] onCompleted(), completed!"); 
    } 

    @Override 
    public void onError(Throwable e) { 
     log.error("[Analyser] onError(), exception, ", e); 
    } 

    @Override 
    public void onNext(List<TransactionalMessage> t) { 
     compute(t); 
    } 
    }); 
} 

public void process(TransactionalMessage message) { 
    subject.onNext(message); 
} 
} 

TransactionalMessage поставляется в различных типов, поэтому я хочу, чтобы выполнять различные вычисления, основанные на типах. Один из подходов, который я пробовал, - это фильтровать список на основе каждого типа и обрабатывать его отдельно, но это выглядит так плохо, и я думаю, что он не работает параллельно. Какой способ обрабатывать их параллельно?

protected void compute(List<TransactionalMessage> messages) { 
     Observable<TransactionalMessage> observable = Observable 
       .from(messages); 
     Observable<String> observable2 = observable 
       .filter(new Func1<TransactionalMessage, Boolean>() { 

        @Override 
        public Boolean call(TransactionalMessage t) { 
         return t.getMsgType() 
           .equals(OttMessageType.click.name()); 
        } 
       }).flatMap(
         new Func1<TransactionalMessage, Observable<String>>() { 

          @Override 
          public Observable<String> call(
            TransactionalMessage t) { 
           return Observable.just(
             t.getMsgType() + t.getAppId()); 
          } 
         }); 

     Observable<String> observable3 = observable 
       .filter(new Func1<TransactionalMessage, Boolean>() { 

        @Override 
        public Boolean call(TransactionalMessage t) { 
         return t.getMsgType() 
           .equals(OttMessageType.image.name()); 
        } 
       }).flatMap(
         new Func1<TransactionalMessage, Observable<String>>() { 

          @Override 
          public Observable<String> call(
            TransactionalMessage t) { 
           return Observable.just(
             t.getMsgType() + t.getAppId()); 
          } 
         }); 

     // I sense some code smell in filtering on type and processing it. 

     Observable.merge(observable2, observable3) 
       .subscribe(new Action1<String>() { 

        @Override 
        public void call(String t) { 
         // save it to redis 
         System.out.println(t); 
        } 
       }); 
    } 

ответ

0

Я предлагаю думать о Subject сек, прежде чем пытаться использовать create.

Если вы хотите параллельная обработка выполняется на основе некоторой категоризации, вы могли бы использовать groupBy вместе с observeOn для достижения желаемого эффекта:

Observable.range(1, 100) 
.groupBy(v -> v % 3) 
.flatMap(g -> 
    g.observeOn(Schedulers.computation()) 
    .reduce(0, (a, b) -> a + b) 
    .map(v -> g.getKey() + ": " + v) 
) 
.toBlocking().forEach(System.out::println); 
Смежные вопросы