2013-03-18 2 views
2

У меня есть моя C# форма, запускающая два потока, один поток прослушивает данные для входа, а другой обрабатывает данные, поэтому я могу их использовать. по какой-то причине, как только поток процесса начинается, поток прослушивания больше не выполняется.Многопоточные конфликты

Thread th1 = new Thread(new ThreadStart(zeroMQConn.Listen)); 
th1.Start(); 
Thread th2 = new Thread(() => ProcessData(zeroMQConn)); 
th2.Start(); 

, когда я отладки этого начинается Th1 переходит в него, а затем начинает Th2 и он никогда не возвращается к Th1 и мои данные возвращается нуль.

public void Listen() 
    { 
     while (true) 
     { 
      try 
      { 
       byte[] zmqBuffer = new byte[102400]; 
       int messageLength; 
       lockForZMQ.EnterWriteLock(); 
       messageLength = socket.Receive(zmqBuffer); 
       lockForZMQ.ExitWriteLock(); 
       byte[] message = new byte[messageLength]; 
       Buffer.BlockCopy(zmqBuffer, 0, message, 0, messageLength); 
       PriceBookData priceBook = PriceBookData.CreateBuilder().MergeFrom(message).Build(); 
       double Type = priceBook.GetPb(0).QuoteType; 
       if (Type == 0.0) 
       { 
        lockForList.EnterWriteLock(); 
        CachedBidBooks = priceBook; 
        lockForList.ExitWriteLock(); 
       } 
       else 
       { 
        lockForList.EnterWriteLock(); 
        CachedAskBooks = priceBook; 
        lockForList.ExitWriteLock(); 
       } 
      } 
      catch (ZmqException ex) 
      { 
       MessageBox.Show(ex.Message); 
      } 
     } 
    } 

public void ProcessData(object connection) 
    { 
     while (true) 
     { 
      priceBookData = ((ZeroMQClass)connection).GetPriceBook(); 
     } 

    } 

public List<PriceBookData> GetPriceBook() 
    { 
     List<PriceBookData> AskAndBid = new List<PriceBookData>(); 
     lockForList.EnterWriteLock(); 
     if (CachedAskBooks != null && CachedBidBooks != null) 
     { 
      AskAndBid.Add(CachedBidBooks); 
      AskAndBid.Add(CachedAskBooks); 
      CachedBidBooks = null; 
      CachedAskBooks = null; 
      lockForList.ExitWriteLock(); 
      return AskAndBid; 
     } 
     lockForList.ExitWriteLock(); 
     return null; 
    } 
+1

Вам необходимо предоставить код для обоих методов потоков. – alex

+0

показать нам код – nsconnector

+0

Не нужно делать это на своей собственной потоковой передаче, чтобы обновить результат. Используйте привязку данных для привязки вашего представления к данным, а вы только в th2, обновите свой источник данных, а поток пользовательского интерфейса автоматически обновит представление. – David

ответ

3

У вас здесь модель производителя-потребителя, но вы не синхронизируете их должным образом. Проблема в том, что вместо какого-то буфера или сбора данных, которые готовы к обработке, у вас есть одна переменная, и вы полностью синхронизируете доступ к этой переменной. Это означает, что производитель не может работать, пока потребитель работает.

BlockingCollection<T> является сказочным классом, когда имеет дело с очередями производителей/потребителей.

var queue = new BlockingCollection<PriceBookData>(); 

Task.Factory.StartNew(() => 
{ 
    while (true) 
    { 
     byte[] zmqBuffer = new byte[102400]; 
     int messageLength; 
     socket.Receive(zmqBuffer); 
     byte[] message = new byte[messageLength]; 
     Buffer.BlockCopy(zmqBuffer, 0, message, 0, messageLength); 
     PriceBookData priceBook = PriceBookData.CreateBuilder().MergeFrom(message).Build(); 
     double Type = priceBook.GetPb(0).QuoteType; 
     queue.Add(priceBook); 
    } 
}, TaskCreationOptions.LongRunning); 

Task.Factory.StartNew(() => 
{ 
    foreach (var item in queue.GetConsumingEnumerable()) 
    { 
     //do stuff with item 
    } 
}, TaskCreationOptions.LongRunning); 
+0

Вам нужна какая-то петля в первом «StartNew»? Когда код сидит, не выйдет ли он после одной итерации? –

+0

@chuex Справа, добавлено. – Servy

+0

, так это будет сделано внутри функции прослушивания или это будет вместо функции прослушивания? – dstew

Смежные вопросы