2016-08-24 3 views
3

Я хочу использовать linq для обработки событий, полученных через соединение с веб-разъемом. Это то, что я до сих пор:Наблюдение входящих сообщений веб-памяти с реактивными расширениями?

private static void Main() 
    { 
     string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting"; 
     using (WebSocket ws = new WebSocket(WsEndpoint)) 
     { 
      ws.OnMessage += Ws_OnMessage; 

      ws.Connect(); 
      Console.ReadKey(); 
      ws.Close(); 
     } 
    } 

    private static void Ws_OnMessage(object sender, MessageEventArgs e) 
    { 
     Console.WriteLine(e.Data); 
    } 

Первый думаю, что пни меня, как превратить ws.OnMessage в какой-то поток событий. Я не могу найти никаких примеров в Интернете для наблюдения за источником внешнего с реактивными расширениями. Я намерен анализировать сообщения в json-объектах, а затем фильтровать их и агрегировать.

Может ли кто-нибудь предоставить пример создания наблюдаемого из сообщений веб-камеры и подписаться на него?


Edit: Заключительный рабочий код

Единственное отличие от выбранного ответа заключается в том, что я инициализирован WebSocket перед передачей его в Observable.Using

//------------------------------------------------------- 
// Create websocket connection 
//------------------------------------------------------- 
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting"; 
WebSocket socket = new WebSocket(wsEndpoint); 


//------------------------------------------------------- 
// Create an observable by wrapping ws.OnMessage 
//------------------------------------------------------- 
var globalEventStream = Observable 
    .Using(
     () => socket, 
     ws => 
      Observable 
       .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
        handler => ws.OnMessage += handler, 
        handler => ws.OnMessage -= handler)); 
//--------------------------------------------------------- 
// Subscribe to globalEventStream 
//--------------------------------------------------------- 

IDisposable subscription = globalEventStream.Subscribe(ep => 
{ 
    Console.WriteLine("Event Recieved"); 
    Console.WriteLine(ep.EventArgs.Data); 
}); 

//---------------------------------------------------------- 
// Send message over websocket 
//---------------------------------------------------------- 
socket.Connect(); 
socket.Send("test message"); 
// When finished, close the connection. 
socket.Close(); 
+0

Что 'WebSocket' библиотеки вы используете? – Jacob

+0

Я использую 'WebSocketSharp' – mooglinux

+0

@mooglinux - Почему у вас есть вызов' .Publish() '? Это предотвратит все значения, пока вы не назовете '.Connect()' на наблюдаемое. Вероятно, вы можете просто удалить '.Publish()'. – Enigmativity

ответ

6

Вы должны настроить ваш наблюдаемые как это:

var observable = 
     Observable 
      .Using(
       () => new WebSocket(WsEndpoint), 
       ws => 
        Observable 
         .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
          handler => ws.OnMessage += handler, 
          handler => ws.OnMessage -= handler)); 

Это правильно создаст сокет t, а затем наблюдать событие, когда подписывается наблюдаемый. Когда подписка будет размещена, она будет правильно отсоединяться от события и утилизировать сокет.


Тип observable будет IObservable<EventPattern<MessageEventArgs>>. Вы потребляете это наблюдаемое таким образом:

IDisposable subscription = observable.Subscribe(ep => 
{ 
    Console.WriteLine(ep.EventArgs.Data); 
}); 

Спасибо за публикуемые ссылки NuGet.

Вот рабочий код:

const string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting"; 

Console.WriteLine("Defining Observable:"); 

IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable = 
    Observable 
     .Using(
      () => 
      { 
       var ws = new WebSocketSharp.WebSocket(WsEndpoint); 
       ws.Connect(); 
       return ws; 
      }, 
      ws => 
       Observable 
        .FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
         handler => ws.OnMessage += handler, 
         handler => ws.OnMessage -= handler)); 

Console.WriteLine("Subscribing to Observable:"); 

IDisposable subscription = observable.Subscribe(ep => 
{ 
    Console.WriteLine("Event Recieved"); 
    Console.WriteLine(ep.EventArgs.Data); 
}); 

Console.WriteLine("Writing to Source:"); 

using (var source = new WebSocketSharp.WebSocket(WsEndpoint)) 
{ 
    source.Connect(); 
    source.Send("test"); 
} 
+0

Какая польза от размещения websocket внутри 'Observable.Using' в отличие от другого ответа? – mooglinux

+3

Когда вы распоряжаетесь подписью наблюдаемого, сокет также удаляется. – Shlomo

+0

Я пишу пользовательскую функцию для 'handler'? Если да, то каков тип возврата? Я попытался изменить 'handler' на функцию, которая напечатала данные и вернула' void', но это не сработало. – mooglinux

Смежные вопросы