2017-01-23 2 views
1

Я пытаюсь использовать Future.rs для управления некоторыми задачами в отдельном процессе. Я вижу, как ждать каждого созданного будущего и как обрабатывать их один за другим, но я не смог опросить будущее во время его исполнения, чтобы узнать его состояние. У меня всегда есть ошибка:Как опросить будущее состояние, не дожидаясь?

thread 'main' panicked at 'no Task is currently running'

Я хочу что-то сделать во время будущей обработки, пока она не закончится. Возможно, я не использую это правильно? Мне удалось заставить его работать, используя канал, но я думаю, что должно быть возможным опросить будущее и когда он будет готов получить результат. код я использую, чтобы проверить это:

fn main() { 
    println!("test future"); 
    let thread_pool = CpuPool::new(4); 
    let mut future_execution_list = vec![]; 
    let mutex = Arc::new(AtomicUsize::new(0)); 
    //create the future to process 
    for _ in 0..10 { 
     let send_mutex = mutex.clone(); 
     let future = thread_pool.spawn_fn(move || { 
      //Simulate long processing 
      thread::sleep(time::Duration::from_millis(10)); 
      let num = send_mutex.load(Ordering::Relaxed); 
      send_mutex.store(num + 1, Ordering::Relaxed); 
      let res: Result<usize,()> = Ok(num); 
      res 

     }); 
     future_execution_list.push(future); 
    } 
    // do the job 
    loop { 
     for future in &mut future_execution_list { 
      match future.poll() { 
       Ok(Async::NotReady) =>(), //do nothing 
       Ok(Async::Ready(num)) => { 
        //update task status 
        println!(" future {:?}", num); 
       } 
       Err(_) => { 
        //log error and set task status to err 
        () 
       } 
      }; 
     } 
     //do something else 
    } 
} 

Так я заканчиваю свой вопрос после Shepmaster ответа. Ваши замечания очень интересны, но я до сих пор не могу найти решение проблемы. Я добавлю некоторые сведения о своей проблеме. Я хочу запланировать задачи на автоматике, которые могут управлять несколькими задачами одновременно. Существует цикл, в котором управляются события, и вычисляется планирование задач. Когда задание запланировано, оно появляется. Когда задача заканчивается, выполняется новое планирование. Во время выполнения задачи управление событиями. Код speudo может быть:

loop { 
    event.try_recv() { ...} //manage user command for exemple 
    if (schedule) { 
     let tasks_to_spawn = schedule_task(); 
     let futures = tasks_to_spawn.map(|task| { 
      thread_pool.spawn_fn(....)}); 
     let mut one = future::select_all(futures); 
     while let Ok((value, _idx, remaining)) = one.wait() {..} //wait here 
    } 
    //depend on task end state and event set schedule to true or false. 

} 

я могу совместное планирование и задачи в будущем, как:

let future = schedule.and_them(|task| execute_task); 

Но мне еще нужно ждать окончания выполнения первой задачи. Я могу поместить все в будущее (управление событиями, расписание, задание) и ждать первого, который заканчивается, как вы предлагаете. Я пытаюсь, но я не видел, как сделать vec будущего с разными типами Item и Error. И с этой концепцией мне нужно управлять большими данными между потоками. Управление событиями и планирование не должны выполняться в другом потоке.

Я вижу другую проблему, select_all берет на себя ответственность за vec. Если новая задача должна быть запланирована во время выполнения другого, как я могу изменить vec и добавить новое будущее?

Не знаю, есть ли у вас простое решение. Я думал, что было просто получить состояние будущего во время его выполнения с помощью метода, такого как isDone(), не дожидаясь. Возможно, это запланировано, я не видел PR об этом. Если у вас есть простое решение, было бы здорово, иначе я переосмыслил свою концепцию.

+0

Существует 99,9% вероятность того, что вы сделаете **, а не ** хотите использовать атомные переменные таким образом. Вместо этого вы хотите 'fetch_add', и подавляющее большинство людей не хотят упорядочивать« Relaxed ». – Shepmaster

+0

Справа я просто скопирую/вставлю код, чтобы показать, что я хочу получить результат от будущего исполнения, который зависит от исполнения других фьючерсов. –

+0

Это [* крайне плохая форма *, чтобы изменить ваш вопрос ** после получения ответов **, особенно если изменение отменяет эти ответы] (http://meta.stackoverflow.com/q/309237/155423). Вопрос о том, чтобы задать вопрос, который включает в себя любые соответствующие детали с самого начала, задается вопросом. – Shepmaster

ответ

2

Для опроса Future у вас должен быть Task. Чтобы получить Task, вы можете опросить a Future, переведённый на futures::executor::spawn(). Если вы переписывания loop вашего примера вот так:

futures::executor::spawn(futures::lazy(|| { 
    // existing loop goes here 
})).wait_future(); 

Он работает.

Как почемуFuture можно опрашивать только в задаче, я считаю, что это так, что опрос может назвать Task::Unpark.

0

I want to do something during the future processing

Как я понимаю, это то, что фьючерсы являются - вещи, которые могут произойти параллельно. Если вы хотите сделать что-то еще, сделайте еще одно будущее и бросьте его!

Вы в основном уже делаете это - каждый из ваших потоков «делает что-то еще».

poll the future and when it's ready get the result

Использование future::select_all, вы можете объединить несколько фьючерсов и получить в зависимости от того финиширует первым. Тогда вам решать подождать следующего.

Один потенциал реализации:

extern crate rand; 
extern crate futures; 
extern crate futures_cpupool; 

use rand::Rng; 
use futures::{future, Future}; 
use futures_cpupool::CpuPool; 

use std::{thread, time}; 

fn main() { 
    let thread_pool = CpuPool::new(4); 

    let futures = (0..10).map(|i| { 
     thread_pool.spawn_fn(move || -> Result<usize,()> { 
      let mut rng = rand::thread_rng(); 
      // Simulate long processing 
      let sleep_time = rng.gen_range(10, 100); 
      let sleep_time = time::Duration::from_millis(sleep_time); 
      for _ in 0..10 { 
       println!("Thread {} sleeping", i); 
       thread::sleep(sleep_time); 
      } 
      Ok(i) 
     }) 
    }); 

    let mut one = future::select_all(futures); 
    while let Ok((value, _idx, remaining)) = one.wait() { 
     println!("Future #{} finished", value); 
     if remaining.is_empty() { 
      break; 
     } 
     one = future::select_all(remaining); 
    } 
} 

Во время вызова wait, несколько вещей, которые происходят одновременно! Это можно увидеть на чередование выхода:

Thread 2 sleeping 
Thread 0 sleeping 
Thread 3 sleeping 
Thread 1 sleeping 
Thread 3 sleeping 
Thread 0 sleeping 
Thread 1 sleeping 
Thread 2 sleeping 
Thread 3 sleeping 

Вы можете убедиться в том, что вещи происходят параллельно, установив время сна на 1 секунду для каждого потока и синхронизации всей программы. Поскольку 10 фьючерсов, взяв 1 секунду, с параллелизмом 4, общая программа занимает 3 секунды для запуска.


Bonus отзыв Код:

  1. Не разрывать и погрузочные установки атомные переменный для реализации приращения - сохраненного значения может быть изменен другим потоком между двумя вызовами. Используйте fetch_add.
  2. Вы действительно, действительно должен знать what the orderings mean перед использованием. Я их не знаю, поэтому я всегда использую SeqCst.
  3. Так как это не важно для этого примера, я полностью удалил атомную переменную.
  4. Всегда предпочитайте собирать в Vec вместо того, чтобы вставлять его внутрь петли. Это позволяет использовать более оптимизированное распределение.
  5. В этом случае нет необходимости в Vec, поскольку select_all принимает все, что может быть преобразовано в итератор.
Смежные вопросы