2017-02-08 5 views
3

У меня есть наблюдаемые события: ElementAdded (A), ElementRemoved (R), ActionStarted (S) и ActionFinished (F). Некоторые из добавлений и удалений зажаты между ActionStarted и ActionFinished. Я хочу заменить эту подпоследовательность событий одним событием ElementMoved (M), позволяя беззаботным образом летать без задержки. События ElementMoved должны содержать массив со всеми событиями, которые он заменяет. Вот пример:RxJava: подставить подпоследовательность элементов с одним элементом

---A--A--R--S-A-R-F-R-A-A-- 
    (my transformation) 
---A--A--R--------M-R-A-A-- 

ElementMoved должен появиться в тот момент, ActionFinished события.

Кроме того, если не ActionFinished событие не срабатывает после тайм-аута T после последнего зажатой события, то все оригинальные события должны стрелять вместо:

     -----T 
---A1--A2--R3--S4-A5-R6------------R7-A8-A9-- 
    (my transformation) 
---A1--A2--R3---------------S4A5R6-R7-A8-A9-- 

Там может быть ActionFinished событие, которое вызывается после тайм-аута или это никогда не произойдет (как в примере). Если этого не произойдет, нечего делать. Это происходит, и нет открытого окна, событие ActionFinished, чтобы самостоятельно входить в новый поток. Например:

     -----T 
---A1--A2--R3--S4-A5-R6------------F7-A8-A9-- 
    (my transformation) 
---A1--A2--R3---------------S4A5R6-F7-A8-A9-- 

В принципе, если преобразование не может закрыть окно в данном тайм-аут, он должен очистить все удержанные события нетронутым.

Эта вспышка событий также должна произойти, если новое событие S запускается перед соответствующим событием F. (Это новое событие S должно быть удержано в соответствии с приведенной выше логикой). Например,

---A1--A2--R3--S4-A5-R6--S7---R9-A9-A10-F11-A12-- 
    (my transformation) 
---A1--A2--R3------------S4A5R6---------M7- A12-- 

Я играл с оператором окна некоторое время без везения. Оператор буфера вводит задержку для свободно плавающих событий, что неприемлемо в моем случае. Сканирование испускает так много событий, как исходный поток, чего я не хочу. Я, конечно, потерян, поэтому любая помощь будет очень оценена.

Edit 1: Добавлено дело о промывке при появлении нового события S в то время как окно открыто

Edit 2: Уточнить, что двигают события должны содержать список событий, он заменяющие.

Edit 3: Измененный тег из приёмного Java ПРМ-java2

Редактировать 4:. Уточнить, что произойдет, если придет ActionFinished событие после тайм-аута пинков в

Спасибо!

+0

Что произойдет, если у вас есть S A F или S R F, разрешены ли эти шаблоны? – akarnokd

+0

Да, любая произвольная последовательность событий A и R между S и F допускается, включая ноль A и R событий. – luisobo

+0

Сделано пару исправлений, см. Выше. – luisobo

ответ

2

Поскольку мой последний ответ был удален «рецензентами», вот снова ответ с полным исходным кодом. Если это удаляется из-за длинной части кода, я не знаю, что делать. Следуют отметить, что вопрос ФПА требует сложного оператора: модель

package hu.akarnokd.rxjava; 

import java.util.*; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.atomic.AtomicLong; 

import io.reactivex.internal.util.BackpressureHelper; 
import org.reactivestreams.*; 

import io.reactivex.*; 
import io.reactivex.Scheduler.Worker; 
import io.reactivex.disposables.*; 
import io.reactivex.schedulers.Schedulers; 

public class Main { 

    public static void main(String[] args) { 
     Flowable<String> source = Flowable.just(
       "A", "A", "R", "S", "A", "R", "F", "R", "A", "A"); 

     source.lift(new ConditionalCompactor(
       500, TimeUnit.SECONDS, Schedulers.computation())) 
       .subscribe(System.out::println, Throwable::printStackTrace); 

    } 

    static final class ConditionalCompactor implements FlowableOperator<String, String> { 
     final Scheduler scheduler; 

     final long timeout; 

     final TimeUnit unit; 

     ConditionalCompactor(long timeout, TimeUnit unit, 
          Scheduler scheduler) { 
      this.scheduler = scheduler; 
      this.timeout = timeout; 
      this.unit = unit; 
     } 

     @Override 
     public Subscriber<? super String> apply(Subscriber<? super String> t) { 
      return new ConditionalCompactorSubscriber(
        t, timeout, unit, scheduler.createWorker()); 
     } 

     static final class ConditionalCompactorSubscriber 
       implements Subscriber<String>, Subscription { 
      final Subscriber<? super String> actual; 

      final Worker worker; 

      final long timeout; 

      final TimeUnit unit; 

      final AtomicInteger wip; 

      final SerialDisposable mas; 

      final Queue<String> queue; 

      final List<String> batch; 

      final AtomicLong requested; 

      Subscription s; 

      static final Disposable NO_TIMER; 
      static { 
       NO_TIMER = Disposables.empty(); 
       NO_TIMER.dispose(); 
      } 

      volatile boolean done; 
      Throwable error; 

      boolean compacting; 

      int lastLength; 

      ConditionalCompactorSubscriber(Subscriber<? super String> actual, 
              long timeout, TimeUnit unit, Worker worker) { 
       this.actual = actual; 
       this.worker = worker; 
       this.timeout = timeout; 
       this.unit = unit; 
       this.batch = new ArrayList<>(); 
       this.wip = new AtomicInteger(); 
       this.mas = new SerialDisposable(); 
       this.mas.set(NO_TIMER); 
       this.queue = new ConcurrentLinkedQueue<>(); 
       this.requested = new AtomicLong(); 
      } 

      @Override 
      public void onSubscribe(Subscription s) { 
       this.s = s; 
       actual.onSubscribe(this); 
      } 

      @Override 
      public void onNext(String t) { 
       queue.offer(t); 
       drain(); 
      } 

      @Override 
      public void onError(Throwable e) { 
       error = e; 
       done = true; 
       drain(); 
      } 

      @Override 
      public void onComplete() { 
       done = true; 
       drain(); 
      } 

      @Override 
      public void cancel() { 
       s.cancel(); 
       worker.dispose(); 
      } 

      @Override 
      public void request(long n) { 
       BackpressureHelper.add(requested, n); 
       s.request(n); 
       drain(); 
      } 

      void drain() { 
       if (wip.getAndIncrement() != 0) { 
        return; 
       } 
       int missed = 1; 
       for (;;) { 

        long r = requested.get(); 
        long e = 0L; 

        while (e != r) { 
         boolean d = done; 
         if (d && error != null) { 
          queue.clear(); 
          actual.onError(error); 
          worker.dispose(); 
          return; 
         } 
         String s = queue.peek(); 
         if (s == null) { 
          if (d) { 
           actual.onComplete(); 
           worker.dispose(); 
           return; 
          } 
          break; 
         } 

         if (compacting) { 
          batch.clear(); 
          batch.addAll(queue); 
          int n = batch.size(); 
          String last = batch.get(n - 1); 
          if ("S".equals(last)) { 
           if (n > 1) { 
            actual.onNext(queue.poll()); 
            mas.set(NO_TIMER); 
            lastLength = -1; 
            compacting = false; 
            e++; 
            continue; 
           } 
           // keep the last as the start of the new 
           if (lastLength <= 0) { 
            lastLength = 1; 
            mas.set(worker.schedule(() -> { 
             queue.offer("T"); 
             drain(); 
            }, timeout, unit)); 
            this.s.request(1); 
           } 
           break; 
          } else 
          if ("T".equals(last)) { 
           actual.onNext(queue.poll()); 
           compacting = false; 
           mas.set(NO_TIMER); 
           lastLength = -1; 
           e++; 
           continue; 
          } else 
          if ("F".equals(last)) { 
           actual.onNext("M"); 
           while (n-- != 0) { 
            queue.poll(); 
           } 
           compacting = false; 
           mas.set(NO_TIMER); 
           lastLength = -1; 
           e++; 
          } else { 
           if (lastLength != n) { 
            lastLength = n; 
            mas.set(worker.schedule(() -> { 
             queue.offer("T"); 
             drain(); 
            }, timeout, unit)); 
            this.s.request(1); 
           } 
           break; 
          } 
         } else { 
          if ("A".equals(s) || "F".equals(s) || "R".equals(s)) { 
           queue.poll(); 
           actual.onNext(s); 
           e++; 
          } else 
          if ("T".equals(s)) { 
           // ignore timeout markers outside the compacting mode 
           queue.poll(); 
          } else { 
           compacting = true; 
          } 
         } 
        } 

        if (e != 0L) { 
         BackpressureHelper.produced(requested, e); 
        } 

        if (e == r) { 
         if (done) { 
          if (error != null) { 
           queue.clear(); 
           actual.onError(error); 
           worker.dispose(); 
           return; 
          } 
          if (queue.isEmpty()) { 
           actual.onComplete(); 
           worker.dispose(); 
           return; 
          } 
         } 
        } 

        missed = wip.addAndGet(-missed); 
        if (missed == 0) { 
         break; 
        } 
       } 
      } 
     } 
    } 
} 

оператора является типичной очередью стока, но фаза сливы содержит логику для объединения некоторых последующие моделей, которые также требуют другого режима работы.

Edit обновлена ​​RxJava 2.

Edit 2 обновляется с поддержкой противодавления.

+0

Это потрясающе, спасибо. Мне было интересно, есть ли у вас какие-либо указатели, чтобы сделать прочное давление стабильным. Я предполагаю, что каждый раз, когда оператор буферизует, он должен запрашивать еще один восходящий поток. Вопрос касается случая, когда оператор сбрасывает буфер из-за тайм-аута или разбитого окна, он должен очищать только столько элементов, сколько было запрошено. Еще раз спасибо. – luisobo

+0

Я написал два неудачных теста для случаев, которые я имею в виду: https://github.com/luisobo/rxjava-conditional-compacted-operator/pull/2 – luisobo

+0

Да, при обратном нажатии это немного связано и требует шаблонов для тестирования и отладка против вас находится в гораздо лучшем положении. Я рассмотрю тестовые примеры и выработаю решение. – akarnokd

Смежные вопросы