2016-04-20 4 views
0

Я пытаюсь создать прокси-сервер HTTP, где, в соответствии с именем хоста GET/CONNET в HTTP-запросе, некоторые соединения будут иметь более высокие приоритеты перед другими.Socket client recv() всегда возвращает 0

Идея состоит в выполнении запросов с более высоким приоритетом на основе заданного списка имен хостов, каждый из которых имеет определенный приоритет.

Рассматриваемые соединения будут сохранены приемщик нити в четырех различных очередей (по одному для каждого класса приоритета: максимальный, средний, минимальный и неклассифицированном); accepter будет затем fork() дочерним процессом, который будет деактивировать и обрабатывать ожидающие соединения в порядке приоритета. Поступая таким образом, приемщика нити всегда будет принимать новые соединения и для каждой установлена ​​в очереди Conne Короче, вот мой прокси:

  • главного: открывает TCP сокет, связывается с данным портом, слушает до 10 соединения, вызывает поток accepter передавая ему сокет fd, открытый с помощью предыдущего socket(), вызывает и присоединяется к этой теме;
  • приемщика: этот поток получает гнездо FD передаются от главного и петли с accept() возвращение клиента сокет, recv() от клиента, анализирует запрос и в соответствии с именем хоста в запросе HTTP будет помещёна пользовательская структура шахты в правильной очереди; тогда он будет fork(), поэтому процесс будет деактивирован и обработать соединение;
  • manageConnection: этот процесс, раздвоенный на приемщике, dequeues из очередей, исследует совала структуру разрешения поля имени хоста, открывает сокет клиент, к душевой головке может светиться серверу и, GET или CONNECT, выполнят запрос.

Новый прокси: не более fork(), я сделал пул потоков из четырех потоков (один «приемщик» и три «Коннектер»: так как я планирую поставить этот прокси-сервер на моем RPi 2, имеет процессор quadcore, я думал, что по крайней мере четыре потока были хорошими). Теперь у меня есть один mutex и два condition_variables. Код почти тот же, кроме потоков, мьютексов и переменных условий. Эти новые функции, вызываемые потоками:

  • Enqueue: эта нить содержит accept() петлю, где он получает от клиента, анализирует запрос HTTP, находит имя хоста и в соответствии с его приоритетом, поставить в очередь info_conn структура (введено в начале кода);

  • Dequeue: эта нить содержит освобождение пакета из очереди и управления соединениями цикла, где он получает info_conn-структуру из очереди, извлекает клиентский сокет (который я получил от accept() цикла), разрешает имя хоста и управление GET или CONNECT запроса.

Проблема: всегда то же самое, когда дело доходит до управления CONNECT запросов, recv() от клиента всегда возвращает 0: Я знаю, ПРИЕМ() возвращает 0, когда другая сторона связи отключилась, но это не то, что я хотел! Основываясь на поточном подходе, это тривиальная проблема с производителем/потребителем (выскакивание и нажатие на очереди), поэтому я считаю, что чередование потоков в очереди и деактивации правильное.

Мой код (новый):

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h> 
#include <sys/socket.h> 
#include <sys/types.h> 
#include <sys/time.h> 
#include <arpa/inet.h> 
#include <unistd.h>  
#include <thread> 
#include <iostream> 
#include <netdb.h> 
#include <queue> 
#include <list> 
#include <vector> 
#include <condition_variable> 
#include <cstdlib> 


using namespace std; 

#define GET  0 
#define CONNECT 1 

#define DEFAULTCOLOR  "\033[0m" 
#define RED     "\033[22;31m" 
#define YELLOW    "\033[1;33m" 
#define GREEN    "\033[0;0;32m" 

#define MAX_SIZE   1000 
#define CONNECT_200_OK  "HTTP/1.1 200 Connection established\r\nProxy-agent: myproxy\r\n\r\n" 

// my custom struct stored in queues 
typedef struct info_connection { 
    int client_fd; 
    string host; 
    string payload; 
    int request; 
} info_conn; 

queue<info_conn>q1; 
queue<info_conn>q2; 
queue<info_conn>q3; 
queue<info_conn>q4; 
vector<thread> workers; 
condition_variable cond_read, cond_write; 
mutex mtx; 

void enqueue(int sock_client); 
void dequeue(void); 

int main(int argc, char *argv[]) { 
    int socket_desc; 
    struct sockaddr_in server; 

    socket_desc = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 
    if (socket_desc == -1) { 
     perror("socket()"); 
     exit(-1); 
    } 

    server.sin_family = AF_INET; 
    server.sin_addr.s_addr = INADDR_ANY; 
    if (argc == 2) 
     server.sin_port = htons(atoi(argv[1])); 
    printf("listening to port %d\n", atoi(argv[1])); 

    if (bind(socket_desc,(struct sockaddr *)&server, sizeof(server)) < 0) { 
     perror("bind failed. Error"); 
     exit(-1); 
    } 
    printf("binded\n"); 

    listen(socket_desc, 10); 
    printf("listen\n"); 

    // thread pool, because I suck at forking 
    workers.push_back(thread(enqueue, socket_desc)); 
    workers.push_back(thread(dequeue)); 
    workers.push_back(thread(dequeue)); 
    workers.push_back(thread(dequeue)); 

    for (thread& t : workers) { 
     t.join(); 
    } 

    return 0; 
} 

void enqueue(int sock_client) { 
    printf("enqueue()\n"); 
    int client_sock; 
    struct sockaddr_in *client_struct; 
    unsigned int clilen; 
    bzero((char*)&client_struct, sizeof(client_struct)); 
    clilen = sizeof(client_struct); 
    char host_name[128]; 
    char buff[4096]; 
    int n_recv, n_send; 
    char *start_row, *end_row, *tmp_ptr, *tmp_start; 
    int req; 

    while((client_sock = accept(sock_client, (struct sockaddr *)&client_struct, &clilen))) { 
     memset(host_name, 0, sizeof(host_name)); 
     n_recv = recv(client_sock, buff, sizeof(buff), 0); 
     if (n_recv < 0) { 
      perror("recv()"); 
      break; 
     } 

     start_row = end_row = buff; 

     while ((end_row = strstr(start_row, "\r\n")) != NULL) { 
      int row_len = end_row - start_row; 
      if (row_len == 0) 
       break; 
      if (strncmp(buff, "GET ", 4) == 0) { 
       req = GET; 
       tmp_start = start_row + 4; 
       tmp_ptr = strstr(tmp_start, "//"); 
       int len = tmp_ptr - tmp_start; 
       tmp_start = tmp_start + len + 2; 
       tmp_ptr = strchr(tmp_start, '/'); 
       len = tmp_ptr - tmp_start; 
       strncpy(host_name, tmp_start, len); 
       break; 
      } 
      else if (strncmp(buff, "CONNECT ", 8) == 0) { 
       req = CONNECT; 
       tmp_start = start_row + 8; 
       tmp_ptr = strchr(tmp_start, ':'); 
       int host_len = tmp_ptr - tmp_start; 
       strncpy(host_name, tmp_start, host_len); 
       break; 
      } 
      start_row = end_row + 2; 
      /* if ((start_row - buff) >= strlen(buff)) 
        break;*/ 
     } 

     unique_lock<mutex> locker(mtx, defer_lock); 
     locker.lock(); 
     cond_write.wait(locker, [](){ 
      return (q1.size() < MAX_SIZE || q2.size() < MAX_SIZE || q3.size() < MAX_SIZE || q4.size() < MAX_SIZE); 
     }); 

     cout << "(DEBUG) thread " << this_thread::get_id() << " wants to insert, queues not full " << 
      q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << '\n'; 
     int priority = 0; 
     info_conn info_c; 
     info_c.client_fd = client_sock; 
     info_c.host = host_name; 
     info_c.request = req; 
     info_c.payload = string(buff); 
     cout << "(DEBUG) thread " << this_thread::get_id() << " looking for " << host_name << 
      " queues" << '\n'; 
     if (strcmp(host_name, "www.netflix.com") == 0) { 
      priority = 1; 
      printf("hostname = www.netflix.com, priority %d\n", priority); 
      q1.push(info_c); 
     } 
     else if (strcmp(host_name, "www.youtube.com") == 0) { 
      priority = 2; 
      printf("hostname = www.youtube.com, priority %d\n", priority); 
      q2.push(info_c); 
     } 
     else if (strcmp(host_name, "www.facebook.com") == 0) { 
      priority = 3; 
      printf("hostname = www.facebook.com, priority %d\n", priority); 
      q3.push(info_c); 
     } 
     else { 
      priority = 4; 
      printf("hostname %s not found in queues\n", host_name);     
      q4.push(info_c); 
     } 

     cout << GREEN << "(DEBUG) thread " << this_thread::get_id() << " inserted " << 
      q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << DEFAULTCOLOR<< '\n'; 

     locker.unlock(); 
     cond_read.notify_all(); 
    } 
    if (client_sock < 0) { 
     perror("accept failed"); 
     exit(-1); 
    } 
} 

void dequeue(void) { 
    int fd_client = -1; 
    int fd_server = -1; 
    struct sockaddr_in server; 
    int what_request; 
    char host_name[128]; 
    char buffer[1500]; 
    int n_send, n_recv; 
    size_t length; 
    info_conn req; 

    // CONNECT 
    int r, max; 
    int send_200_OK; 
    int read_from_client = 0; 
    int read_from_server = 0; 
    int send_to_client = 0; 
    int send_to_server = 0; 
    struct timeval timeout; 
    char buff[8192]; 
    fd_set fdset; 

    printf("dequeue()\n"); 
    while (true) { 
     unique_lock<mutex> locker(mtx, defer_lock); 
     locker.lock(); 
     cond_read.wait(locker, [](){ 
      return (q1.size() > 0 || q2.size() > 0 || q3.size() > 0 || q4.size() > 0); 
     }); 

     cout << "(DEBUG) thread " << this_thread::get_id() << " wants to remove, queues not empty " << 
      q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << '\n'; 
     if (q1.size() > 0) { 
      req = q1.front(); 
      q1.pop(); 
     } 
     else if (q2.size() > 0) { 
      req = q2.front(); 
      q2.pop(); 
     } 
     else if (q3.size() > 0) { 
      req = q3.front(); 
      q3.pop();  
     } 
     else if (q4.size() > 0) { 
      req = q4.front(); 
      q4.pop(); 
     } 
     cout << YELLOW <<"(DEBUG) thread " << this_thread::get_id() << " removed, " << 
      q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << DEFAULTCOLOR<<'\n'; 
     locker.unlock(); 
     // notify one, because I have only one "producer" thread 
     cond_write.notify_one(); 

     fd_client = req.client_fd; 
     //memcpy(host_name, req.host.c_str(), strlen(req.host)); 
     length = req.host.copy(host_name, req.host.size(), 0); 
     host_name[length] = '\0'; 
     what_request = req.request; 
     //memcpy(buffer, req.payload, req.payload.size()); 
     length = req.payload.copy(buffer, req.payload.size(), 0); 
     buffer[length] = '\0'; 
     what_request = req.request; 

     //cout << RED <<"(DEBUG) thread " << this_thread::get_id() << " copied packet payload " << 
     // buffer << DEFAULTCOLOR<<'\n'; 

     struct addrinfo* result; 
     struct addrinfo* res; 
     int error; 
     struct sockaddr_in *resolve; 

     fd_server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 
     if (fd_server < 0) { 
      perror("socket()"); 
      exit(-1); 
     } 

     cout << "(DEBUG) thread " << this_thread::get_id() << " fd_server " << fd_server << '\n'; 
     error = getaddrinfo(host_name, NULL, NULL, &result); 
     if (error != 0) { 
      if (error == EAI_SYSTEM) { 
       perror("getaddrinfo"); 
      } else { 
       fprintf(stderr, "error in getaddrinfo for (%s): %s\n", host_name, gai_strerror(error)); 
      } 
      exit(EXIT_FAILURE); 
     } 

     if (what_request == GET) { 
      server.sin_port = htons(80); 
     } 
     else if (what_request == CONNECT) { 
      server.sin_port = htons(443); 
     } 
     server.sin_family = AF_INET; 
     cout << "(DEBUG) thread " << this_thread::get_id() << " getaddrinfo()" << '\n'; 
     for (res = result; res != NULL; res = res->ai_next) { 
      if (res->ai_family == AF_INET) { 
       resolve = (struct sockaddr_in *)res->ai_addr; 
       //char *ip = inet_ntoa(resolve->sin_addr); 
       //printf("%s\n", ip); 
       server.sin_addr.s_addr = resolve->sin_addr.s_addr; 
       if (connect(fd_server, (struct sockaddr *)&server, sizeof (struct sockaddr_in)) < 0) { 
        fflush(stdout); 
        perror("connect()"); 
       } 
       else { 
        cout << "(DEBUG) thread " << this_thread::get_id() << " connected to " << inet_ntoa(server.sin_addr) << '\n'; 
       } 
       break; 
      } 
     } 

     // dealing with GET 
     if (what_request == GET) { 
      cout << "thread " << this_thread::get_id() << " dealing GET " << host_name << 
       " sending to server " << buffer << '\n'; 
      n_send = send(fd_server, buffer, strlen(buffer)+1, 0); 
      if (n_send < 0) { 
       cout << "thread " << this_thread::get_id() << " error sending GET request to server" << '\n'; 
       perror("send()"); 
       break; 
      } 
      do { 
       memset(buffer, 0, sizeof(buffer)); 
       n_recv = recv(fd_server, buffer, sizeof(buffer), 0); 
       cout << "thread " << this_thread::get_id() << " GET: " << host_name << " read from recv() " << n_recv << " bytes, " << 
        fd_client << "<->" << fd_server << '\n'; 
       n_send = send(fd_client, buffer, n_recv, 0); 
      } while (n_recv > 0); 

      if (n_recv < 0) { 
       cout << RED << "thread " << this_thread::get_id() << " error sending GET response from server to client" << DEFAULTCOLOR<<'\n'; 
       perror("send()"); 
       break; 
      } 
      close(fd_client); 
      close(fd_server); 
      cout << "thread " << this_thread::get_id() << 
       " done with GET request, quitting\n"; 
     } 

     // dealing with CONNECT 
     else if (what_request == CONNECT) { 
      cout << "thread " << this_thread::get_id() << " dealing CONNECT " << host_name << '\n';   
      max = fd_server >= fd_client ? fd_server+1 : fd_client+1; 
      send_200_OK = send(fd_client, CONNECT_200_OK, sizeof(CONNECT_200_OK), 0); 
      if (send_200_OK < 0) { 
       perror("send() 200 OK to client"); 
       break; 
      } 
      cout << "thread " << this_thread::get_id() << " SENT 200 OK to client " << '\n'; 

      int tot_recvd; 
      int tot_sent; 

      // TCP tunnel 
      while(true) { 
       memset(buff, 0, sizeof(buff)); 
       FD_ZERO(&fdset); 
       FD_SET(fd_client, &fdset); 
       FD_SET(fd_server, &fdset); 
       timeout.tv_sec = 15; 
       timeout.tv_usec = 0; 

       r = select(max, &fdset, NULL, NULL, &timeout); 

       if (r < 0) { 
        perror("select()"); 
        close(fd_client); 
        close(fd_server); 
        break; 
       } 

       if (r == 0) { // select timed out 
        printf("tunnel(): select() request timeout 408\n"); 
        close(fd_client); 
        close(fd_server); 
        break; 
       } 

       if (FD_ISSET(fd_client, &fdset)) { 
        tot_recvd = 0; 
        tot_sent = 0; 
        do { 
         read_from_client = recv(fd_client, &(buff[tot_recvd]), sizeof(buff), 0); 
         tot_recvd += read_from_client; 
         cout << "thread " << this_thread::get_id() << 
          " select(), reading from client " << fd_client << 
          " " << read_from_client << " bytes, " << fd_client<< " <-> " << fd_server<<'\n'; 
         if (buff[tot_recvd-1] == '\0') { 
          break; 
         } 
        } while (read_from_client > 0); 

        if (read_from_client < 0) { 
         perror("recv()"); 
         close(fd_client); 
         close(fd_server); 
         break; 
        } 

        if (read_from_client == 0) { 
         // this always happens!!! 
        } 

        send_to_server = send(fd_server, buff, read_from_client, 0); 
        if (send_to_server < 0) { 
         perror("send() to client"); 
         close(fd_client); 
         close(fd_server); 
         break; 
        } 
       } 

       if (FD_ISSET(fd_server, &fdset)) { 
        tot_recvd = 0; 
        tot_sent = 0; 
        do { 
         read_from_server = recv(fd_server, &(buff[tot_recvd]), sizeof(buff), 0); 
         tot_recvd += read_from_server; 
         cout << "thread " << this_thread::get_id() << 
          " select(), reading from server " << fd_client << 
          " " << read_from_server << " bytes, " << fd_client<< " <-> " << fd_server<<'\n'; 
         if (buff[tot_recvd-1] == '\0') { 
          break; 
         } 
        } while (read_from_server > 0); 

        if (read_from_server < 0) { 
         perror("read()"); 
         close(fd_client); 
         close(fd_server); 
         break;   
        } 

        if (read_from_server == 0) { 
         cout << "thread " << this_thread::get_id() << " select(), server closed conn" << '\n'; 
         close(fd_client); 
         close(fd_server); 
         break;      
        } 

        send_to_client = send(fd_client, buff, read_from_server, 0);  
        if (send_to_client < 0) { 
         perror("send() to client"); 
         close(fd_client); 
         close(fd_server); 
         break; 
        }  
       } 
      } 
      cout << "thread " << this_thread::get_id() << " done with CONNECT request\n"; 
     } 
    } 
} 

Environment: прокси работает на моем ноутбуке под управлением Ubuntu 14.04, x86_64; proxy тестируется в Chrome с плагином SwitchyOmega, который позволяет перенаправить трафик на определенный порт (тот же самый порт, который я передам моему прокси), скомпилированный с g++ -std=c++11 -pedantic -Wall -o funwithproxyfork funwithproxyfork.cpp -lpthread.

Выход (пробовал для Netflix и YouTube, они оба имеют те же проблемы, т.е. client closed conn, recv() возвращает 0):

req: 1, hostname: www.netflix.com, priority: 1 
thread 5611 accepting again 
(CHILD 5627) is about to handle conn 
(CHILD 5627) popped sock_client 4, request 1 
req: 1, hostname: www2-ext-s.nflximg.net, priority: 4 
thread 5611 accepting again 
(CHILD 5628) is about to handle conn 
(CHILD 5628) popped sock_client 4, request 1 
req: 1, hostname: www2-ext-s.nflximg.net, priority: 4 
thread 5611 accepting again 
(CHILD 5629) is about to handle conn 
(CHILD 5629) popped sock_client 4, request 1 
(CHILD 5627) attempting to connect to 54.247.92.196 (www.netflix.com) 
(CHILD 5628) attempting to connect to 54.247.125.40 (www.netflix.com) 
(CHILD 5629) attempting to connect to 54.247.110.247 (www.netflix.com) 
(CHILD 5627) connected to www.netflix.com, dealing CONNECT request 
(CHILD 5628) connected to www.netflix.com, dealing CONNECT request 
(CHILD 5628) client closed conn 
(CHILD 5627) client closed conn 
(CHILD 5628) done with CONNECT request 
(CHILD 5627) done with CONNECT request 
req: 1, hostname: www.netflix.com, priority: 1 
thread 5611 accepting again 
(CHILD 5630) is about to handle conn 
(CHILD 5630) popped sock_client 4, request 1 
(CHILD 5630) attempting to connect to 176.34.188.125 (www.netflix.com) 
(CHILD 5629) connected to www.netflix.com, dealing CONNECT request 
(CHILD 5629) client closed conn 
(CHILD 5629) done with CONNECT request 
(CHILD 5630) connected to www.netflix.com, dealing CONNECT request 

Тогда он не говорит ничего.

+1

Вы понимаете, что у вас есть серьезные проблемы с дизайном с кодом, кроме проблемы с recv()? 1.Если клиент подключается и не отправляет команду, но поддерживает соединение, ваш серверный процесс зависает и не принимает никаких новых подключений, пока не получит команду 2. fork() клонирует все пространство процесса. Детский процесс, удаляющий элементы из очереди, не влияет на очереди в родительском процессе. Ничто не удаляется из них. Этот код не будет работать, даже если проблема recv() исправлена. –

+0

Я думаю, что у меня есть второй момент, поэтому я создал пул потоков с 4 потоками (один цикл для приема соединений и три для открытия и управления соединениями), так как у меня есть процессор Quadcore ARM. Что касается первого момента, я не понял, что вы имели в виду о том, что клиент отправляет команды. – elmazzun

+0

И все же, даже с потоками и без вилки клиент по-прежнему отправляет 0 байтов моему прокси при обработке запроса CONNET. – elmazzun

ответ

-1

Изучив код, это кажется нормальным и связано с тем, как работает HTTP/1.1.

Возможно, вы используете некоторые клиенты, которые поддерживают конвейерную обработку HTTP/1.1. Когда HTTP/1.1 конвейерная обработка действует, сервер будет поддерживать соединение открытым, если клиент хочет отправить другой запрос. Если клиент этого не делает, клиент закрывает соединение.

Похоже, что ваш код ожидает, что сервер закроет соединение после ответа на HTTP-запрос, и вы не ожидаете, что клиент сначала закроет его сторону. Это неверно для HTTP/1.1, где вы можете сначала либо закрыть клиент, либо сервер. Любой, кто закрывает соединение, является нормальным.

Итак, здесь нет никаких проблем, кроме нескольких вопросов, которые я отметил отдельно, в комментариях, не связанных с проблемой recv(). Кроме того, код во многих местах недостаточно проверяет возвращаемое значение от send() и предполагает, что все запрошенные байты были отправлены. Это неверно. Вам не гарантируется, что send() отправит точное количество запрошенных байтов. Он может фактически отправлять меньше, и это указывается в возвращаемом значении, которое представляет собой количество фактически отправленных байтов.

Этот прокси начнет работать с ошибкой при высокой нагрузке на трафик, поскольку будет отправлено меньше байтов, чем было запрошено, но код не смог обнаружить эту ситуацию и правильно справиться с этим. Например, он прочитает 2000 байт с сервера, попытается отправить их клиенту send() отчетов о том, что отправлено 1000 байт, код продолжает свой веселый путь, и клиент не получает весь ответ от сервера. Наступает Хилларити.

Кроме того, здесь есть некоторые другие условия гонки, которые могут привести к тому, что прокси-сервер получит «заклинило» или заперт с помощью клиентов HTTP/1.1, которые полностью поддерживают конвейерную обработку. Но если у вас возникнут такие проблемы, это будет еще один вопрос, который нужно задать ...

+0

Если клиент 'recv() 'возвращает ноль, сервер * * закрыл соединение. – EJP

+0

Нет. Если вы recv() из клиентского сокета, и вы получаете 0, это только тот сокет, соединение которого было закрыто. –

+0

Почему downvote? – elmazzun

Смежные вопросы