У меня есть входящие запросы обработки, из которых я хочу, не слишком много обрабатывать одновременно из-за истощения общих ресурсов. Я также предпочел бы запросы, которые разделяют некоторые уникальный ключ, чтобы не быть выполнены одновременно:Объединение groupBy и flatMap (maxConcurrent, ...) в RxJava/RxScala
def process(request: Request): Observable[Answer] = ???
requestsStream
.groupBy(request => request.key)
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
requestsForKey
.flatMap(1, process)
})
Однако выше не работает, потому что наблюдаемый за ключ никогда не завершается. Каков правильный способ достичь этого?
Что не работает:
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// Take(1) unsubscribes after the first, causing groupBy to create a new observable, causing the next request to execute concurrently
requestsForKey.take(1)
.flatMap(1, process)
})
.flatMap(maxConcurrentProcessing, { case (key, requestsForKey) =>
// The idea was to unsubscribe after 100 milliseconds to "free up" maxConcurrentProcessing
// This discards all requests after the first if processing takes more than 100 milliseconds
requestsForKey.timeout(100.millis, Observable.empty)
.flatMap(1, process)
})
N я не буду работать в моем случае, поскольку поток запросов является жарким и долговечным и содержит много разных ключей. Кроме того, я бы предпочел не создавать дополнительные потоки, но использовать тот же threadpool. – dtech
@dtech Идея состоит в том, что нельзя назначать задания для определенных потоков, но вы можете планировать их для определенных планировщиков, например. однопоточные. Я добавил решение для большого количества ключей в моем ответе. –