2016-01-23 2 views
4

Я новичок в Rx, поэтому я, вероятно, делаю некоторые важные ошибки здесь.Наблюдаемый TcpListener заканчивается после однократного соединения

Я хотел создать очень простой сервер сокетов, который мог бы получать сообщения от клиентов с помощью Observables. Для этого я использую Rxx, который предоставляет методы расширения в пространстве имен System.Net.Sockets, а также предоставляет статический заводский класс ObserableTcpListener.

Вот что я до сих пор в значительной степени украл ее из различных источников:

IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001); 
TcpListener listener = new TcpListener(endpoint); 

IObservable<TcpClient> clients = listener 
    .StartSocketObservable(1) 
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket)); 
    .Finally(listener.Stop) 

clients.Subscribe(client => 
{ 
    OnConnect(client).Subscribe(
     message => OnMessage(client, message), 
     ex => OnException(client, ex), 
     () => OnCompleted(client)); 
}); 

private static IObservable<TcpClient> SocketToTcpClient(Socket socket) 
{ 
    TcpClient client = new TcpClient(); 
    client.Client = socket; 
    return Observable.Return<TcpClient>(client); 
} 

private static IObservable<byte[]> OnConnect(TcpClient client) 
{ 
    return client.Client.ReceiveUntilCompleted(SocketFlags.None); 
} 

private static void OnMessage(TcpClient client, byte[] message) 
{ 
    Console.WriteLine("Mesage Received! - {0}", Encoding.UTF8.GetString(message)); 
} 

private static void OnCompleted(TcpClient client) 
{ 
    Console.WriteLine("Completed."); 
} 

private static void OnException(TcpClient client, Exception ex) 
{ 
    Console.WriteLine("Exception: {0}", ex.ToString()); 
} 

Это работает ... до точки. Я могу создать одно клиентское соединение. Как только это соединение завершается, кажется, что последовательность Observable завершается и вызывается .Finally(listener.Stop). Очевидно, это не то, что я хочу.

Я пробовал использовать фабричный класс ObserableTcpListener.Start(), но это дает мне тот же результат.

IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint); 
sockets.Subscribe(client => 
{ 
    OnConnect(client).Subscribe(
     message => OnMessage(client, message), 
     ex => OnException(client, ex), 
     () => OnCompleted(client)); 
}); 

Я предполагаю, что я понимаю, проблема здесь: clients наблюдаемая последовательность просто пустой после того, как первый клиент прекращает, таким образом .Finally(listener.Stop) называется.

Что мне нужно сделать, чтобы обойти это? Как я могу продолжать слушать входящие соединения?

+0

https://searchcode.com/codesearch/view/14317362/ мне кажется, что код принимает только одно соединение. В любом случае, посоветуйте отказаться от этого подхода и использовать стандартные методы для запуска сокетов. Я не вижу никакого преимущества, связанного с использованием TcpListener/Client напрямую, возможно, с ожиданием. – usr

+1

@usr Основными причинами являются то, что по большей части мне нравится писать код Rx-way, потому что он очень читабельен и четко выражает намерение. Вторая причина заключается в том, что я пишу кучу разных кодов событийного стиля, а Rx обеспечивает отличную абстракцию над всеми этими стилями, позволяя сохранять шаблоны одинаковыми. В-третьих, я просто пытаюсь изучить Rx. Спасибо за совет! –

+0

Если я могу добавить аргумент счетчика к этому: Этот стиль разбивает нормальный последовательный код на обратные вызовы, что, как правило, является ужасным для качества кода. Например, у вас есть классическая ошибка в коде, где вы предполагаете, что TCP отправляет «сообщения». Это не так, и поэтому 'Encoding.UTF8.GetString' будет иногда возвращать мусор, если вам не повезет, и ваше« сообщение »разделяется на полпути через кодированную кодовую точку UTF8. Это трудно исправить в этом стиле. В последовательном коде вы можете использовать StreamReader или BinaryReader и вытаскивать данные. С толчком вы должны взять то, что приходит. – usr

ответ

4

Сделайте Observableгорячим и сохраняйтесь, пока есть подписки.

IObservable<TcpClient> clients = listener 
    .StartSocketObservable(1) 
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket)) 
    .Finally(listener.Stop) 
    .Publish().RefCount(); 
+0

Простите, я заболел какое-то время после публикации этого вопроса, и я совершенно забыл, что спросил. В любом случае, я попробовал пару часов назад, и да, на самом деле это, похоже, исправило это. –

Смежные вопросы