2013-11-07 3 views
2

Я старая собака, пытающаяся выучить новый трюк. Я очень хорошо знаком с языком PowerBuilder и на этом языке, когда вы хотите делать асинхронно, вы создаете объект в новом потоке. Я повторяю, что: весь объект создается в отдельном потоке и имеет другой контекст выполнения. Любые и все методы этого объекта выполняются в контексте этого отдельного потока.Резьбовые и асинхронные операции в C#

Ну, теперь я пытаюсь реализовать некоторое асинхронное выполнение с использованием C#, а модель потоковой передачи в .NET чувствует себя совершенно по-другому. Похоже, что я создаю объекты в одном потоке, но могу указать (по принципу «по вызову»), что некоторые методы выполняются в другом потоке.

Разница кажется тонкой, но меня это расстраивает. Мое старинное мышление говорит: «У меня есть помощник по имени Боб. Боб уходит и делает все». Если я это понимаю, новое школьное мышление - это «I am Боб. Если мне нужно, я иногда могу погладить живот и погладить мою голову в то же время».

Моя проблема с кодированием в реальном мире: я пишу интерфейс, который принимает сообщения через TCP, анализирует их на полезные данные, а затем помещает эти данные в базу данных. Сообщение «Parsing» занимает примерно одну секунду. В зависимости от анализируемых данных операция базы данных может занимать менее секунды или может занять десять секунд. (Все время составлены, чтобы прояснить проблему.)

Мое мышление старой школы говорит мне, что мой класс базы данных должен жить в отдельной теме и иметь что-то вроде ConcurrentQueue. Он просто вращается в этой очереди, обрабатывая все, что может быть там. С другой стороны, Parser должен будет вводить сообщения в эту очередь. Этими сообщениями были бы (делегаты?) Такие вещи, как «Создать заказ на основе данных в this object» или «Обновить заказ на основе данных в this object». Возможно, стоит отметить, что я действительно хочу обработать «сообщения» в «очереди» в строгом однопоточном порядке FIFO.

В принципе, мое соединение с базой данных не всегда может идти в ногу с моим парсером. Мне нужен способ убедиться, что мой парсер не замедляется, пока мои процессы базы данных пытаются догнать. Совет?

- изменить: с кодом! Все и все говорят мне использовать BlockingCollection. Итак, вот Краткое описание: пояснение цели и кода для этого:

Это будет сервис Windows. При запуске он будет порождать несколько «сред», причем каждая «среда» содержит один «dbworker» и один «интерфейс». В «интерфейсе» будет один «парсер» и один «слушатель».

class cEnvironment { 
    private cDBWorker MyDatabase; 
    private cInterface MyInterface; 

    public void OnStart() { 
     MyDatabase = new cDBWorker(); 
     MyInterface = new cInterface(); 

     MyInterface.OrderReceived += this.InterfaceOrderReceivedEventHandler; 

     MyDatabase.OnStart(); 
     MyInterface.OnStart(); 
    } 

    public void OnStop() { 
     MyInterface.OnStop(); 
     MyDatabase.OnStop(); 

     MyInterface.OrderReceived -= this.InterfaceOrderReceivedEventHandler; 
    } 

    void InterfaceOrderReceivedEventHandler (object sender, OrderReceivedEventArgs e) { 
     MyDatabase.OrderQueue.Add (e.Order); 
    } 
} 

class cDBWorker { 
    public BlockingCollection<cOrder> OrderQueue = new BlockingCollection<cOrder>(); 
    private Task ProcessingTask; 

    public void OnStart() { 
     ProcessingTask = Task.Factory.StartNew (() => Process(), TaskCreationOptions.LongRunning); 
    } 

    public void OnStop() { 
     OrderQueue.CompleteAdding(); 
     ProcessingTask.Wait(); 
    } 

    public void Process() { 
     foreach (cOrder Order in OrderQueue.GetConsumingEnumerable()) { 
      switch (Order.OrderType) { 
       case 1: 
        SuperFastMethod (Order); 
        break; 

       case 2: 
        ReallySlowMethod (Order); 
        break; 
      } 
     } 
    } 

    public void SuperFastMethod (cOrder Order) { 
    } 

    public void ReallySlowMethod (cOrder Order) { 
    } 
} 

class cInterface { 
    protected cListener MyListener; 
    protected cParser MyParser; 

    public void OnStart() { 
     MyListener = new cListener(); 
     MyParser = new cParser(); 

     MyListener.DataReceived += this.ListenerDataReceivedHandler; 
     MyListener.OnStart(); 
    } 

    public void OnStop() { 
     MyListener.OnStop(); 
     MyListener.DataReceived -= this.ListenerDataReceivedHandler; 
    } 

    public event OrderReceivedEventHandler OrderReceived; 

    protected virtual void OnOrderReceived (OrderReceivedEventArgs e) { 
     if (OrderReceived != null) 
      OrderReceived (this, e); 
    } 

    void ListenerDataReceivedHandler (object sender, DataReceivedEventArgs e) { 
     foreach (string Message in MyParser.GetMessages (e.RawData)) { 
      OnOrderReceived (new OrderReceivedEventArgs (MyParser.ParseMessage (Message))); 
     } 
    } 

Он компилируется. (СУДЬЯ ЭТО!) Но значит ли это, что я делаю это правильно?

+0

Итак, вы пробовали реализовать это? Похоже, вы хотя бы знаете, с чего начать. На стороне примечание 'BlockingCollection' может быть более подходящим, чем явная параллельная очередь (он будет использовать один внутри). – Servy

+0

Также обратите внимание, что потоки практически всегда будут использовать одно и то же пространство памяти.Нити на самом низком уровне не «владеют» объектом, но программисты часто (и оправданно) применяют свои собственные ограничения, что данный объект используется только в одном потоке, что упрощает рассуждение об этом объекте. Хотя вы * можете * использовать данный объект в нескольких потоках, часто это не рекомендуется делать (за исключением объектов * built *, которые будут использоваться несколькими потоками, например, «BlockingCollection»), или данных, которые «только для чтения». – Servy

+0

Для моего понимания очередь уже отделяет обработку - с одной стороны вы ее асинхронно заполняете, а с другой стороны вы асинхронно вытягиваете ее. Кажется безопасным - два компонента не блокируют друг друга. – pasty

ответ

3

BlockingCollection делает ввод такого рода вещь вместе довольно легко:

// the queue 
private BlockingCollection<Message> MessagesQueue = new BlockingCollection<Message>(); 


// the consumer 
private MessageParser() 
{ 
    foreach (var msg in MessagesQueue.GetConsumingEnumerable()) 
    { 
     var parsedMessage = ParseMessage(msg); 
     // do something with the parsed message 
    } 
} 

// In your main program 
// start the consumer 
var consumer = Task.Factory.StartNew(() => MessageParser(), 
    TaskCreationOptions.LongRunning); 

// the main loop 
while (messageAvailable) 
{ 
    var msg = GetMessageFromTcp(); 
    // add it to the queue 
    MessagesQueue.Add(msg); 
} 

// done receiving messages 
// tell the consumer that no more messages will be added 
MessagesQueue.CompleteAdding(); 

// wait for consumer to finish 
consumer.Wait(); 

Потребитель делает незанятое ожидание по очереди, так что это не есть ресурсы процессора, когда нет ничего доступно.

+0

Дополнительный престиж [Servy] (http://stackoverflow.com/users/1159478/servy), [Эрик Липперт] (http://stackoverflow.com/users/88656/eric-lippert) и [Скотт Чемберлен] (http://stackoverflow.com/users/80274/scott-chamberlain)! Исходное сообщение отредактировано с окончательным кодом. – Jason

Смежные вопросы