2017-01-21 1 views
1

Используя библиотеку futures-rs, я столкнулся с ситуацией, когда поток должен отображаться через неопределенное количество других потоков до того, как он будет возвращен пользователю. Поскольку точный тип выходного потока неизвестен в конце этой операции, я использовал объект-атрибут BoxStream при сохранении потока в структуре и при его возврате.futures-rs с использованием комбинаторов потоков на `BoxStream`s

Несмотря на то, что этот подход работает нормально, он имеет неприятный побочный эффект, в результате чего внутренний объект Stream будет нечувствительным. Это проблема, потому что каждыйstream combinators требует Self: Sized в своих подписях, что означает, что я не могу даже wait() по возвращенному BoxStream, чтобы преобразовать его в блокирующий итератор.

Вот пример ситуации, которая может привести к этому вопросу:

struct Server { 
    receiver: Option<Box<Stream<Item = usize, Error =()> + Send>>, 
} 

impl Server { 
    pub fn new() -> Server { 
     let (tx, rx) = channel(0); 
     // do things with the tx (subscribe to tcp socket, connect to database, etc.) 
     Server { receiver: Some(rx.boxed()) } 
    } 

    /// Maps the inner `Receiver` through another stream, essentially duplicating it. 
    pub fn get_stream(&mut self) -> Result<Box<Stream<Item = usize, Error =()> + Send>,()> { 
     let (tx, rx) = channel(0); 

     let strm = self.receiver.take().unwrap(); 
     let mut tx_opt = Some(tx); 
     let new_strm = strm.map(move |msg| { 
      // unfortunate workaround needed since `send()` takes `self` 
      let mut tx = tx_opt.take().unwrap(); 
      tx = tx.send(msg.clone()).wait().unwrap(); 
      tx_opt = Some(tx); 
      msg 
     }); 
     simbroker.receiver = Some(new_strm.boxed()); 

     Ok(rx.boxed()) 
    } 
} 

pub fn main() { 
    let server = Server::new(); 

    // possible that this may happen 0..n times 
    let rx: BoxStream<usize,()> = server.get_stream(); 

    // can't do this since the inner `Stream` trait object isn't `Sized` and `wait()` 
    // (along with all other stream combinators) requires that in their signatures. 
    for msg in rx.wait() { 
     // compiler error here 
     // ... 
    } 
} 

Как показано на приведенном выше коде, BoxStream s необходимы, так как вызова map() на поток меняет свой тип от Receiver к Map, который будет сделать невозможным сохранение обратно в структуру. С возвращенным BoxStream сделать невозможно, так как это ?Sized. Фактически, функция только, которая доступна для объекта-объекта Stream, равна poll(), который, как предполагается, никогда не будет вызываться за пределами Task.

Есть ли способ избежать этой проблемы, не делая что-то вроде возврата перечисления, содержащего любую из возможных разновидностей потока, которая может произойти? Будет ли писать мою собственную структуру, которая реализует Stream, как-нибудь исправить проблему?

+2

Я не смог воспроизвести вашу проблему (после устранения непонятных проблем). Там 'impl Stream для Box ' в фьючерсах-rs, которые должны заставить это работать. Убедитесь, что ваш вопрос содержит [MCVE] (http://stackoverflow.com/help/mcve) и сообщение об ошибке, которое вы получаете. –

+0

Ohhh; в этом конкретном тесте я не делал 'use futures :: Stream'; rustc столкнулся с нестандартной ошибкой из-за того, что 'impl ' живет внутри модуля 'Stream' как' wait() ', поэтому компиляция с ошибкой до того, как она даже заметила' wait() ', не была в области. Большое спасибо @ FrancisGagné! Если вы хотите сделать это в ответ, я соглашусь на это как на решение; если нет, я сделаю это сам. – Ameo

ответ

0

Как отметил @ FrancisGagné в комментарии, futures-rs объявляет impl<S: ?Sized + Stream> Stream for Box<S> в модуле futures::Stream. В тесте, в котором был мой код, мне не удалось импортировать Stream, чтобы черта не была в области.

Компилятор не вызвал ошибку из-за отсутствия функции wait(), потому что у нее была проблема с неизвестностью.

Это решение было разрешено путем добавления use futures::Stream; в начало функции.

Смежные вопросы