2015-07-03 4 views
6

У меня есть проект C#, работающий с входным аудиопотоком из Kinect 1, Kinect 2, Microphone или что-то еще.Как я могу разделить и направить несколько потоков NAudio

waveIn.DataAvailable += (object sender, WaveInEventArgs e) => { 
    lock(buffer){ 
    var pos = buffer.Position; 
       buffer.Write(e.Buffer, 0, e.BytesRecorded); 
       buffer.Position = pos; 
    } 
}; 

Переменный буфер является потоком из компонента А, которые будут обрабатываться SpeechRecognition компонента В, работающих на Потоках.

я добавлять новые компоненты C, D, E, работающий на Streams, чтобы вычислить высоту, обнаруживать звук, делать отпечатки пальцев или что-нибудь еще ...

Как я могу дублировать поток для компонентов C, D, E?

  • Компонент А отправить событие «У меня есть поток делать то, что вы хотите» Я не хочу, чтобы изменить логику с помощью Event «Дайте мне ваши потоки»

  • Я ищу для «MULTISTREAM», которые могли бы дать мне поток экземпляр и будет обращаться с работой

Компонент а

var MultiStream buffer = new MultiStream() 
... 
SendMyEventWith(buffer) 

Компонент В, С, D, Е

public void HandleMyEvent(MultiStream buffer){ 
    var stream = buffer.GetNewStream(); 
    var engine = new EngineComponentB() 
     engine.SetStream(stream); 
} 
  • MultiStream должен быть потоком, чтобы обернуть метод Write() (потому что поток не имеет доступную механику данных)?
  • Если поток является Dispose() компонентом B, MultiStream должен удалить его из массива?
  • MultiStream должен бросить исключение на Read() требует использования GetNewStream()

EDIT: Kinect 1 обеспечить сам поток ... :-(я должен использовать тему pumpit в MULTISTREAM ?

ли кто-нибудь есть такой многопоточных класса?

Благодаря

ответ

1

Я не уверен, что это лучший способ сделать это или что это лучше, чем предыдущий ответ, и я не гарантирую, что этот код идеален , но я закодировал что-то, что буквально вы просили, потому что это было весело - класс MultiStream.

Вы можете найти код для класса здесь: http://pastie.org/10289142

Пример использования:

MultiStream ms = new MultiStream(); 

Stream copy1 = ms.CloneStream(); 
ms.Read(...); 

Stream copy2 = ms.CloneStream(); 
ms.Read(...); 

copy1 и copy2 будут содержать одинаковые данные после того, как в приведенном примере RAN, и они будут продолжать обновляться, как MultiStream написано. Вы можете читать, обновлять позицию и удалять клонированные потоки по отдельности. Если удаленные клонированные потоки будут удалены с MultiStream, а утилизация Multistream закроет все связанные и клонированные потоки (вы можете изменить это, если это не то поведение, которое вы хотите). Попытка написать клонированные потоки вызовет не поддерживаемое исключение.

1

Как-то я не думаю, что потоки действительно соответствуют тому, что вы пытаетесь сделать. вы настраиваете ситуацию, в которой долгое время запуск программы идет t o постоянно увеличивать требования к данным без видимых причин.

Я бы предложил паб/вспомогательную модель, которая публикует полученные аудиоданные подписчикам, предпочтительно используя многопоточный подход, чтобы минимизировать влияние плохого абонента. Некоторые идеи можно найти here.

Я сделал это раньше с классом процессора, который реализует IObserver<byte[]> и использует Queue<byte[]> для хранения блоков выборки до тех пор, пока поток процессов не будет готов для них. Вот являются базовые классы:

public abstract class BufferedObserver<T> : IObserver<T>, IDisposable 
{ 
    private object _lck = new object(); 

    private IDisposable _subscription = null; 
    public bool Subscribed { get { return _subscription != null; } } 

    private bool _completed = false; 
    public bool Completed { get { return _completed; } } 

    protected readonly Queue<T> _queue = new Queue<T>(); 

    protected bool DataAvailable { get { lock(_lck) { return _queue.Any(); } } } 
    protected int AvailableCount { get { lock (_lck) { return _queue.Count; } } } 

    protected BufferedObserver() 
    { 
    } 

    protected BufferedObserver(IObservable<T> observable) 
    { 
     SubscribeTo(observable); 
    } 

    public virtual void Dispose() 
    { 
     if (_subscription != null) 
     { 
      _subscription.Dispose(); 
      _subscription = null; 
     } 
    } 

    public void SubscribeTo(IObservable<T> observable) 
    { 
     if (_subscription != null) 
      _subscription.Dispose(); 
     _subscription = observable.Subscribe(this); 
     _completed = false; 
    } 

    public virtual void OnCompleted() 
    { 
     _completed = true; 
    } 

    public virtual void OnError(Exception error) 
    { } 

    public virtual void OnNext(T value) 
    { 
     lock (_lck) 
      _queue.Enqueue(value); 
    } 

    protected bool GetNext(ref T buffer) 
    { 
     lock (_lck) 
     { 
      if (!_queue.Any()) 
       return false; 
      buffer = _queue.Dequeue(); 
      return true; 
     } 
    } 

    protected T NextOrDefault() 
    { 
     T buffer = default(T); 
     GetNext(ref buffer); 
     return buffer; 
    } 
} 

public abstract class Processor<T> : BufferedObserver<T> 
{ 
    private object _lck = new object(); 
    private Thread _thread = null; 

    private object _cancel_lck = new object(); 
    private bool _cancel_requested = false; 
    private bool CancelRequested 
    { 
     get { lock(_cancel_lck) return _cancel_requested; } 
     set { lock(_cancel_lck) _cancel_requested = value; } 
    } 

    public bool Running { get { return _thread == null ? false : _thread.IsAlive; } } 
    public bool Finished { get { return _thread == null ? false : !_thread.IsAlive; } } 

    protected Processor(IObservable<T> observable) 
     : base(observable) 
    { } 

    public override void Dispose() 
    { 
     if (_thread != null && _thread.IsAlive) 
     { 
      //CancelRequested = true; 
      _thread.Join(5000); 
     } 
     base.Dispose(); 
    } 

    public bool Start() 
    { 
     if (_thread != null) 
      return false; 

     _thread = new Thread(threadfunc); 
     _thread.Start(); 
     return true; 
    } 

    private void threadfunc() 
    { 
     while (!CancelRequested && (!Completed || _queue.Any())) 
     { 
      if (DataAvailable) 
      { 
       T data = NextOrDefault(); 
       if (data != null && !data.Equals(default(T))) 
        ProcessData(data); 
      } 
      else 
       Thread.Sleep(10); 
     } 
    } 

    // implement this in a sub-class to process the blocks 
    protected abstract void ProcessData(T data); 
} 

Таким образом, вы только сохраняя данные до тех пор, как вам нужно, и вы можете прикрепить столько потоков процесса, как вам нужно же наблюдаемого источника данных.


И для полноты картины, вот общий класс, который реализует IObservable<T>, так что вы можете увидеть, как все это совмещается. Это один даже есть комментарии:

/// <summary>Generic IObservable implementation</summary> 
/// <typeparam name="T">Type of messages being observed</typeparam> 
public class Observable<T> : IObservable<T> 
{ 
    /// <summary>Subscription class to manage unsubscription of observers.</summary> 
    private class Subscription : IDisposable 
    { 
     /// <summary>Observer list that this subscription relates to</summary> 
     public readonly ConcurrentBag<IObserver<T>> _observers; 

     /// <summary>Observer to manage</summary> 
     public readonly IObserver<T> _observer; 

     /// <summary>Initialize subscription</summary> 
     /// <param name="observers">List of subscribed observers to unsubscribe from</param> 
     /// <param name="observer">Observer to manage</param> 
     public Subscription(ConcurrentBag<IObserver<T>> observers, IObserver<T> observer) 
     { 
      _observers = observers; 
      _observer = observer; 
     } 

     /// <summary>On disposal remove the subscriber from the subscription list</summary> 
     public void Dispose() 
     { 
      IObserver<T> observer; 
      if (_observers != null && _observers.Contains(_observer)) 
       _observers.TryTake(out observer); 
     } 
    } 

    // list of subscribed observers 
    private readonly ConcurrentBag<IObserver<T>> _observers = new ConcurrentBag<IObserver<T>>(); 

    /// <summary>Subscribe an observer to this observable</summary> 
    /// <param name="observer">Observer instance to subscribe</param> 
    /// <returns>A subscription object that unsubscribes on destruction</returns> 
    /// <remarks>Always returns a subscription. Ensure that previous subscriptions are disposed 
    /// before re-subscribing.</remarks> 
    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     // only add observer if it doesn't already exist: 
     if (!_observers.Contains(observer)) 
      _observers.Add(observer); 

     // ...but always return a new subscription. 
     return new Subscription(_observers, observer); 
    } 

    // delegate type for threaded invocation of IObserver.OnNext method 
    private delegate void delNext(T value); 

    /// <summary>Send <paramref name="data"/> to the OnNext methods of each subscriber</summary> 
    /// <param name="data">Data object to send to subscribers</param> 
    /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks> 
    public void Notify(T data) 
    { 
     foreach (var observer in _observers) 
     { 
      delNext handler = observer.OnNext; 
      handler.BeginInvoke(data, null, null); 
     } 
    } 

    // delegate type for asynchronous invocation of IObserver.OnComplete method 
    private delegate void delComplete(); 

    /// <summary>Notify all subscribers that the observable has completed</summary> 
    /// <remarks>Uses delegate.BeginInvoke to send out notifications asynchronously.</remarks> 
    public void NotifyComplete() 
    { 
     foreach (var observer in _observers) 
     { 
      delComplete handler = observer.OnCompleted; 
      handler.BeginInvoke(null, null); 
     } 
    } 
} 

Теперь вы можете создать Observable<byte[]> для использования в качестве передатчика для Process<byte[]> экземпляров, которые заинтересованы. Извлеките блоки данных из входного потока, звукового считывателя и т. Д. И передайте их методу Notify. Просто убедитесь, что вы клонировали массивы заранее ...

+0

В Паб/Суб наблюдателя я буду писать данные из потока в другой, используя поток? Это может исправить проблему Kinect 1, которая обеспечивает поток, который я могу перекачивать в поток памяти. –

+0

Но это не решает проблему MultiStream. На стороне у меня есть поток в компоненте A, а с другой стороны у меня есть считыватели B, C, D, E, которые хотят читать полный поток. Так что это не проблема с блокировкой, а проблема с «прочитанным» курсором, которую я пытаюсь решить. –

+0

В методе 'Notify'' IObservable вы отправляете полученную информацию каждому подписчику, чтобы каждый абонент получал полную копию данных работать на. Если вам нужно сохранить историю данных в каждом наблюдателе, тогда использование памяти может немного сдуться ... это компромисс, я думаю. Вы * можете * делиться потоком между разными подписчиками, если хотите, с объектом курсора потока для каждого абонента, который обрабатывает независимое позиционирование, которое им требуется. Было бы интересно писать ,,, но дать Pub/Sub попробовать. – Corey