2009-07-25 2 views
26

Я использую Enterprise Library 4 в одном из моих проектов для ведения журнала (и других целей). Я заметил, что для регистрации, которую я делаю, существует некоторая стоимость, которую я могу смягчить, выполнив ведение журнала в отдельном потоке.Как эффективно регистрировать асинхронно?

То, как я делаю это сейчас, это то, что я создаю объект LogEntry, а затем я вызываю BeginInvoke на делегат, который вызывает Logger.Write.

new Action<LogEntry>(Logger.Write).BeginInvoke(le, null, null); 

Что я действительно хочу сделать, это добавить лог сообщение в очередь и затем один поток тяговых экземпляров LogEntry из очереди и выполнение операции журнала. Преимущество этого заключалось в том, что ведение журнала не мешает выполнению операции, а не каждая операция ведения журнала приводит к тому, что задание запускается в пуле потоков.

Как создать общую очередь, которая поддерживает множество писателей и одного считывателя потоковым безопасным способом? Некоторые примеры реализации очереди, которые предназначены для поддержки многих авторов (без создания синхронизации/блокировки) и одного читателя, будут действительно оценены.

Рекомендация относительно альтернативных подходов также будет оценена, но я не заинтересован в изменении рамок регистрации.

+0

@spoon, я добавил немного улучшенная версия, имейте в виду, что вам придется много раз протестировать ее, прежде чем использовать ее, потому что я просто ее сбил. –

+1

Хотя этот вопрос не применим непосредственно к этому вопросу (так как он говорит EntLib 4), корпоративная библиотека 6 теперь поддерживает асинхронное ведение журнала. –

ответ

28

Я написал этот код некоторое время назад, не стесняйтесь использовать его.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 

namespace MediaBrowser.Library.Logging { 
    public abstract class ThreadedLogger : LoggerBase { 

     Queue<Action> queue = new Queue<Action>(); 
     AutoResetEvent hasNewItems = new AutoResetEvent(false); 
     volatile bool waiting = false; 

     public ThreadedLogger() : base() { 
      Thread loggingThread = new Thread(new ThreadStart(ProcessQueue)); 
      loggingThread.IsBackground = true; 
      loggingThread.Start(); 
     } 


     void ProcessQueue() { 
      while (true) { 
       waiting = true; 
       hasNewItems.WaitOne(10000,true); 
       waiting = false; 

       Queue<Action> queueCopy; 
       lock (queue) { 
        queueCopy = new Queue<Action>(queue); 
        queue.Clear(); 
       } 

       foreach (var log in queueCopy) { 
        log(); 
       } 
      } 
     } 

     public override void LogMessage(LogRow row) { 
      lock (queue) { 
       queue.Enqueue(() => AsyncLogMessage(row)); 
      } 
      hasNewItems.Set(); 
     } 

     protected abstract void AsyncLogMessage(LogRow row); 


     public override void Flush() { 
      while (!waiting) { 
       Thread.Sleep(1); 
      } 
     } 
    } 
} 

Некоторые преимущества:

  • Он держит фон регистратор жив, так что не нужно раскручивать и спином вниз темы.
  • Он использует один поток для обслуживания очереди, что означает, что никогда не будет ситуации, когда 100 потоков обслуживают очередь.
  • Она копирует очередь, чтобы обеспечить очередь не блокируются, пока операция журнала выполняются
  • Он использует AutoResetEvent для обеспечения Б.Г. поток находится в состоянии ожидания
  • Это, имхо, очень легко следовать

Вот несколько улучшенная версия, имейте в виду, что я провел очень мало испытаний на ней, но она затрагивает несколько мелких проблем.

public abstract class ThreadedLogger : IDisposable { 

    Queue<Action> queue = new Queue<Action>(); 
    ManualResetEvent hasNewItems = new ManualResetEvent(false); 
    ManualResetEvent terminate = new ManualResetEvent(false); 
    ManualResetEvent waiting = new ManualResetEvent(false); 

    Thread loggingThread; 

    public ThreadedLogger() { 
     loggingThread = new Thread(new ThreadStart(ProcessQueue)); 
     loggingThread.IsBackground = true; 
     // this is performed from a bg thread, to ensure the queue is serviced from a single thread 
     loggingThread.Start(); 
    } 


    void ProcessQueue() { 
     while (true) { 
      waiting.Set(); 
      int i = ManualResetEvent.WaitAny(new WaitHandle[] { hasNewItems, terminate }); 
      // terminate was signaled 
      if (i == 1) return; 
      hasNewItems.Reset(); 
      waiting.Reset(); 

      Queue<Action> queueCopy; 
      lock (queue) { 
       queueCopy = new Queue<Action>(queue); 
       queue.Clear(); 
      } 

      foreach (var log in queueCopy) { 
       log(); 
      }  
     } 
    } 

    public void LogMessage(LogRow row) { 
     lock (queue) { 
      queue.Enqueue(() => AsyncLogMessage(row)); 
     } 
     hasNewItems.Set(); 
    } 

    protected abstract void AsyncLogMessage(LogRow row); 


    public void Flush() { 
     waiting.WaitOne(); 
    } 


    public void Dispose() { 
     terminate.Set(); 
     loggingThread.Join(); 
    } 
} 

Преимущество по сравнению с оригинальным:

  • Это одноразовая, так что вы можете избавиться от асинхронного регистратора
  • скрытой семантика улучшенной
  • Он будет реагировать немного лучше пачку с последующей по тишине
+2

Ваш флаг ожидания должен быть изменчивым, если он используется из нескольких потоков - или использовать блокировку при доступе к ней. Я не уверен, что вижу смысл ждать 10 секунд, а не просто ждать без тайм-аута. Я бы также вызвал Set, пока вы все еще держите блокировку - на самом деле это не будет иметь большого значения, но вы можете получить два потока, добавляя события, а затем один поток, определяющий событие, читатель завершает ожидание и копирование данных * затем * другой писатель снова устанавливает событие. По правде говоря, никаких больших потерь. Почему у вас есть переменная экземпляра loggingThread? –

+1

10-секундное ожидание - это избежать голодания, в случае, если вы получаете пакет событий, частично обслуживаемых. (получение событий сразу после копирования очереди). Я должен, вероятно, изменить это на событие ручного сброса и, возможно, добавить некоторую логику для повторной службы очереди до ее действительно пустой, прежде чем ждать снова. Thread var - это всего лишь крошечный бит будущей проверки, который действительно не нужен (в случае, если я хочу получить идентификатор потока или что-то в этом роде). –

+0

внесли поправки в код для учета комментариев Джона, спасибо! –

0

Дополнительный уровень косвенности может помочь здесь.

Ваш первый вызов метода асинхронного вызова может помещать сообщения в синхронизированную очередь и устанавливать событие - поэтому блокировки выполняются в пуле потоков, а не в ваших рабочих потоках, а затем есть еще один поток, вытягивающий сообщения с очереди при возникновении события.

0

Если вы регистрируете что-то в отдельном потоке, сообщение может не быть записано, если приложение сработает, что делает его бесполезным.

Причина в том, почему вы должны всегда скрывать после каждой записи.

+0

Это правда, я не использую этот метод для регистрации исключений или другого критического ведения журнала. Я использую его только для протоколирования сообщений о состоянии информации, описывающих успешно выполненные операции. Эти сообщения, как правило, происходят гораздо чаще, и мне нужно минимизировать их стоимость. –

9

Да, вам нужна очередь производителей/потребителей. У меня есть один пример этого в моем учебнике по разметке - если вы посмотрите мою страницу "deadlocks/monitor methods", вы найдете код во второй половине.

Есть много других примеров в Интернете, конечно - и .NET 4.0 будет поставляться с одним в рамках тоже (скорее более полно, чем у меня!). В .NET 4.0 вы, вероятно, обернете ConcurrentQueue<T> в BlockingCollection<T>.

Версия на этой странице является неэквивалентной (было написано long), но вы, вероятно, захотите сделать ее обобщенной - было бы тривиально делать.

Вы можете назвать Produce из каждой «нормальной» нити и Consume из одного потока, просто зацикливая и регистрируя все, что он потребляет. Вероятно, проще всего сделать поток потребителя фоновым потоком, поэтому вам не нужно беспокоиться о «остановке» очереди при выходе из приложения. Это означает, что есть удаленная возможность пропустить окончательную запись в журнале, хотя (если это происходит на полпути через ее запись, когда приложение выходит) - или даже больше, если вы создаете быстрее, чем может потреблять/регистрировать.

+0

Можете ли вы ссылаться на документацию .NET 4, описывающую это? –

+0

@ ложка16: Сделано. –

+0

спасибо, оцените дополнительные мысли, а также –

0

Если вы имеете в виду очередь SHARED, то я думаю, вам придется синхронизировать записи с ней, нажатиями и попками.

Но я все еще думаю, что стоит обратить внимание на общий дизайн очереди.По сравнению с IO регистрации и, вероятно, по сравнению с другой работой, выполняемой вашим приложением, краткое количество блокировок для нажатий и всплывающих окон, вероятно, не будет значительным.

2

Я предлагаю начали с измерением фактического влияния на производительность лесозаготовок на общей системе (т.е. запустив профайлер) и, возможно переключение на что-то быстрее, как log4net (я лично мигрировал к нему из EntLib регистрации давно).

Если это не работает, вы можете попробовать использовать этот простой метод с .NET Framework:

ThreadPool.QueueUserWorkItem 

очередями способ для выполнения. Метод выполняется, когда поток пула потоков становится доступным.

MSDN Details

Если это не работает, либо, то вы можете прибегнуть к чему-то, как Джон Скит предложил и фактически закодировать структуру асинхронной протоколирования самостоятельно.

+1

Я уже определил, что есть возможность асинхронного ведения журнала. Кроме того, я не в состоянии переключать фреймворки регистрации, поскольку мы широко используем ведение журнала EntLib. Остальная команда не была бы тривиальной или радушной заменой :). ThreadPool.QueueUserWorkItem - это в основном то, что теперь вызывает вызов BeginInvoke, который я использую. Я думаю, что ответ Джона является наиболее подходящим для моей ситуации. –

+1

Признание рабочего элемента не гарантирует никакого заказа, поэтому, если вы заботитесь о порядке регистрации сообщений, то это не будет жизнеспособным подходом ИМО. –

2

Вот что я придумал ... также см. Ответ Сэма Шаффрона. Этот ответ является вики-сообществом в случае возникновения каких-либо проблем, которые люди видят в коде и хотят обновить.

/// <summary> 
/// A singleton queue that manages writing log entries to the different logging sources (Enterprise Library Logging) off the executing thread. 
/// This queue ensures that log entries are written in the order that they were executed and that logging is only utilizing one thread (backgroundworker) at any given time. 
/// </summary> 
public class AsyncLoggerQueue 
{ 
    //create singleton instance of logger queue 
    public static AsyncLoggerQueue Current = new AsyncLoggerQueue(); 

    private static readonly object logEntryQueueLock = new object(); 

    private Queue<LogEntry> _LogEntryQueue = new Queue<LogEntry>(); 
    private BackgroundWorker _Logger = new BackgroundWorker(); 

    private AsyncLoggerQueue() 
    { 
     //configure background worker 
     _Logger.WorkerSupportsCancellation = false; 
     _Logger.DoWork += new DoWorkEventHandler(_Logger_DoWork); 
    } 

    public void Enqueue(LogEntry le) 
    { 
     //lock during write 
     lock (logEntryQueueLock) 
     { 
      _LogEntryQueue.Enqueue(le); 

      //while locked check to see if the BW is running, if not start it 
      if (!_Logger.IsBusy) 
       _Logger.RunWorkerAsync(); 
     } 
    } 

    private void _Logger_DoWork(object sender, DoWorkEventArgs e) 
    { 
     while (true) 
     { 
      LogEntry le = null; 

      bool skipEmptyCheck = false; 
      lock (logEntryQueueLock) 
      { 
       if (_LogEntryQueue.Count <= 0) //if queue is empty than BW is done 
        return; 
       else if (_LogEntryQueue.Count > 1) //if greater than 1 we can skip checking to see if anything has been enqueued during the logging operation 
        skipEmptyCheck = true; 

       //dequeue the LogEntry that will be written to the log 
       le = _LogEntryQueue.Dequeue(); 
      } 

      //pass LogEntry to Enterprise Library 
      Logger.Write(le); 

      if (skipEmptyCheck) //if LogEntryQueue.Count was > 1 before we wrote the last LogEntry we know to continue without double checking 
      { 
       lock (logEntryQueueLock) 
       { 
        if (_LogEntryQueue.Count <= 0) //if queue is still empty than BW is done 
         return; 
       } 
      } 
     } 
    } 
} 
+0

Похоже, вы намерены сделать класс AsyncLogger одиночным. Я бы сделал конструктор закрытым в этом случае, но в противном случае кажется, что он выполняет свою работу. –

+0

спасибо, хороший улов –

1

В ответ на сообщение Сэма Сафронса я хотел вызвать флеш и убедиться, что ev все было действительно закончено.В моем случае я пишу в базу данных в потоке очереди, и все мои события журнала выходили в очередь, но иногда приложение останавливалось, прежде чем все было закончено, что было неприемлемо в моей ситуации. Я изменил несколько кусков вашего кода, но главное я хотел бы поделиться было вровень:

public static void FlushLogs() 
     { 
      bool queueHasValues = true; 
      while (queueHasValues) 
      { 
       //wait for the current iteration to complete 
       m_waitingThreadEvent.WaitOne(); 

       lock (m_loggerQueueSync) 
       { 
        queueHasValues = m_loggerQueue.Count > 0; 
       } 
      } 

      //force MEL to flush all its listeners 
      foreach (MEL.LogSource logSource in MEL.Logger.Writer.TraceSources.Values) 
      {     
       foreach (TraceListener listener in logSource.Listeners) 
       { 
        listener.Flush(); 
       } 
      } 
     } 

Я надеюсь, что кто-то экономит некоторое разочарование. Это особенно заметно в параллельных процессах, регистрирующих множество данных.

Спасибо, что поделились своим решением, оно поставило меня в хорошее направление!

--Johnny S

1

Я хотел сказать, что мой предыдущий пост был отчасти бесполезным. Вы можете просто установить AutoFlush в true, и вам не придется перебирать все слушатели. Тем не менее, у меня все еще была сумасшедшая проблема с параллельными потоками, пытаясь сбросить регистратор. Мне пришлось создать еще одно логическое значение, которое было установлено в true при копировании очереди и выполнении записей LogEntry, а затем в рутине флеша мне пришлось проверить, что логическое значение, чтобы убедиться, что что-то еще не было в очереди, и ничего не обрабатывалось перед возвращением.

Теперь несколько потоков параллельно могут ударить по этой вещи, и когда я вызываю флеш, я знаю, что это действительно покраснело.

 public static void FlushLogs() 
    { 
     int queueCount; 
     bool isProcessingLogs; 
     while (true) 
     { 
      //wait for the current iteration to complete 
      m_waitingThreadEvent.WaitOne(); 

      //check to see if we are currently processing logs 
      lock (m_isProcessingLogsSync) 
      { 
       isProcessingLogs = m_isProcessingLogs; 
      } 

      //check to see if more events were added while the logger was processing the last batch 
      lock (m_loggerQueueSync) 
      { 
       queueCount = m_loggerQueue.Count; 
      }     

      if (queueCount == 0 && !isProcessingLogs) 
       break; 

      //since something is in the queue, reset the signal so we will not keep looping 

      Thread.Sleep(400); 
     } 
    } 
+1

Добро пожаловать в переполнение стека! Здесь мы не работаем, как традиционный форум. Если у вас есть обновление, вы можете (и должны) отредактировать свой оригинальный пост с новой информацией/исправлениями вместо добавления нового ответа. – Pops

1

Просто обновление:

Использование Enteprise библиотеки 5.0 с .NET 4.0 легко может быть сделано:

static public void LogMessageAsync(LogEntry logEntry) 
{ 
    Task.Factory.StartNew(() => LogMessage(logEntry)); 
} 

См: http://randypaulo.wordpress.com/2011/07/28/c-enterprise-library-asynchronous-logging/

+2

Это добавляет поток для сообщения журнала, который * может быть очень дорогостоящим и продуктивным. Как упоминалось в вопросе @ spoon16, очередность пишет массовым образом. –

+0

@ M.Babcock, когда вы говорите, что добавьте поток в сообщение журнала, вы имеете в виду новый поток? – Mukund

+0

@Mukund - Да, если обнаружено, что logging занимает более 200-300 циклов процессора, он, безусловно, выполняется в новом потоке. – Lalman

Смежные вопросы