2017-01-17 6 views
0

У меня есть входящие запросы обработки, из которых я хочу, не слишком много обрабатывать одновременно из-за истощения общих ресурсов. Я также предпочел бы запросы, которые разделяют некоторые уникальный ключ, чтобы не быть выполнены одновременно:Объединение groupBy и flatMap (maxConcurrent, ...) в RxJava/RxScala

def process(request: Request): Observable[Answer] = ??? 

requestsStream 
    .groupBy(request => request.key) 
    .flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     requestsForKey 
     .flatMap(1, process) 
    }) 

Однако выше не работает, потому что наблюдаемый за ключ никогда не завершается. Каков правильный способ достичь этого?

Что не работает:

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     // Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently 
     requestsForKey.take(1) 
     .flatMap(1, process) 
    }) 

.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) => 
     // The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing 
     // This discards all requests after the first if processing takes more than 100 milliseconds 
     requestsForKey.timeout(100.millis, Observable.empty) 
     .flatMap(1, process) 
    }) 

ответ

1

Вот как мне удалось этого добиться. Для каждого уникального ключа я назначаю выделенный единственным планировщик потоков (так что сообщения с тем же ключом обрабатывается в произвольном порядке):

@Test 
public void groupBy() throws InterruptedException { 
    final int NUM_GROUPS = 10; 
    Observable.interval(1, TimeUnit.MILLISECONDS) 
      .map(v -> { 
       logger.info("received {}", v); 
       return v; 
      }) 
      .groupBy(v -> v % NUM_GROUPS) 
      .flatMap(grouped -> { 
       long key = grouped.getKey(); 
       logger.info("selecting scheduler for key {}", key); 
       return grouped 
         .observeOn(assignScheduler(key)) 
         .map(v -> { 
          String threadName = Thread.currentThread().getName(); 
          Assert.assertEquals("proc-" + key, threadName); 
          logger.info("processing {} on {}", v, threadName); 
          return v; 
         }) 
         .observeOn(Schedulers.single()); // re-schedule 
      }) 
      .subscribe(v -> logger.info("got {}", v)); 

    Thread.sleep(1000); 
} 

В моем случае количество ключей (NUM_GROUPS) мало, поэтому я создать специальный планировщик для каждый ключ:

Scheduler assignScheduler(long key) { 
    return Schedulers.from(Executors.newSingleThreadExecutor(
     r -> new Thread(r, "proc-" + key))); 
} 

в случае количество ключей бесконечно или слишком большое, чтобы выделить поток для каждого из них, вы можете создать пул планировщиков и использовать их так:

Scheduler assignScheduler(long key) { 
    // assign randomly 
    return poolOfSchedulers[random.nextInt(SIZE_OF_POOL)]; 
} 
+0

N я не буду работать в моем случае, поскольку поток запросов является жарким и долговечным и содержит много разных ключей. Кроме того, я бы предпочел не создавать дополнительные потоки, но использовать тот же threadpool. – dtech

+0

@dtech Идея состоит в том, что нельзя назначать задания для определенных потоков, но вы можете планировать их для определенных планировщиков, например. однопоточные. Я добавил решение для большого количества ключей в моем ответе. –

Смежные вопросы