2013-02-25 4 views
-1

У меня есть список объектов в моем приложении C# 4.0. Предположим, что этот список содержит 100 объектов студенческого класса. Есть ли способ в Reactive Framework для параллельного выполнения по 10 объектов каждый раз за раз?Возможно ли это с помощью Reactive Framework?

Каждый объект-ученик запускает метод, который занимает некоторое время в течение примерно 10-15 секунд. Итак, первый раз, возьмите первые 10 студенческих объектов из списка и дождитесь, пока все 10 студенческих объектов закончат свою работу, а затем возьмут следующие 10 объектов-учеников и так далее, пока они не завершат полные элементы в списках?

  1. У меня есть List<Student> со 100 счетами.
  2. Сначала возьмите 10 элементов из списков и вызовите метод длинного запуска каждого объекта параллельно.
  3. Получает каждое возвращаемое значение объектов и обновляет пользовательский интерфейс [часть подписки].
  4. Следующий раунд начинается только в том случае, если первые 10 раундов завершают и освобождают всю память.
  5. Повторите тот же процесс для всех элементов в списках.
  6. Как поймать ошибки в каждом процессе?
  7. Как освободить ресурсы каждого ресурса объекта и другие ресурсы из памяти?
  8. Каков наилучший способ сделать все это в Reactive Framework?

ответ

0

Моя попытка ....

var students = new List<Student>(); 
{....} 
var cancel = students 
    .ToObservable(Scheduler.Default) 
    .Window(10) 
    .Merge(1) 
    .Subscribe(tenStudents => 
    { 
     tenStudents.ObserveOn(Scheduler.Default) 
      .Do(x => DoSomeWork(x)) 
      .ObserverOnDispatcher() 
      .Do(tenStudents => UpdateUI(tenStudents)) 
      .Subscribe();    
    }); 
+0

Спасибо Арон. Не могли бы вы объяснить код ur? большое спасибо – user2017793

+0

Довольно просто. Окно (10) преобразует произведение в куски 10. Слияние (1) работает на одном потоке. Преобразуйте 10 учеников во внутреннее наблюдение. Сделай, эм ... сделай некоторую работу. ObserveOnDispatcher() вернется к потоку пользовательского интерфейса на следующем бите. Ум ... работайте над UpdatingUI. Наконец, подпишитесь на внутреннее наблюдаемое. Промыть и повторить. – Aron

+0

Еще раз спасибо Арон. Мое сомнение заключалось в том, как выпустить каждые 10 объектных ресурсов учеников. Ваше объяснение было очень полезно и большое спасибо за это. Я беспокоюсь о проблемах с памятью. Помогите мне. – user2017793

1

Эта версия всегда будет иметь 10 студентов, работающих одновременно. Когда студент закончит, начнется другая. И по мере того как каждый студент заканчивает работу, вы можете справиться с любой его ошибкой, а затем очистить ее (это произойдет до того, как следующий студент начнет работу).

students 
    .ToObservable() 
    .Select(student => Observable.Defer(() => Observable.Start(() => 
     { 
      // do the work for this student, then return a Tuple of the student plus any error 
      try 
      { 
       student.DoWork(); 
       return { Student = student, Error = (Exception)null }; 
      } 
      catch (Exception e) 
      { 
       return { Student = student, Error = e }; 
      } 
     }))) 
    .Merge(10) // let 10 students be executing in parallel at all times 
    .Subscribe(studentResult => 
    { 
     if (studentResult.Error != null) 
     { 
      // handle error 
     } 

     studentResult.Student.Dispose(); // if your Student is IDisposable and you need to free it up. 
    }); 

Это не точно, что вы просили, так как он не заканчивает первую партию 10 до начала следующей партии. Это всегда ведет 10 параллельно. Если вы действительно хотите партии из 10, я настрою код для этого.

+0

Спасибо за ответ. Я очень новичок в RX и пытаюсь изучить его. Вы все очень помогаете. В этом решении я вижу, что он принимает только 10 учеников, и после этого 10 учеников заканчивают, не могут видеть код для следующих 10 учеников (например, партии), пока он не охватит 100 учеников. В подборке учащегося всего 100 человек. Поскольку все 100 учеников, работающих параллельно, могут вызвать исключение из памяти, мой план состоял в том, чтобы запустить учеников в партиях по 10. Пожалуйста, помогите. – user2017793

+0

'.Merge (10)' делает это. Подумайте об этом, как вышибала в баре. Это позволяет только 10 ученикам одновременно. Как только один из этих учеников окончится и уйдет, осталось всего 9 студентов, поэтому «Слияние» позволит другому. Он будет продолжать делать это до тех пор, пока все 100 студентов не будут обработаны. – Brandon

+0

Спасибо за помощь. Это действительно занимает 10 студентов @ время и принять еще 10 в следующий раз. Когда он принимает партию из 10, существует ли какой-либо метод в Rx для запуска этих 10 учеников, которые могут работать параллельно. Вышеупомянутое решение ожидает завершения каждой партии, а затем только вызывает подписку. В подписке у меня есть другое, чтобы запустить некоторое событие для обновления пользовательского интерфейса в клиенте wcf. Но ожидание полного выселения каждой партии убивает время. Я ищу что-то более быстрое в RX. Есть ли способ запустить элементы в параллельной параллели? – user2017793

0

Это для меня очень похоже на проблему для TPL. У вас есть известный набор данных в состоянии покоя. Вы хотите разбить некоторую тяжелую обработку, чтобы работать параллельно, и вы хотите иметь возможность пакетно обрабатывать нагрузку.

Я не вижу нигде в вашей проблеме источника, который является асинхронным, источником данных в движении или потребителем, который должен быть реактивным. Это моё обоснование для предположения, что вместо этого вы используете TPL.

Отдельное примечание, почему магическое число 10 для обработки параллельно? Это бизнес-требование или потенциальная попытка оптимизировать производительность? Как правило, лучше всего позволить TaskPool выработать то, что лучше всего подходит для клиентского ЦП, основанного на количестве ядер и текущей нагрузке. Я полагаю, что это становится все более важным с большими вариациями в Устройствах и их структурах процессора (одноядерный, многоядерный, многоядерный, маломощный/отключенный ядро ​​и т. Д.).

Вот один из способов вы можете сделать это в LINQPad (но обратите внимание на отсутствие Rx)

void Main() 
{ 
    var source = new List<Item>(); 
    for (int i = 0; i < 100; i++){source.Add(new Item(i));} 

    //Put into batches of ten, but only then pass on the item, not the temporary tuple construct. 
    var batches = source.Select((item, idx) =>new {item, idx}) 
         .GroupBy(tuple=>tuple.idx/10, tuple=>tuple.item); 

    //Process one batch at a time (serially), but process the items of the batch in parallel (concurrently). 
    foreach (var batch in batches) 
    { 
     "Processing batch...".Dump(); 
     var results = batch.AsParallel().Select (item => item.Process()); 
     foreach (var result in results) 
     { 
      result.Dump(); 
     } 
     "Processed batch.".Dump(); 
    } 
} 


public class Item 
{ 
    private static readonly Random _rnd = new Random(); 
    private readonly int _id; 
    public Item(int id) 
    { 
     _id = id; 
    } 

    public int Id { get {return _id;} } 

    public double Process() 
    { 
     var threadId = Thread.CurrentThread.ManagedThreadId; 
     string.Format("Processing on thread:{0}", threadId).Dump(Id); 
     var loopCount = _rnd.Next(10000,1000000); 
     Thread.SpinWait(loopCount); 
     return _rnd.NextDouble(); 
    } 
    public override string ToString() 
    { 
     return string.Format("Item:{0}", _id); 
    } 
} 

мне было бы интересно узнать, если у вас есть проблемы данных в движении или реакционноспособные потребительская проблема, но просто «опрокинул» вопрос, чтобы было легче объяснить.

Смежные вопросы