2015-09-23 3 views
1

Я новичок в использовании rxcpp и пытается получить что-то функциональное вместе в следующем сценарии:Планирование и тайм-аута обращения с rxcpp

У меня есть один источник данных, который будет извлекать команды из отдельного источника, код I» m write вернет эти команды в наблюдаемый rxcpp. У него есть особое условие: если никакая команда не была получена в течение определенного периода времени, функция onError для подписчиков будет запущена вместо onNext, но таймаут может произойти только до получения первой команды. После получения первой команды тайм-аут не может произойти независимо от того, сколько времени потребуется для получения каких-либо дополнительных команд.

Я пытаюсь сделать это с чем-то вроде этого:

auto timeout = rxcpp::observable<>::timer(std::chrono::steady_clock::now() + timeout, 
          rxcpp::observe_on_event_loop()).map([](int val) // Note, converts the value type of the timer observable and converts timeouts to errors 
{ 
    std::cout << "TIMED OUT!" << std::endl; 
    throw std::runtime_error("timeout"); 
    return command_type(); 
}); 
auto commands = timeout.amb(rxcpp::observe_on_event_loop(), createCommandSource(event_loop_scheduler, ...)); 

У меня есть проблема в том, что тайм-аут происходит до того, как команды мь, даже если они inserterd далеко, прежде чем произойдет тайм-аут. Я экспериментировал с расписанием от 1000 до 5000 мс, и это не имеет никакого значения. Если я удалю код тайм-аута, команда будет получена немедленно. Я подозреваю, что, вероятно, я просто неправильно понял, как использовать планировщики в rxcpp, хотя мне интересно, как это можно сделать.

+0

Привет! Мне нужно посмотреть, как createCommandSource создает наблюдаемое его возвращение –

ответ

1

Я написал простой createCommandSource. Это сработало для меня:

#include "rxcpp/rx.hpp" 
using namespace rxcpp; 
using namespace rxcpp::sources; 
using namespace rxcpp::util; 

using namespace std; 

struct command_type {}; 

int main() 
{ 
    auto eventloop = rxcpp::observe_on_event_loop(); 
    auto createCommandSource = [=]() { 
     return rxcpp::observable<>::interval(std::chrono::seconds(1), eventloop).map([](long) {return command_type(); }); 
    }; 
    auto timeout = rxcpp::observable<>::timer(eventloop.now() + std::chrono::seconds(2), eventloop).map([](long) // Note, converts the value type of the timer observable and converts timeouts to errors 
    { 
     std::cout << "TIMED OUT!" << std::endl; 
     throw std::runtime_error("timeout"); 
     return command_type(); 
    }); 
    auto commands = timeout.amb(eventloop, createCommandSource().take(5)); 

    commands 
     .as_blocking().subscribe(
     [](command_type) {printf("command\n"); }, 
     [](std::exception_ptr) {printf("execption\n"); }); 

    std::this_thread::sleep_for(std::chrono::seconds(2)); 

    return 0; 
} 
Смежные вопросы