2017-01-31 1 views
1

Я испытываю странное поведение с cpu pools:ржавчины фьючерсы cpupool: противоречивые объяснения поведения

#[macro_use] 
extern crate lazy_static; 
extern crate tokio_core; 
extern crate futures; 
extern crate futures_cpupool; 

use std::time::Duration; 

use futures_cpupool::{CpuPool, Builder, CpuFuture}; 
use futures::Stream; 
use futures::{Future, future, lazy}; 
use futures::sync::mpsc; 
use futures::Sink; 

lazy_static! { 
    static ref CPU_POOL: CpuPool = { 
     Builder::new() 
     .pool_size(10) 
     .after_start(|| { 
      println!("Pool started one thread"); 
     }) 
     .before_stop(|| { 
      println!("Pool stopped one thread"); 
     }) 
     .create() 
    }; 
    } 

struct Producer {} 

impl Producer { 
    fn search_names(&self) -> Box<Stream<Item = String, Error = String> + Send> { 
     let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1); 

     println!("Creating producer thread..."); 
     let producer_cpu: CpuFuture<(),()> = CPU_POOL.spawn(lazy(move || { 
       println!(" -- Begin to produce names"); 
       for i in 0..10 { 
        match tx.send(Ok("name".to_string())).wait() { 
         Ok(t) => { 
          println!(" -- sent the name"); 
          tx = t 
         } 
         Err(err) => { 
          println!(" -- Error occured sending name! {:?}", err); 
          break; 
         } 
        } 
        std::thread::sleep(Duration::from_secs(1)); 
       } 
       future::ok::<(),()>(()) 
      }) 
      .then(|result| { 
       match result { 
        Ok(data) => println!("Producer finished with data: {:?}", data), 
        Err(err) => println!("Producer finished with error: {:?}", err), 
       } 
       future::ok::<(),()>(()) 
      })); 

     rx.then(|r| r.unwrap()).boxed() 
    } 
} 

fn main() { 
    let producer = Producer {}; 

    let names = CPU_POOL.spawn(producer.search_names() 
     .map(|name| { 
      println!("name = {:?}", name); 
      name 
     }) 
     .collect() 
     .then(|result| { 
      match result { 
       Ok(data) => println!("Finished to read producer {:?}", data), 
       Err(err) => println!("Error reading stream of producer! {:?}", err), 
      } 
      future::ok::<(),()>(()) 
     })); 

    names.wait(); 
} 

Вот соответствующий Cargo.toml

[package] 
name = "example" 
version = "0.1.0" 

[dependencies] 
lazy_static = "^0.1.*" 

tokio-core = "^0.1" 
futures = "^0.1" 
futures-cpupool = "^0.1" 

Я бегу по ржавчине ночам (1.16.0-nightly (df8debf6d 2017-01-25))

Я ожидаю, что эта программа будет генерировать 10 String s, вывести ее через println и выйти. Тем не менее, большую часть времени программа не генерирует String s и выходит нормально, в других случаях корректно генерируются String.

Вот выход первого случая:

Creating producer thread... 
Pool started one thread 
Finished to read producer [] 
Pool started one thread 
Pool started one thread 
Pool started one thread 
Pool started one thread 

И выход при String s получить сгенерированные

Pool started one thread 
Pool started one thread 
Pool started one thread 
Pool started one thread 
Creating producer thread... 
-- Begin to produce names 
-- sent the name 
name = "name" 
Pool started one thread 
-- sent the name 
name = "name" 
Producer finished with data:() 
Finished to read producer ["name", "name"] 

У меня возникло ощущение, что в первом случае, производитель резьбы Безразлично По какой-то причине вы не планируете пул потоков. Я должен что-то упустить, но я не знаю, что.

ответ

0

Причиной проблемы является раннее падение будущего производителя.

По методу search_namesCpuFuture, который производит значение, отбрасывается при возврате search_names. При отбрасывании отменяется CpuFuture, тем самым пропуская производственные значения. Разница в поведении, безусловно, исходит из гонки между падением будущего и ее исполнением.

Решение состоит в том, чтобы ссылаться на будущее производителя по всей заявке, как это:

#[macro_use] 
extern crate lazy_static; 
extern crate tokio_core; 
extern crate futures; 
extern crate futures_cpupool; 

use std::time::Duration; 

use futures_cpupool::{CpuPool, Builder, CpuFuture}; 
use futures::Stream; 
use futures::{Future, future, lazy}; 
use futures::sync::mpsc; 
use futures::Sink; 

lazy_static! { 
static ref CPU_POOL: CpuPool = { 
    Builder::new() 
    .pool_size(5) 
    .after_start(|| { 
     println!("Pool started one thread"); 
    }) 
    .before_stop(|| { 
     println!("Pool stopped one thread"); 
    }) 
    .create() 
}; 
} 

struct Producer {} 

impl Producer { 
    fn search_names(&self) -> (CpuFuture<(),()>, Box<Stream<Item = String, Error = String> + Send>) { 
     let (mut tx, rx) = mpsc::channel::<Result<String, String>>(1); 

     println!("Creating producer thread..."); 
     let producer_cpu: CpuFuture<(),()> = CPU_POOL.spawn(
      lazy(move || { 
       println!(" -- Begin to produce names"); 
       for i in 0..2 { 
        match tx.send(Ok("name".to_string())).wait() { 
         Ok(t) => { 
          println!(" -- sent the name"); 
          tx = t 
         }, 
         Err(err) => { 
          println!(" -- Error occured sending name! {:?}", err); 
          break 
         }, 
        } 
        std::thread::sleep(Duration::from_secs(1)); 
       } 
       future::ok::<(),()>(()) 
      }).then(|result| { 
       match result { 
        Ok(data) => println!("Producer finished with data: {:?}", data), 
        Err(err) => println!("Producer finished with error: {:?}", err), 
       } 
       future::ok::<(),()>(()) 
      }) 
     ); 

     (
      producer_cpu, 
      rx.then(|r| r.unwrap()).boxed() 
     ) 
    } 
} 

fn main() { 
    let producer = Producer {}; 

    let (future, stream) = producer.search_names(); 
    let names = CPU_POOL.spawn(
     stream 
      .map(|name| { 
       println!("name = {:?}", name); 
       name 
      }) 
      .collect() 
      .then(|result| { 
       match result { 
        Ok(data) => println!("Finished to read producer {:?}", data), 
        Err(err) => println!("Error reading stream of producer! {:?}", err) 
       } 
       future::ok::<(),()>(()) 
      }) 
    ); 

    names.wait(); 
} 
+1

Это было бы, почему компилятор говорит вам * предупреждение: не используется переменная 'producer_cpu' * – Shepmaster

Смежные вопросы