2010-02-19 4 views
4

У меня есть класс работает модель Producer-Consumer так:C#, как только основной сон нити, все нити остановились

public class SyncEvents 
{ 
    public bool waiting; 

    public SyncEvents() 
    { 
     waiting = true; 
    } 
} 

public class Producer 
{ 
    private readonly Queue<Delegate> _queue; 
    private SyncEvents _sync; 
    private Object _waitAck; 

    public Producer(Queue<Delegate> q, SyncEvents sync, Object obj) 
    { 
     _queue = q; 
     _sync = sync; 
     _waitAck = obj; 
    } 

    public void ThreadRun() 
    { 
     lock (_sync) 
     { 
      while (true) 
      { 
       Monitor.Wait(_sync, 0); 
       if (_queue.Count > 0) 
       { 
        _sync.waiting = false; 
       } 
       else 
       { 
        _sync.waiting = true; 
        lock (_waitAck) 
        { 
         Monitor.Pulse(_waitAck); 
        } 
       } 
       Monitor.Pulse(_sync); 
      } 
     } 
    } 

} 

public class Consumer 
{ 
    private readonly Queue<Delegate> _queue; 
    private SyncEvents _sync; 

    private int count = 0; 

    public Consumer(Queue<Delegate> q, SyncEvents sync) 
    { 
     _queue = q; 
     _sync = sync; 
    } 

    public void ThreadRun() 
    { 
     lock (_sync) 
     { 
      while (true) 
      { 
       while (_queue.Count == 0) 
       { 
        Monitor.Wait(_sync); 
       } 

       Delegate query = _queue.Dequeue(); 
       query.DynamicInvoke(null); 

       count++; 

       Monitor.Pulse(_sync); 
      } 
     } 
    } 
} 

/// <summary> 
/// Act as a consumer to the queries produced by the DataGridViewCustomCell 
/// </summary> 
public class QueryThread 
{ 
    private SyncEvents _syncEvents = new SyncEvents(); 
    private Object waitAck = new Object(); 
    private Queue<Delegate> _queryQueue = new Queue<Delegate>(); 

    Producer queryProducer; 
    Consumer queryConsumer; 

    public QueryThread() 
    { 
     queryProducer = new Producer(_queryQueue, _syncEvents, waitAck); 
     queryConsumer = new Consumer(_queryQueue, _syncEvents); 

     Thread producerThread = new Thread(queryProducer.ThreadRun); 
     Thread consumerThread = new Thread(queryConsumer.ThreadRun); 

     producerThread.IsBackground = true; 
     consumerThread.IsBackground = true; 

     producerThread.Start(); 
     consumerThread.Start(); 
    } 

    public bool isQueueEmpty() 
    { 
     return _syncEvents.waiting; 
    } 

    public void wait() 
    { 
     lock (waitAck) 
     { 
      while (_queryQueue.Count > 0) 
      { 
       Monitor.Wait(waitAck); 
      } 
     } 
    } 

    public void Enqueue(Delegate item) 
    { 
     _queryQueue.Enqueue(item); 
    } 
} 

код запуск гладко, но функция ожидания(). В каком-то случае я хочу подождать, пока все функции в очереди не будут завершены, поэтому я сделал функцию wait().

Производитель будет запускать импульс waitAck в соответствующее время.

Однако, когда линия «Monitor.Wait (waitAck)»; запускается в функции wait(), все остановки потока, включая поток производителя и потребителя.

Зачем это происходит и как я могу его решить? благодаря!

+2

Я не могу сказать, не зная кода для двух других классов. – 2010-02-19 06:35:25

+0

@ mr.LiKaShing дайте мне знать, если мое решение поможет вам очистить все это. Это действительно простой и чистый код, который я написал для одного из моих других проектов, с небольшими изменениями, которые я сделал, чтобы он делал то, что вам кажется интересным. С немного большей модификацией, и она может работать в вашем проекте;). – Kiril

+0

@ mr.LiKaShing Кстати, я только заметил, что во всем вашем коде вы всегда запираетесь перед циклом while: 'lock (sync) {while (true) {...}}'. Это ОЧЕНЬ плохая практика, потому что вы НИКОГДА не освобождаете блокировку, и если кто-то еще пытается заблокировать один и тот же объект 'sync', то он всегда будет вызывать тупик. – Kiril

ответ

1

кажется очень маловероятным, что все потоки будут фактически остановить, хотя я должен отметить, что, чтобы избежать ложного пробуждения окна, вы должны, вероятно, время цикла вместо если заявления:

lock (waitAck) 
{ 
    while(queryProducer.secondQueue.Count > 0) 
    { 
     Monitor.Wait(waitAck); 
    } 
} 

Того факт, что вы звоните Monitor.Wait означает, что waitAck должен быть выпущен, чтобы он не предотвращал блокировку потребительских потоков ...

Не могли бы вы дать более подробную информацию о том, как потоки производителей/потребителей «останавливаются»? Похоже, они только что зашли в тупик?

Ваш производитель использует Notify или NotifyAll? Теперь у вас есть лишний поток ожидания, поэтому, если вы используете только Notify, это будет только выпуск одного потока ... трудно понять, есть ли проблема без подробностей ваших классов Producer и Consumer.

Если бы вы могли показать короткую, но полную программу, чтобы продемонстрировать проблему, это поможет.

EDIT: Хорошо, теперь вы разместили код, который я могу увидеть целый ряд вопросов:

  • Имея так много общих переменных является рецепт катастрофы. Ваши классы должны инкапсулировать свои функциональные возможности, чтобы другой код не должен был сотрясаться для реализации бит и частей. (Например, ваш код вызова здесь действительно не должен иметь доступ к очереди.)

  • Вы добавляете элементы непосредственно ко второй очереди, что означает, что вы не можете эффективно разбудить производителя, чтобы добавить их в первая очередь. Почему у вас даже несколько очередей?

  • Вы всегда ждете на _sync в потоке производителя ... почему? С чего это будет уведомлять? Вообще говоря, производитель нить не должно ждать, пока вы не имеете ограниченный буфер

  • У вас есть статических переменной (_waitAck), который переписывается каждый раз, когда вы создаете новый экземпляр. Это плохая идея.

Вы также не показали свой класс SyncEvents - это то, что имел в виду, что делать что-нибудь интересное?

Честно говоря, кажется, что у вас есть довольно странный дизайн - вы вполне можете быть лучше снова начинать с нуля. Попробуйте инкапсулировать всю очередь производитель/потребитель в одном классе, который имеет Produce и Consume методы, а также WaitForEmpty (или что-то подобное). Я думаю, вы найдете логику синхронизации намного проще.

+0

Теперь я изменил код. thanks =] –

+0

извините за грязный код. На самом деле оригинальная версия не такая, я испортил ее после того, как попал в тупик. Теперь я убираю код и надеюсь, что вы сможете заглянуть в него, поскольку проблема все еще там. –

+0

И в классе производителя он не должен ждать, так как Monitor.Wait (_sync, 0) задал параметр таймаута 0. Это для выпуска блокировки для потребительского класса. –

1

Вот мой взгляд на ваш код:

public class ProducerConsumer 
{ 
    private ManualResetEvent _ready; 
    private Queue<Delegate> _queue; 
    private Thread _consumerService; 
    private static Object _sync = new Object(); 

    public ProducerConsumer(Queue<Delegate> queue) 
    { 
     lock (_sync) 
     { 
      // Note: I would recommend that you don't even 
      // bother with taking in a queue. You should be able 
      // to just instantiate a new Queue<Delegate>() 
      // and use it when you Enqueue. There is nothing that 
      // you really need to pass into the constructor. 
      _queue = queue; 
      _ready = new ManualResetEvent(false); 
      _consumerService = new Thread(Run); 
      _consumerService.IsBackground = true; 
      _consumerService.Start(); 
     } 
    } 

    public override void Enqueue(Delegate value) 
    { 
     lock (_sync) 
     { 
      _queue.Enqueue(value); 
      _ready.Set(); 
     } 
    } 

    // The consumer blocks until the producer puts something in the queue. 
    private void Run() 
    { 
     Delegate query; 
     try 
     { 
      while (true) 
      { 
       _ready.WaitOne(); 
       lock (_sync) 
       { 
        if (_queue.Count > 0) 
        { 
         query = _queue.Dequeue(); 
         query.DynamicInvoke(null); 
        } 
        else 
        { 
         _ready.Reset(); 
         continue; 
        } 
       } 
      } 
     } 
     catch (ThreadInterruptedException) 
     { 
      _queue.Clear(); 
      return; 
     } 
    } 


    protected override void Dispose(bool disposing) 
    { 
     lock (_sync) 
     { 
      if (_consumerService != null) 
      { 
       _consumerService.Interrupt(); 
      } 
     } 
     base.Dispose(disposing); 
    } 


} 

Я не совсем уверен, что вы пытаетесь достичь с помощью функции ожидания ... Я предполагаю, что вы пытаетесь поставить некоторые тип ограничения количества элементов, которые могут быть поставлены в очередь. В этом случае просто бросайте исключение или возвращайте сигнал сбоя, когда у вас слишком много элементов в очереди, клиент, вызывающий Enqueue, будет продолжать повторять попытку, пока очередь не сможет взять больше элементов. Принятие оптимистического подхода избавит вас от множества головных болей, и это просто поможет вам избавиться от множества сложной логики.

Если вы действительно хотите иметь ждать там, то я, вероятно, может помочь вам понять лучший подход. Дайте мне знать, чего вы пытаетесь достичь с ожиданием, и я помогу вам.

Примечание: Я взял этот код из одного из моих проектов, немного изменил его и разместил здесь ... могут быть некоторые незначительные синтаксические ошибки, но логика должна быть правильной.

UPDATE: На основании ваших комментариев, которые я сделал некоторые изменения: Я добавил еще ManualResetEvent к классу, поэтому, когда вы звоните BlockQueue() это дает вам событие, которое вы можете ждать, и устанавливает флаг, чтобы остановить функцию Епдиеей из упорядочивая больше элементов. Когда все запросы в очереди обслуживаются, флаг устанавливается в true, и событие _wait установлено так, что любой, кто его ждет, получает сигнал.

public class ProducerConsumer 
{ 
    private bool _canEnqueue; 
    private ManualResetEvent _ready; 
    private Queue<Delegate> _queue; 
    private Thread _consumerService; 

    private static Object _sync = new Object(); 
    private static ManualResetEvent _wait = new ManualResetEvent(false); 

    public ProducerConsumer() 
    { 
     lock (_sync) 
     { 
      _queue = new Queue<Delegate> _queue; 
      _canEnqueue = true; 
      _ready = new ManualResetEvent(false); 
      _consumerService = new Thread(Run); 
      _consumerService.IsBackground = true; 
      _consumerService.Start(); 
     } 
    } 

    public bool Enqueue(Delegate value) 
    { 
     lock (_sync) 
     { 
      // Don't allow anybody to enqueue 
      if(_canEnqueue) 
      { 
       _queue.Enqueue(value); 
       _ready.Set(); 
       return true; 
      } 
     } 
     // Whoever is calling Enqueue should try again later. 
     return false; 
    } 

    // The consumer blocks until the producer puts something in the queue. 
    private void Run() 
    { 
     try 
     { 
      while (true) 
      { 
       // Wait for a query to be enqueued 
       _ready.WaitOne(); 

       // Process the query 
       lock (_sync) 
       { 
        if (_queue.Count > 0) 
        { 
         Delegate query = _queue.Dequeue(); 
         query.DynamicInvoke(null); 
        } 
        else 
        { 
         _canEnqueue = true; 
         _ready.Reset(); 
         _wait.Set(); 
         continue; 
        } 
       } 
      } 
     } 
     catch (ThreadInterruptedException) 
     { 
      _queue.Clear(); 
      return; 
     } 
    } 

    // Block your queue from enqueuing, return null 
    // if the queue is already empty. 
    public ManualResetEvent BlockQueue() 
    { 
     lock(_sync) 
     { 
      if(_queue.Count > 0) 
      { 
       _canEnqueue = false; 
       _wait.Reset(); 
      } 
      else 
      { 
       // You need to tell the caller that they can't 
       // block your queue while it's empty. The caller 
       // should check if the result is null before calling 
       // WaitOne(). 
       return null; 
      } 
     } 
     return _wait; 
    } 

    protected override void Dispose(bool disposing) 
    { 
     lock (_sync) 
     { 
      if (_consumerService != null) 
      { 
       _consumerService.Interrupt(); 
       // Set wait when you're disposing the queue 
       // so that nobody is left with a lingering wait. 
       _wait.Set(); 
      } 
     } 
     base.Dispose(disposing); 
    } 
} 
+0

Спасибо! Это более аккуратная реализация. Моя ситуация: работает основной поток (UI), фоновый поток, запрашивающий данные из базы данных. В сценарии пользовательский интерфейс хочет экспортировать данные, однако фоновый поток все еще работает для извлечения данных, я хочу подождать, пока фоновый поток завершит всю свою работу, поэтому мне нужна функция ожидания. Я попытался улучшить свой код в соответствии с советами Джона, и я нашел, что он работает ..... Только если функция, указанная делегатом, не мешает компоненту, лежит в основном потоке .... может быть, потому что основной поток блокируется Monitor.Wait() –

+0

И я пытаюсь использовать другой подход для решения проблемы. –

+0

@ mr.LiKaShing Я предполагаю, что у вас есть способ сохранить поток пользовательского интерфейса, когда вы заставляете его ждать запросы для заполнения. Также помните о сложной ситуации здесь: вы не останавливаете функцию Enqueue от очередей запросов, поэтому ваше ожидание может быть расширено совсем немного. – Kiril

Смежные вопросы