2016-10-12 7 views
0

Получил некоторые проблемы с мультиплексированием сокетов TCP.TCP Socket Multiplexing Отправить большие данные

 //socket is non-blocking 
     const int MAX = 4096; 
     char *buff[MAX]; 
     char *p = buff; 
     int fd, rvalue; 
     rvalue = 0; 

     if ((fd = open(path, O_RDONLY)) < 0) { 
      return errno; 
     } else { 
     int didsend, didread; 
     int shouldsend; 
     while ((didread = read(fd, buff, MAX)) > 0) { 
      p = buff; 
      shouldsend = didread; 
      while (1) { 
      didsend = send(sockfd, p, shouldsend, 0); 
      //if send succeeds and returns the number of bytes fewer than asked for then try to send rest part in next time. 
      if (didsend < shouldsend) { 
       p += didsent; 
       shouldsend -= didsend; 
       continue; 
      } 
      //if there is no place for new data to send, then wait a brief time and try again. 
      if (didsend < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) { 
       usleep(1000); 
       continue; 
      } 
      //if all data has been sent then sending loop is over. 
      if (didsend == shouldsend) { 
       break; 
      } 
      //send error 
      if (didsend < 0) { 
       rvalue = errno; 
       break; 
      } 
      } 
     } 
     close(fd); 
     if (didread == -1) { 
      return errno; 
     } 
     return rvalue; 
     } 

Предположим, я использую I/O Multiplexing функция опроса() или Kqueue() и неблокируемой сокет, а затем, если есть только некоторые небольшие данные, как отправить короткое сообщение, он отлично работает.

Но если дело доходит до больших данных, я имею в виду больший размер буфера send(), поскольку с помощью неблокирующего сокета send() просто отправит часть данных и вернет, сколько данных он отправит, остальная часть данных может быть отправлена ​​только в другом вызове send(), но требуется время и не может определить, сколько времени потребуется. Таким образом, второй while() на самом деле является блокирующей отправкой, которая использует неблокирующий сокет.

Эквивалент:

//socket is blocking 
    const int MAX = 4096; 
    char *buff[MAX]; 
    int fd, n; 
    if ((fd = open(path, O_RDONLY)) < 0) { 
     return errno; 
    } else { 
    while ((n = read(fd, buff, MAX)) > 0) { 
     if (send(sockfd, buff, n, 0) < 0) { 
     return errno; 
     } 
    } 
    close(fd); 
    return 0; 
    } 

Итак, что такое решение этой проблемы, многопоточность может работать, но это своего рода тратить ресурс может быть.

+1

Будьте осторожны при выполнении 'while ((didread = read (fd, buff, MAX)))'. Помните, что 'read' возвращает' -1' при ошибке, а sinze '-1' отличен от нуля, считается« истинным ». Кроме того, если показанный код не находится в функции 'main', тогда у вас есть утечка ресурсов, так как вы не закрываете файл, который вы открываете. –

+0

@JoachimPileborg Спасибо, я исправил его. – ryuu

+0

that} else {для open, похоже, не добавляет значения –

ответ

2

Это общий шаблон для однопоточного сервера, который работает с несколькими соединениями и неблокирующими сокетами.

Это прежде всего псевдокод в C и не требует необходимой проверки ошибок. Но это дает вам представление о том, что для каждого принятого соединения вы сохраняете экземпляр структуры, который поддерживает дескриптор сокета, запрашивает состояние синтаксического анализа, ответный поток и любые другие «государственные» члены этого соединения. Затем вы просто используете «select», чтобы подождать или иметь несколько потоков, делающих то же самое.

Опять же, это только псевдокод и использует пример/опрос в качестве примера. Вы можете получить еще большую масштабируемость с помощью epoll.

while (1) 
{ 
    fd_set readset = {}; 
    fd_set writeset = {}; 

    for (int i = 0; i < number_of_client_connections; i++) 
    { 
     if (client_connections[i].reading_request) 
      FD_SET(client_connection.sock, &readset); 
     else 
      FD_SET(client_connection.sock, &writeset); 
    } 

    // add the listen socket to the read set 
    FD_SET(listen_socket, &readset); 

    select(n + 1, &readset, &writeset, &timeout); // wait for a socket to be ready (not shown - check for errors and return value) 

    if (FD_ISSET(listen_socket, &readset)) 
    { 
     int new_client_socket = accept(listen_socket, &addr, &addrlength); 

     // create a struct that keeps track of the connection state data 
     struct ConnectionData client_connection = {}; 
     client_connection.sock = new_client_socket; 
     client_connection.reading_request = 1; // awaiting for all the request bytes to come in 
     client_connections[number_of_client_connections++] = client_connection; // pseudo code, add the client_connection to the list 
    } 


    for (int i = 0; i < number_of_client_connections; i++) 
    { 
     if (client_connections[i].reading_request) 
     { 
      if (FD_ISSET(client_connections[i], &readset)) 
      { 
       char buffer[2000]; 
       int len = recv(client_connections[i].sock, buffer, 2000, 0); 
       // not shown - handle error case when (recv < 0) 
       // not shown - handle case when (recv == 0) 
       ProcessIncomingData(client_connections[i], buffer, len); // do all the request parsing here. Flip the client_connections[i].reading_request to 0 if ready to respond 
      } 
     } 
     else if (client_conections[i].reading_request == 0) 
     { 
      if (FD_ISSET(client_connections[i], &writeset)) 
      { 
       client_connection* conn = &client_connections[i]; 
       int len = send(conn->sock, conn->response_buffer + conn->txCount, conn->response_size - conn->txCount, 0); 
       conn->txCount += len; 

       if (conn->txCount == conn->response_size) 
       { 
        // done sending response - we can close this connection or change it to back to the reading state 
       } 
      } 
     } 
    } 
+0

Давайте просто сосредоточимся на send(), ваш код в основном говорит, что у нас есть флаг 'read_request' для каждого соединения, если один метод send() в цикле обработки select() не отправил все данные, мы не устанавливаем 'read_request' в true (это означает, что мы не будем обрабатывать запрос в следующем цикле обработки select()) и попытаться отправить данные покоя в следующий цикл обработки, пока все данные не будут отправлены, верно? Извините, если мой плохой английский смущает вас. xD – ryuu

+0

Исправить. Я пытаюсь указать, что вам нужно распределить структуру данных, которая должна быть связана с каждым сокетом, чтобы поддерживать состояние соединения, дескриптор открытого файла, который вы читаете, и предпочтительно временный буфер ('buff' в вашем примере), вместо использования переменных стека. Имеет ли это смысл? – selbie

+0

Да, я тебя понимаю. Спасибо за ваш эффективный ответ. – ryuu