Я новичок в использовании rxcpp и пытается получить что-то функциональное вместе в следующем сценарии:Планирование и тайм-аута обращения с rxcpp
У меня есть один источник данных, который будет извлекать команды из отдельного источника, код I» m write вернет эти команды в наблюдаемый rxcpp. У него есть особое условие: если никакая команда не была получена в течение определенного периода времени, функция onError для подписчиков будет запущена вместо onNext, но таймаут может произойти только до получения первой команды. После получения первой команды тайм-аут не может произойти независимо от того, сколько времени потребуется для получения каких-либо дополнительных команд.
Я пытаюсь сделать это с чем-то вроде этого:
auto timeout = rxcpp::observable<>::timer(std::chrono::steady_clock::now() + timeout,
rxcpp::observe_on_event_loop()).map([](int val) // Note, converts the value type of the timer observable and converts timeouts to errors
{
std::cout << "TIMED OUT!" << std::endl;
throw std::runtime_error("timeout");
return command_type();
});
auto commands = timeout.amb(rxcpp::observe_on_event_loop(), createCommandSource(event_loop_scheduler, ...));
У меня есть проблема в том, что тайм-аут происходит до того, как команды мь, даже если они inserterd далеко, прежде чем произойдет тайм-аут. Я экспериментировал с расписанием от 1000 до 5000 мс, и это не имеет никакого значения. Если я удалю код тайм-аута, команда будет получена немедленно. Я подозреваю, что, вероятно, я просто неправильно понял, как использовать планировщики в rxcpp, хотя мне интересно, как это можно сделать.
Привет! Мне нужно посмотреть, как createCommandSource создает наблюдаемое его возвращение –