2017-01-27 4 views
2

Я видел этот вопрос спросили раньше, но ответы всегда были неудовлетворительными для меня, поэтому я постараюсь быть точными:ограничения числа одновременной HTTP-запросы с RxJS и Angular2

Я использую https://www.npmjs.com/package/rxjs и хочу очереди много запросов HTTP и я хочу цепи тех, в других операциях, как это:

urls$.flatMap(x => fetchUrls(x)).subscribe(x => console.log(x)); 

URL $ будучи Observable и fetchUrls быть метод, возвращающий новый Observable. Это будет хорошо работать, за исключением того, что первый поток, исходящий из URL-адресов, довольно быстрый, и метод fetchUrls работает медленно (выполняя HTTP-запрос/ответ). В итоге я получаю 200+ запросов HTTP одновременно.

Я думаю, что оставляя 200+ ожидающих http-запросов на клиенте, будет задавать проблемы, поэтому я хочу как-то объединить запросы, и я прочитал, что можно установить максимальные одновременные запросы при использовании mergeMap/flatMap, что действительно хороший.

Я создал площадку для вещи здесь: http://www.webpackbin.com/EkQXtBEwz У меня есть метод, называемый fakeHttpLookup, который задерживает ответ на второй и кормить его со списком 200 URLs - если запустить последовательно потребуется 200 секунд. С maxConcurrent, установленным на 5, я думаю, в лучшем случае он позаботится обо всем потоке за 40 секунд.

Дело в том, что я не могу заставить его работать. Это занимает первые 5, но затем останавливается. Он никогда не завершает весь поток.

Я бы ожидал увидеть счетчик с шагом 5 за каждую прошедшую секунду, что иллюстрирует 5 одновременных HTTP-запросов за раз.

Может ли кто-нибудь помочь? Я могу жить только с одним запросом за один раз - я мог бы буферизировать 5 из них за раз, но главное здесь не в том, чтобы ставить в очередь все 200 сразу - или только принимать первые несколько.

TLDR: Я хочу, чтобы иметь возможность отправлять/отправлять HTTP-запросы, а поток должен иметь возможность ограничить количество запросов, не догадываясь о задержке с жестким кодированием.

ответ

1

.flatMap(), также известный как .mergeMap() имеет дополнительный аргумент 'параллелизмом'

mergeMap (проект: Функция (значение: T, индекс: номер): Наблюдаемое, resultSelector: функция (outerValue: Т, innerValue: Я , outerIndex: номер, innerIndex: номер): любой, одновременно: номер)

Таким образом, ее просто вопрос:

function fetch(id) { 
 
    return Rx.Observable.of(id) 
 
    .do(i => console.log(`fetching request ${i}`)) 
 
    .delay(2 * 1000); 
 
} 
 

 
Rx.Observable.range(1, 20) 
 
    .flatMap(id => fetch(id), null, 5) 
 
    .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

выполнить 5 одновременных запросов

+0

Вот что я подумал, не работает, хотя ... –

+0

я посмотрел на вашу скрипку, но, кажется, что вы кладете параллелизм '5', как арг для функция 'resultSelector'? –

+0

Я немного поиграл с ним - вы можете попробовать и отредактировать, если считаете, что можете взломать его на работу?Я пробовал как второй, так и третий аргумент - ни для меня не работал –