Смотрите эту демо: https://jsbin.com/todude/10/edit?js,console
Обратите внимание, что я пытаюсь получить кэшированные результаты в 1200ms
когда дело аннулируется, а затем на 1300ms
, когда предыдущий запрос все еще находится на рассмотрении (требуется 200ms
). Оба результата получены так, как они должны.
Это происходит потому, что, когда вы подписываетесь и publishReplay()
не содержит никакого действительного значения, он ничего не испустит и не завершится немедленно (спасибо take(1)
), поэтому ему необходимо подписаться на его источник, который делает HTTP-запросы (это фактически происходит в refCount()
).
Тогда второй абонент ничего не получит и будет добавлен к массиву наблюдателей в publishReplay()
.Он не будет делать другую подписку, потому что он уже подписан на свой источник (refCount()
) и ждет ответа.
Так что ситуация, о которой вы описываете, не должна произойти, я думаю. Со временем вы получите демоверсию, демонстрирующую вашу проблему.
РЕДАКТИРОВАТЬ:
испуская как деталь признана недействительной и свежие продукты
Следующий пример показывает немного другой, чем функциональность связанного примера. Если кешированный отклик недействителен, он будет выходить в любом случае, а затем получит также новое значение. Это означает, что абонент получает один или два значения:
- 1 Значение: кэшируются значение значения
- 2: аннулированное кэшированное значение, а затем новым свежее значение, которое будет кэшированным с этим момента.
Код может выглядеть следующим образом:
let counter = 1;
const RECACHE_INTERVAL = 1000;
function mockDataFetch() {
return Observable.of(counter++)
.delay(200);
}
let source = Observable.defer(() => {
const now = (new Date()).getTime();
return mockDataFetch()
.map(response => {
return {
'timestamp': now,
'response': response,
};
});
});
let updateRequest = source
.publishReplay(1)
.refCount()
.concatMap(value => {
if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) {
return Observable.from([value.response, null]);
} else {
return Observable.of(value.response);
}
})
.takeWhile(value => value);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
Смотреть демо: https://jsbin.com/ketemi/2/edit?js,console
Печатает на консоль следующий вывод:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1200: 2
Response 1300: 2
Response 1500: 2
Response 3500: 2
Response 3500: 3
Уведомление 1200
и 1300
получил сначала старое кешированное значение 1
немедленно a затем другое значение со свежим значением 2
.
С другой стороны, 1500
получил только новое значение, потому что 2
уже кэширован и действителен.
Самая путаная вещь, вероятно, почему я использую concatMap().takeWhile()
. Это связано с тем, что я должен убедиться, что новый ответ (а не недействительный) является последним значением перед отправкой полного уведомления, и, вероятно, для этого нет оператора (ни first()
, ни takeWhile()
не применимы для этого прецедента).
испуская только текущий элемент, не дожидаясь обновлений
еще один потребительной случай может быть, когда мы хотим, чтобы излучать только кэшированные значения в то время, не дожидаясь свежим ответ от запроса HTTP.
let counter = 1;
const RECACHE_INTERVAL = 1000;
function mockDataFetch() {
return Observable.of(counter++)
.delay(200);
}
let source = Observable.defer(() => {
const now = (new Date()).getTime();
return mockDataFetch()
.map(response => {
return {
'timestamp': now,
'response': response,
};
});
});
let updateRequest = source
.publishReplay(1)
.refCount()
.concatMap((value, i) => {
if (i === 0) {
if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid?
return Observable.from([value.response, null]);
} else {
return Observable.of(value.response);
}
}
return Observable.of(null);
})
.takeWhile(value => value);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800);
Смотреть демо: https://jsbin.com/kebapu/2/edit?js,console
Этот пример выводит на консоль:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1500: 2
Response 3500: 2
Response 3800: 3
Обратите внимание, что оба 1200
и 1300
получить значение 1
, потому что это кэшированное значение, хотя сейчас это недопустимо. Первый вызов в 1200
просто порождает новый HTTP-запрос без ожидания его ответа и испускает только кешированное значение. Затем в 1500
свежая ценность кэшируется, так что она просто переиздается. То же самое относится к 3500
и 3800
.
Обратите внимание, что подписчик на 1200
получит уведомление next
, но уведомление complete
будет отправлено только после завершения HTTP-запроса. Нам нужно подождать, потому что если мы отправим complete
сразу после next
, это сделает цепочку для размещения своих расходных материалов, которая также должна отменить HTTP-запрос (это то, что мы определенно не хотим делать).
Спасибо за предложение, это очень хорошо. Есть две проблемы: мне нужно удалить оператор '.take (1)' из цепочки, иначе он никогда не сделает второй HTTP-запрос. Вторая проблема заключается в том, что я хотел бы, чтобы recache был вызван, когда другая subsribe() вызывается после RECACHE_INTERVAL, а не каждый RECACHE_INTERVAL. – Martin
@Martin Я обновил свой ответ, чтобы запросы выполнялись только тогда, когда другой абонент подписывается после RECACHE_INTERVAL. Надеюсь, это поможет – Ashley