Я использую winsock и C++ 11 потоков для сервера tcp. Для каждого клиента я создаю новый объект ReceiveThread, который имеет объект std :: thread. Клиент Tcp находится на Java. Я хотел бы создать простую функцию вещания. (если кто-то отправляет сообщение, то сервер перенаправляет его всем). Я использую класс оболочки для клиентских сокетов, который включает мьютекс. (синхронизированный unordered_map). Каждое сообщение структурировано. Первый байт - это длина сообщения, второй байт указывает тип, а затем фактические данные. (Длина данных известно, первый байт в том, что)странное поведение сервера tcp (используя winsock)
[EDIT] Мой существующий код прекрасно работает с одним клиентом. Когда второй клиент подключается, он также может отправлять сообщения, и оба клиента получают его. Но если я отправляю сообщение с первым клиентом, сервер получает его на втором потоке (сообщения поступают корректно кстати.), Который принадлежит второму клиенту. После этого сервер ничего не получает от первого клиента. (Я удалил «отправить вперед всем» части, потому что проблема возникает при получении части, и я также редактировал void ReceiveThread::receive()
, теперь я называю только один раз, и я буду обрабатывать его позже)
server.cpp
#include "Server.h"
#include <thread>
#include <string>
#include <winsock2.h>
#include <iostream>
#include "ReceiveThread.h"
using namespace std;
Server::Server(string ip, int port):ip(ip),port(port){
init();
}
int Server::init(){
//init the winsock library
WSADATA wsaData;
int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != NO_ERROR){
cout << "Error WSAStartup!";
return -1;
}
// Create a SOCKET for listening for incoming connection requests.
listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (listenSocket == INVALID_SOCKET) {
cout << "Error at socket(): " << WSAGetLastError();
WSACleanup();
return -1;
}
//----------------------
// The sockaddr_in structure specifies the address family,
// IP address, and port for the socket that is being bound.
sockaddr_in service;
service.sin_family = AF_INET;
service.sin_addr.s_addr = inet_addr(ip.c_str());
service.sin_port = htons(port);
if (::bind(listenSocket, (SOCKADDR*)&service, sizeof(service)) == SOCKET_ERROR) {
cout << "Bind error.";
closesocket(listenSocket);
WSACleanup();
return -1;
}
//----------------------
// Listen for incoming connection requests.
// on the created socket
if (listen(listenSocket, 1) == SOCKET_ERROR) {
cout << "Error listening on socket.\n";
closesocket(listenSocket);
WSACleanup();
return -1;
}
// Accept connections
acceptConnections();
}
void Server::acceptConnections(){
// Create a SOCKET for accepting incoming request.
SOCKET acceptSocket;
cout << "Waiting for clients.";
int counter = 0;
while (1){
acceptSocket = accept(listenSocket, NULL, NULL);
if (acceptSocket == INVALID_SOCKET){
cout << "Accept error";
closesocket(listenSocket);
WSACleanup();
return;
}
else{
clientSockets.add(acceptSocket);
cout << "Client connected.";
// create a new receive thread object for every client
counter++;
ReceiveThread receiveThread(clientSockets, acceptSocket,counter);
}
}
}
ReceiveThread.cpp
#include "ReceiveThread.h"
#include <winsock2.h>
#include "Message.h"
#include <iostream>
#include <string>
using namespace std;
ReceiveThread::ReceiveThread(ClientSockList &clients, SOCKET &socket,int counter) :clients(clients), socket(socket),counter(counter){
//cout << clients.getList().size();
receiveThread = new thread(&ReceiveThread::receive, this);
}
void ReceiveThread::terminateThread(){
terminated = true;
}
void ReceiveThread::receive(){
int res;
while (!terminated){
char recvbuf[BUF_SIZE]; // BU_SIZE = 1024
int recv_len = 0;
res = recv(socket, recvbuf + recv_len, BUF_SIZE - recv_len, 0);
if (!checkSocket(res)) break;
cout << "[" << counter << "] ";
for (int i = 0; i < res; ++i){
cout << recvbuf[i];
}
cout << endl;
}
//delete receiveThread;
}
bool ReceiveThread::checkSocket(int res){
if (res == SOCKET_ERROR || res == 0){
terminated = true;
cout << endl << "Terminated" << endl;
clients.remove(socket);
closesocket(socket);
return false;
}
else{
return true;
}
}
Это, как я отправлять сообщения от клиента:
public void sendMessageForBroadcast(String message) throws IOException {
//String m = buildMessage(message,Message.TYPE_BROADCAST);
StringBuffer buff = new StringBuffer();
buff.append(Character.toChars(message.length()));
buff.append(Character.toChars(1));
buff.append(message);
//System.out.println("Sending message: " + m + "["+m.length()+"]");
outputStream.write(buff.toString().getBytes("UTF8"));
outputStream.flush();
}
[EDIT] Сценарий:
- связывают с client1
- отправить сообщение с client1
- получать сообщения на сервере (Тема 1)
- связывают с client2
- посыла сообщение с client2
- получить сообщение на сервере (Тема 2)
- отправить сообщение с client1
- получить сообщение на сервере (Тема 2)
- теперь сервер не получает ничего от client1
Это довольно много кода для чтения, попробуйте сузить его с помощью отладчика. Когда код, кажется, блокируется, разбейте его, используя отладчик, чтобы увидеть, где находится блокировка, если он находится в блокирующей функции или бесконечном цикле. –
Примечание: я ничего не вижу, чтобы указать, что «Сообщение» является правилом трех безопасным. И вы действительно понимаете свой прием GetHitThread (clientSockets, acceptSocket, counter); 'logic вызывает * неопределенное поведение * функцией-членом, ссылающейся на объект (' this'), который уже давно уничтожен после того, как он выходит из сферы действия (что сразу после построения). Это значительный объем кода, который можно разделить без промежуточного тестирования и проверки. Я бы настоятельно предложил вам взять полдюжины шагов * назад * и пересмотреть то, что вы на самом деле пытаетесь сделать. – WhozCraig
Я только что отредактировал контент. Я удалил классы «Message» и отредактировал метод «ReceiveThread :: receive()». Минимальный код остается только, и проблема все еще существует. Кстати, как я могу избежать этого неопределенного поведения? Или как я могу начать потоки лучше? Весь дизайн плох? Не могли бы вы дать совет? – cylon