2017-02-16 4 views
19

Я использую RsJS 5 (5.0.1) для кэширования в Angular 2. Он работает хорошо.Реактивное кэширование HTTP-сервиса

Мясо функции кэширования является:

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey)) 
) 
    .publishReplay(1, this.RECACHE_INTERVAL) 
    .refCount().take(1) 
    .do(() => this.console.log('CACHE HIT', cacheKey)); 

actualFn является this.http.get('/some/resource').

Как я уже сказал, это отлично работает для меня. Кэш возвращается из наблюдаемого в течение RECACHE_INTERVAL. Если после этого интервала будет сделан запрос, будет вызван actualFn().

Что я пытаюсь выяснить, когда истекает RECACHE_INTERVAL, а actualFn() называется —, как вернуть последнее значение. Существует промежуток времени между истечением RECACHE_INTERVAL и повторным воспроизведением actualFn(), что наблюдаемое не возвращает значение. Я хотел бы избавиться от этого разрыва во времени и всегда возвращать последнее значение.

Я мог бы использовать побочный эффект и сохранить последний вызов хорошего значения .next(lastValue), ожидая ответа HTTP-ответа, но это кажется наивным. Я хотел бы использовать способ «RxJS», чистое решение функции —, если это возможно.

ответ

2

Обновленный ответ:

Если всегда хотите использовать предыдущее значение в то время как новый запрос делается затем может поставить еще один объект в цепи, которая держит самое последнее значение.

Затем вы можете повторить значение, чтобы можно было узнать, исходило ли оно из кеша или нет. Затем абонент может отфильтровать кешированные значения, если они не заинтересованы в них.

// Take values while they pass the predicate, then return one more 
// i.e also return the first value which returned false 
const takeWhileInclusive = predicate => src => 
    src 
    .flatMap(v => Observable.from([v, v])) 
    .takeWhile((v, index) => 
    index % 2 === 0 ? true : predicate(v, index) 
) 
    .filter((v, index) => index % 2 !== 1); 

// Source observable will still push its values into the subject 
// even after the subscriber unsubscribes 
const keepHot = subject => src => 
    Observable.create(subscriber => { 
    src.subscribe(subject); 

    return subject.subscribe(subscriber); 
    }); 

const cachedRequest = request 
    // Subjects below only store the most recent value 
    // so make sure most recent is marked as 'fromCache' 
    .flatMap(v => Observable.from([ 
    {fromCache: false, value: v}, 
    {fromCache: true, value: v} 
    ])) 
    // Never complete subject 
    .concat(Observable.never()) 
    // backup cache while new request is in progress 
    .let(keepHot(new ReplaySubject(1))) 
    // main cache with expiry time 
    .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL))) 
    .publish() 
    .refCount() 
    .let(takeWhileInclusive(v => v.fromCache)); 

    // Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL 
    // Subscribers will get the most recent cached value first then an updated value 

https://acutmore.jsbin.com/kekevib/8/edit?js,console

Оригинальный ответ:

Вместо того чтобы устанавливать размер окна на replaySubject - вы можете изменить источник наблюдаемую повторить после задержки.

const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey)) 
) 
    .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL)) 
    .publishReplay(1) 
    .refCount() 
    .take(1) 
    .do(() => this.console.log('CACHE HIT', cacheKey)); 

Оператор repeatWhen требует RxJs-beta12 или выше https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09

+0

Спасибо за предложение, это очень хорошо. Есть две проблемы: мне нужно удалить оператор '.take (1)' из цепочки, иначе он никогда не сделает второй HTTP-запрос. Вторая проблема заключается в том, что я хотел бы, чтобы recache был вызван, когда другая subsribe() вызывается после RECACHE_INTERVAL, а не каждый RECACHE_INTERVAL. – Martin

+0

@Martin Я обновил свой ответ, чтобы запросы выполнялись только тогда, когда другой абонент подписывается после RECACHE_INTERVAL. Надеюсь, это поможет – Ashley

2

Смотрите эту демо: https://jsbin.com/todude/10/edit?js,console

Обратите внимание, что я пытаюсь получить кэшированные результаты в 1200ms когда дело аннулируется, а затем на 1300ms, когда предыдущий запрос все еще находится на рассмотрении (требуется 200ms). Оба результата получены так, как они должны.

Это происходит потому, что, когда вы подписываетесь и publishReplay() не содержит никакого действительного значения, он ничего не испустит и не завершится немедленно (спасибо take(1)), поэтому ему необходимо подписаться на его источник, который делает HTTP-запросы (это фактически происходит в refCount()).

Тогда второй абонент ничего не получит и будет добавлен к массиву наблюдателей в publishReplay().Он не будет делать другую подписку, потому что он уже подписан на свой источник (refCount()) и ждет ответа.

Так что ситуация, о которой вы описываете, не должна произойти, я думаю. Со временем вы получите демоверсию, демонстрирующую вашу проблему.

РЕДАКТИРОВАТЬ:

испуская как деталь признана недействительной и свежие продукты

Следующий пример показывает немного другой, чем функциональность связанного примера. Если кешированный отклик недействителен, он будет выходить в любом случае, а затем получит также новое значение. Это означает, что абонент получает один или два значения:

  • 1 Значение: кэшируются значение значения
  • 2: аннулированное кэшированное значение, а затем новым свежее значение, которое будет кэшированным с этим момента.

Код может выглядеть следующим образом:

let counter = 1; 
const RECACHE_INTERVAL = 1000; 

function mockDataFetch() { 
    return Observable.of(counter++) 
    .delay(200); 
} 

let source = Observable.defer(() => { 
    const now = (new Date()).getTime(); 

    return mockDataFetch() 
    .map(response => { 
     return { 
     'timestamp': now, 
     'response': response, 
     }; 
    }); 
}); 

let updateRequest = source 
    .publishReplay(1) 
    .refCount() 
    .concatMap(value => { 
    if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { 
     return Observable.from([value.response, null]); 
    } else { 
     return Observable.of(value.response); 
    } 
    }) 
    .takeWhile(value => value); 


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500); 

Смотреть демо: https://jsbin.com/ketemi/2/edit?js,console

Печатает на консоль следующий вывод:

Response 0: 1 
Response 50: 1 
Response 200: 1 
Response 1200: 1 
Response 1300: 1 
Response 1200: 2 
Response 1300: 2 
Response 1500: 2 
Response 3500: 2 
Response 3500: 3 

Уведомление 1200 и 1300 получил сначала старое кешированное значение 1 немедленно a затем другое значение со свежим значением 2.
С другой стороны, 1500 получил только новое значение, потому что 2 уже кэширован и действителен.

Самая путаная вещь, вероятно, почему я использую concatMap().takeWhile(). Это связано с тем, что я должен убедиться, что новый ответ (а не недействительный) является последним значением перед отправкой полного уведомления, и, вероятно, для этого нет оператора (ни first(), ни takeWhile() не применимы для этого прецедента).

испуская только текущий элемент, не дожидаясь обновлений

еще один потребительной случай может быть, когда мы хотим, чтобы излучать только кэшированные значения в то время, не дожидаясь свежим ответ от запроса HTTP.

let counter = 1; 
const RECACHE_INTERVAL = 1000; 

function mockDataFetch() { 
    return Observable.of(counter++) 
    .delay(200); 
} 

let source = Observable.defer(() => { 
    const now = (new Date()).getTime(); 

    return mockDataFetch() 
    .map(response => { 
     return { 
     'timestamp': now, 
     'response': response, 
     }; 
    }); 
}); 

let updateRequest = source 
    .publishReplay(1) 
    .refCount() 
    .concatMap((value, i) => { 
    if (i === 0) { 
     if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid? 
     return Observable.from([value.response, null]); 
     } else { 
     return Observable.of(value.response); 
     } 
    } 
    return Observable.of(null); 
    }) 
    .takeWhile(value => value); 


setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500); 
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800); 

Смотреть демо: https://jsbin.com/kebapu/2/edit?js,console

Этот пример выводит на консоль:

Response 0: 1 
Response 50: 1 
Response 200: 1 
Response 1200: 1 
Response 1300: 1 
Response 1500: 2 
Response 3500: 2 
Response 3800: 3 

Обратите внимание, что оба 1200 и 1300 получить значение 1, потому что это кэшированное значение, хотя сейчас это недопустимо. Первый вызов в 1200 просто порождает новый HTTP-запрос без ожидания его ответа и испускает только кешированное значение. Затем в 1500 свежая ценность кэшируется, так что она просто переиздается. То же самое относится к 3500 и 3800.

Обратите внимание, что подписчик на 1200 получит уведомление next, но уведомление complete будет отправлено только после завершения HTTP-запроса. Нам нужно подождать, потому что если мы отправим complete сразу после next, это сделает цепочку для размещения своих расходных материалов, которая также должна отменить HTTP-запрос (это то, что мы определенно не хотим делать).

+0

Мартин, спасибо за ответ. Я полностью понимаю, как работает моя текущая реализация. То, о чем я не понимаю, последнее предложение - это «ситуация», вы имеете в виду мое желаемое поведение или мое существующее рабочее поведение? – Martin

+0

@Martin Мое плохое, я хотел написать «никогда не должно быть». Я не думаю, что у вас может быть пустое окно, где ваши подписчики ничего не получают. – martin

+0

OK Я вижу - я не был достаточно конкретным в своем вопросе. «Разрыв» возникает, когда новый наблюдатель подписывается после RECACHE_INTERVAL. Не обновляйте вопрос, чтобы уточнить. Спасибо за это. – Martin

2

Практически любая сложная логика быстро выходит из-под контроля, если вы используете простой rxjs. Я бы предпочел реализовать пользовательский оператор cache с нуля, вы можете использовать это gist в качестве примера.

Смежные вопросы