2015-01-04 3 views
0

Я использую winsock и C++ 11 потоков для сервера tcp. Для каждого клиента я создаю новый объект ReceiveThread, который имеет объект std :: thread. Клиент Tcp находится на Java. Я хотел бы создать простую функцию вещания. (если кто-то отправляет сообщение, то сервер перенаправляет его всем). Я использую класс оболочки для клиентских сокетов, который включает мьютекс. (синхронизированный unordered_map). Каждое сообщение структурировано. Первый байт - это длина сообщения, второй байт указывает тип, а затем фактические данные. (Длина данных известно, первый байт в том, что)странное поведение сервера tcp (используя winsock)

[EDIT] Мой существующий код прекрасно работает с одним клиентом. Когда второй клиент подключается, он также может отправлять сообщения, и оба клиента получают его. Но если я отправляю сообщение с первым клиентом, сервер получает его на втором потоке (сообщения поступают корректно кстати.), Который принадлежит второму клиенту. После этого сервер ничего не получает от первого клиента. (Я удалил «отправить вперед всем» части, потому что проблема возникает при получении части, и я также редактировал void ReceiveThread::receive(), теперь я называю только один раз, и я буду обрабатывать его позже)

server.cpp

#include "Server.h" 
#include <thread> 
#include <string> 
#include <winsock2.h> 
#include <iostream> 
#include "ReceiveThread.h" 


using namespace std; 

Server::Server(string ip, int port):ip(ip),port(port){ 
    init(); 
} 

int Server::init(){ 

    //init the winsock library 
    WSADATA wsaData; 
    int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); 
    if (iResult != NO_ERROR){ 
     cout << "Error WSAStartup!"; 
     return -1; 
    } 

    // Create a SOCKET for listening for incoming connection requests. 
    listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 
    if (listenSocket == INVALID_SOCKET) { 
     cout << "Error at socket(): " << WSAGetLastError(); 
     WSACleanup(); 
     return -1; 
    } 

    //---------------------- 
    // The sockaddr_in structure specifies the address family, 
    // IP address, and port for the socket that is being bound. 
    sockaddr_in service; 
    service.sin_family = AF_INET; 
    service.sin_addr.s_addr = inet_addr(ip.c_str()); 
    service.sin_port = htons(port); 

    if (::bind(listenSocket, (SOCKADDR*)&service, sizeof(service)) == SOCKET_ERROR) { 
     cout << "Bind error."; 
     closesocket(listenSocket); 
     WSACleanup(); 
     return -1; 
    } 

    //---------------------- 
    // Listen for incoming connection requests. 
    // on the created socket 
    if (listen(listenSocket, 1) == SOCKET_ERROR) { 
     cout << "Error listening on socket.\n"; 
     closesocket(listenSocket); 
     WSACleanup(); 
     return -1; 
    } 

    // Accept connections 
    acceptConnections(); 

} 
void Server::acceptConnections(){ 
    // Create a SOCKET for accepting incoming request. 
    SOCKET acceptSocket; 
    cout << "Waiting for clients."; 
    int counter = 0; 
    while (1){ 
     acceptSocket = accept(listenSocket, NULL, NULL); 
     if (acceptSocket == INVALID_SOCKET){ 
      cout << "Accept error"; 
      closesocket(listenSocket); 
      WSACleanup(); 
      return; 
     } 
     else{ 
      clientSockets.add(acceptSocket); 
      cout << "Client connected."; 
      // create a new receive thread object for every client 
      counter++; 
      ReceiveThread receiveThread(clientSockets, acceptSocket,counter); 
     } 
    } 
} 

ReceiveThread.cpp

#include "ReceiveThread.h" 
#include <winsock2.h> 
#include "Message.h" 
#include <iostream> 
#include <string> 

using namespace std; 

ReceiveThread::ReceiveThread(ClientSockList &clients, SOCKET &socket,int counter) :clients(clients), socket(socket),counter(counter){ 
    //cout << clients.getList().size(); 
    receiveThread = new thread(&ReceiveThread::receive, this); 
} 

void ReceiveThread::terminateThread(){ 
    terminated = true; 
} 
void ReceiveThread::receive(){ 
    int res; 
    while (!terminated){ 

     char recvbuf[BUF_SIZE]; // BU_SIZE = 1024 
     int recv_len = 0; 
     res = recv(socket, recvbuf + recv_len, BUF_SIZE - recv_len, 0); 
     if (!checkSocket(res)) break; 
     cout << "[" << counter << "] "; 
     for (int i = 0; i < res; ++i){ 
      cout << recvbuf[i]; 
     } 
     cout << endl; 
    } 
    //delete receiveThread; 
} 

bool ReceiveThread::checkSocket(int res){ 
    if (res == SOCKET_ERROR || res == 0){ 
     terminated = true; 
     cout << endl << "Terminated" << endl; 
     clients.remove(socket); 
     closesocket(socket); 
     return false; 
    } 
    else{ 
     return true; 
    } 
} 

Это, как я отправлять сообщения от клиента:

public void sendMessageForBroadcast(String message) throws IOException { 
    //String m = buildMessage(message,Message.TYPE_BROADCAST); 
    StringBuffer buff = new StringBuffer(); 
    buff.append(Character.toChars(message.length())); 
    buff.append(Character.toChars(1)); 
    buff.append(message); 

    //System.out.println("Sending message: " + m + "["+m.length()+"]"); 
    outputStream.write(buff.toString().getBytes("UTF8")); 
    outputStream.flush(); 
} 

[EDIT] Сценарий:

  1. связывают с client1
  2. отправить сообщение с client1
  3. получать сообщения на сервере (Тема 1)
  4. связывают с client2
  5. посыла сообщение с client2
  6. получить сообщение на сервере (Тема 2)
  7. отправить сообщение с client1
  8. получить сообщение на сервере (Тема 2)
  9. теперь сервер не получает ничего от client1
+0

Это довольно много кода для чтения, попробуйте сузить его с помощью отладчика. Когда код, кажется, блокируется, разбейте его, используя отладчик, чтобы увидеть, где находится блокировка, если он находится в блокирующей функции или бесконечном цикле. –

+2

Примечание: я ничего не вижу, чтобы указать, что «Сообщение» является правилом трех безопасным. И вы действительно понимаете свой прием GetHitThread (clientSockets, acceptSocket, counter); 'logic вызывает * неопределенное поведение * функцией-членом, ссылающейся на объект (' this'), который уже давно уничтожен после того, как он выходит из сферы действия (что сразу после построения). Это значительный объем кода, который можно разделить без промежуточного тестирования и проверки. Я бы настоятельно предложил вам взять полдюжины шагов * назад * и пересмотреть то, что вы на самом деле пытаетесь сделать. – WhozCraig

+0

Я только что отредактировал контент. Я удалил классы «Message» и отредактировал метод «ReceiveThread :: receive()». Минимальный код остается только, и проблема все еще существует. Кстати, как я могу избежать этого неопределенного поведения? Или как я могу начать потоки лучше? Весь дизайн плох? Не могли бы вы дать совет? – cylon

ответ

1

У вас есть серьезный случай undefined behavior.

Все начинается с этой строкой:

ReceiveThread receiveThread(clientSockets, acceptSocket,counter); 

Это создает ReceiveThread объекта (конструктор которого создает свой поток, ссылающийся this). Проблема заключается в том, что, как только будет объявлено объявление, блок кода, объявленный в конце переменной, приводит к тому, что переменная выходит из области действия и разрушает объект.

После уничтожения объекта любой код, разыменовывающий прежние объекты this указатель (который является всем кодом, который использует объекты, не являющиеся статическими переменными или функциями), будет разыменовывать указатель разрушенного объекта, что приведет к указанному неопределенному поведению.

Я предлагаю вам сохранить коллекцию указателей на объект нити, чтобы сохранить объект в области видимости до тех пор, пока он не будет разрушен, и сохраните ссылку на объект, если впоследствии вам понадобится использовать его из других потоков.


Там также другой возможный источник непредсказуемое поведение, потому что я не вижу инициализации переменной-члена terminated, что означает его значение будет неопределенным, когда вы ссылаетесь его в потоке.

+0

Спасибо :) теперь он отлично работает. – cylon

Смежные вопросы