Используя библиотеку futures-rs
, я столкнулся с ситуацией, когда поток должен отображаться через неопределенное количество других потоков до того, как он будет возвращен пользователю. Поскольку точный тип выходного потока неизвестен в конце этой операции, я использовал объект-атрибут BoxStream
при сохранении потока в структуре и при его возврате.futures-rs с использованием комбинаторов потоков на `BoxStream`s
Несмотря на то, что этот подход работает нормально, он имеет неприятный побочный эффект, в результате чего внутренний объект Stream
будет нечувствительным. Это проблема, потому что каждыйstream combinators требует Self: Sized
в своих подписях, что означает, что я не могу даже wait()
по возвращенному BoxStream
, чтобы преобразовать его в блокирующий итератор.
Вот пример ситуации, которая может привести к этому вопросу:
struct Server {
receiver: Option<Box<Stream<Item = usize, Error =()> + Send>>,
}
impl Server {
pub fn new() -> Server {
let (tx, rx) = channel(0);
// do things with the tx (subscribe to tcp socket, connect to database, etc.)
Server { receiver: Some(rx.boxed()) }
}
/// Maps the inner `Receiver` through another stream, essentially duplicating it.
pub fn get_stream(&mut self) -> Result<Box<Stream<Item = usize, Error =()> + Send>,()> {
let (tx, rx) = channel(0);
let strm = self.receiver.take().unwrap();
let mut tx_opt = Some(tx);
let new_strm = strm.map(move |msg| {
// unfortunate workaround needed since `send()` takes `self`
let mut tx = tx_opt.take().unwrap();
tx = tx.send(msg.clone()).wait().unwrap();
tx_opt = Some(tx);
msg
});
simbroker.receiver = Some(new_strm.boxed());
Ok(rx.boxed())
}
}
pub fn main() {
let server = Server::new();
// possible that this may happen 0..n times
let rx: BoxStream<usize,()> = server.get_stream();
// can't do this since the inner `Stream` trait object isn't `Sized` and `wait()`
// (along with all other stream combinators) requires that in their signatures.
for msg in rx.wait() {
// compiler error here
// ...
}
}
Как показано на приведенном выше коде, BoxStream
s необходимы, так как вызова map()
на поток меняет свой тип от Receiver
к Map
, который будет сделать невозможным сохранение обратно в структуру. С возвращенным BoxStream
сделать невозможно, так как это ?Sized
. Фактически, функция только, которая доступна для объекта-объекта Stream
, равна poll()
, который, как предполагается, никогда не будет вызываться за пределами Task
.
Есть ли способ избежать этой проблемы, не делая что-то вроде возврата перечисления, содержащего любую из возможных разновидностей потока, которая может произойти? Будет ли писать мою собственную структуру, которая реализует Stream
, как-нибудь исправить проблему?
Я не смог воспроизвести вашу проблему (после устранения непонятных проблем). Там 'impl Stream для Box
' в фьючерсах-rs, которые должны заставить это работать. Убедитесь, что ваш вопрос содержит [MCVE] (http://stackoverflow.com/help/mcve) и сообщение об ошибке, которое вы получаете. –Ohhh; в этом конкретном тесте я не делал 'use futures :: Stream'; rustc столкнулся с нестандартной ошибкой из-за того, что 'impl' живет внутри модуля 'Stream' как' wait() ', поэтому компиляция с ошибкой до того, как она даже заметила' wait() ', не была в области. Большое спасибо @ FrancisGagné! Если вы хотите сделать это в ответ, я соглашусь на это как на решение; если нет, я сделаю это сам. –
Ameo