Я пытаюсь создать прокси-сервер HTTP, где, в соответствии с именем хоста GET/CONNET в HTTP-запросе, некоторые соединения будут иметь более высокие приоритеты перед другими.Socket client recv() всегда возвращает 0
Идея состоит в выполнении запросов с более высоким приоритетом на основе заданного списка имен хостов, каждый из которых имеет определенный приоритет.
Рассматриваемые соединения будут сохранены приемщик нити в четырех различных очередей (по одному для каждого класса приоритета: максимальный, средний, минимальный и неклассифицированном); accepter будет затем fork()
дочерним процессом, который будет деактивировать и обрабатывать ожидающие соединения в порядке приоритета. Поступая таким образом, приемщика нити всегда будет принимать новые соединения и для каждой установлена в очереди Conne Короче, вот мой прокси:
- главного: открывает TCP сокет, связывается с данным портом, слушает до 10 соединения, вызывает поток accepter передавая ему сокет fd, открытый с помощью предыдущего
socket()
, вызывает и присоединяется к этой теме; - приемщика: этот поток получает гнездо FD передаются от главного и петли с
accept()
возвращение клиента сокет,recv()
от клиента, анализирует запрос и в соответствии с именем хоста в запросе HTTP будет помещёна пользовательская структура шахты в правильной очереди; тогда он будетfork()
, поэтому процесс будет деактивирован и обработать соединение; - manageConnection: этот процесс, раздвоенный на приемщике, dequeues из очередей, исследует совала структуру разрешения поля имени хоста, открывает сокет клиент, к душевой головке может светиться серверу и, GET или CONNECT, выполнят запрос.
Новый прокси: не более fork()
, я сделал пул потоков из четырех потоков (один «приемщик» и три «Коннектер»: так как я планирую поставить этот прокси-сервер на моем RPi 2, имеет процессор quadcore, я думал, что по крайней мере четыре потока были хорошими). Теперь у меня есть один mutex
и два condition_variables
. Код почти тот же, кроме потоков, мьютексов и переменных условий. Эти новые функции, вызываемые потоками:
Enqueue: эта нить содержит
accept()
петлю, где он получает от клиента, анализирует запрос HTTP, находит имя хоста и в соответствии с его приоритетом, поставить в очередьinfo_conn
структура (введено в начале кода);Dequeue: эта нить содержит освобождение пакета из очереди и управления соединениями цикла, где он получает
info_conn
-структуру из очереди, извлекает клиентский сокет (который я получил отaccept()
цикла), разрешает имя хоста и управление GET или CONNECT запроса.
Проблема: всегда то же самое, когда дело доходит до управления CONNECT запросов, recv()
от клиента всегда возвращает 0: Я знаю, ПРИЕМ() возвращает 0, когда другая сторона связи отключилась, но это не то, что я хотел! Основываясь на поточном подходе, это тривиальная проблема с производителем/потребителем (выскакивание и нажатие на очереди), поэтому я считаю, что чередование потоков в очереди и деактивации правильное.
Мой код (новый):
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <thread>
#include <iostream>
#include <netdb.h>
#include <queue>
#include <list>
#include <vector>
#include <condition_variable>
#include <cstdlib>
using namespace std;
#define GET 0
#define CONNECT 1
#define DEFAULTCOLOR "\033[0m"
#define RED "\033[22;31m"
#define YELLOW "\033[1;33m"
#define GREEN "\033[0;0;32m"
#define MAX_SIZE 1000
#define CONNECT_200_OK "HTTP/1.1 200 Connection established\r\nProxy-agent: myproxy\r\n\r\n"
// my custom struct stored in queues
typedef struct info_connection {
int client_fd;
string host;
string payload;
int request;
} info_conn;
queue<info_conn>q1;
queue<info_conn>q2;
queue<info_conn>q3;
queue<info_conn>q4;
vector<thread> workers;
condition_variable cond_read, cond_write;
mutex mtx;
void enqueue(int sock_client);
void dequeue(void);
int main(int argc, char *argv[]) {
int socket_desc;
struct sockaddr_in server;
socket_desc = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket_desc == -1) {
perror("socket()");
exit(-1);
}
server.sin_family = AF_INET;
server.sin_addr.s_addr = INADDR_ANY;
if (argc == 2)
server.sin_port = htons(atoi(argv[1]));
printf("listening to port %d\n", atoi(argv[1]));
if (bind(socket_desc,(struct sockaddr *)&server, sizeof(server)) < 0) {
perror("bind failed. Error");
exit(-1);
}
printf("binded\n");
listen(socket_desc, 10);
printf("listen\n");
// thread pool, because I suck at forking
workers.push_back(thread(enqueue, socket_desc));
workers.push_back(thread(dequeue));
workers.push_back(thread(dequeue));
workers.push_back(thread(dequeue));
for (thread& t : workers) {
t.join();
}
return 0;
}
void enqueue(int sock_client) {
printf("enqueue()\n");
int client_sock;
struct sockaddr_in *client_struct;
unsigned int clilen;
bzero((char*)&client_struct, sizeof(client_struct));
clilen = sizeof(client_struct);
char host_name[128];
char buff[4096];
int n_recv, n_send;
char *start_row, *end_row, *tmp_ptr, *tmp_start;
int req;
while((client_sock = accept(sock_client, (struct sockaddr *)&client_struct, &clilen))) {
memset(host_name, 0, sizeof(host_name));
n_recv = recv(client_sock, buff, sizeof(buff), 0);
if (n_recv < 0) {
perror("recv()");
break;
}
start_row = end_row = buff;
while ((end_row = strstr(start_row, "\r\n")) != NULL) {
int row_len = end_row - start_row;
if (row_len == 0)
break;
if (strncmp(buff, "GET ", 4) == 0) {
req = GET;
tmp_start = start_row + 4;
tmp_ptr = strstr(tmp_start, "//");
int len = tmp_ptr - tmp_start;
tmp_start = tmp_start + len + 2;
tmp_ptr = strchr(tmp_start, '/');
len = tmp_ptr - tmp_start;
strncpy(host_name, tmp_start, len);
break;
}
else if (strncmp(buff, "CONNECT ", 8) == 0) {
req = CONNECT;
tmp_start = start_row + 8;
tmp_ptr = strchr(tmp_start, ':');
int host_len = tmp_ptr - tmp_start;
strncpy(host_name, tmp_start, host_len);
break;
}
start_row = end_row + 2;
/* if ((start_row - buff) >= strlen(buff))
break;*/
}
unique_lock<mutex> locker(mtx, defer_lock);
locker.lock();
cond_write.wait(locker, [](){
return (q1.size() < MAX_SIZE || q2.size() < MAX_SIZE || q3.size() < MAX_SIZE || q4.size() < MAX_SIZE);
});
cout << "(DEBUG) thread " << this_thread::get_id() << " wants to insert, queues not full " <<
q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << '\n';
int priority = 0;
info_conn info_c;
info_c.client_fd = client_sock;
info_c.host = host_name;
info_c.request = req;
info_c.payload = string(buff);
cout << "(DEBUG) thread " << this_thread::get_id() << " looking for " << host_name <<
" queues" << '\n';
if (strcmp(host_name, "www.netflix.com") == 0) {
priority = 1;
printf("hostname = www.netflix.com, priority %d\n", priority);
q1.push(info_c);
}
else if (strcmp(host_name, "www.youtube.com") == 0) {
priority = 2;
printf("hostname = www.youtube.com, priority %d\n", priority);
q2.push(info_c);
}
else if (strcmp(host_name, "www.facebook.com") == 0) {
priority = 3;
printf("hostname = www.facebook.com, priority %d\n", priority);
q3.push(info_c);
}
else {
priority = 4;
printf("hostname %s not found in queues\n", host_name);
q4.push(info_c);
}
cout << GREEN << "(DEBUG) thread " << this_thread::get_id() << " inserted " <<
q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << DEFAULTCOLOR<< '\n';
locker.unlock();
cond_read.notify_all();
}
if (client_sock < 0) {
perror("accept failed");
exit(-1);
}
}
void dequeue(void) {
int fd_client = -1;
int fd_server = -1;
struct sockaddr_in server;
int what_request;
char host_name[128];
char buffer[1500];
int n_send, n_recv;
size_t length;
info_conn req;
// CONNECT
int r, max;
int send_200_OK;
int read_from_client = 0;
int read_from_server = 0;
int send_to_client = 0;
int send_to_server = 0;
struct timeval timeout;
char buff[8192];
fd_set fdset;
printf("dequeue()\n");
while (true) {
unique_lock<mutex> locker(mtx, defer_lock);
locker.lock();
cond_read.wait(locker, [](){
return (q1.size() > 0 || q2.size() > 0 || q3.size() > 0 || q4.size() > 0);
});
cout << "(DEBUG) thread " << this_thread::get_id() << " wants to remove, queues not empty " <<
q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << '\n';
if (q1.size() > 0) {
req = q1.front();
q1.pop();
}
else if (q2.size() > 0) {
req = q2.front();
q2.pop();
}
else if (q3.size() > 0) {
req = q3.front();
q3.pop();
}
else if (q4.size() > 0) {
req = q4.front();
q4.pop();
}
cout << YELLOW <<"(DEBUG) thread " << this_thread::get_id() << " removed, " <<
q1.size() << ' ' << q2.size() << ' ' << q3.size() << ' ' << q4.size() << DEFAULTCOLOR<<'\n';
locker.unlock();
// notify one, because I have only one "producer" thread
cond_write.notify_one();
fd_client = req.client_fd;
//memcpy(host_name, req.host.c_str(), strlen(req.host));
length = req.host.copy(host_name, req.host.size(), 0);
host_name[length] = '\0';
what_request = req.request;
//memcpy(buffer, req.payload, req.payload.size());
length = req.payload.copy(buffer, req.payload.size(), 0);
buffer[length] = '\0';
what_request = req.request;
//cout << RED <<"(DEBUG) thread " << this_thread::get_id() << " copied packet payload " <<
// buffer << DEFAULTCOLOR<<'\n';
struct addrinfo* result;
struct addrinfo* res;
int error;
struct sockaddr_in *resolve;
fd_server = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd_server < 0) {
perror("socket()");
exit(-1);
}
cout << "(DEBUG) thread " << this_thread::get_id() << " fd_server " << fd_server << '\n';
error = getaddrinfo(host_name, NULL, NULL, &result);
if (error != 0) {
if (error == EAI_SYSTEM) {
perror("getaddrinfo");
} else {
fprintf(stderr, "error in getaddrinfo for (%s): %s\n", host_name, gai_strerror(error));
}
exit(EXIT_FAILURE);
}
if (what_request == GET) {
server.sin_port = htons(80);
}
else if (what_request == CONNECT) {
server.sin_port = htons(443);
}
server.sin_family = AF_INET;
cout << "(DEBUG) thread " << this_thread::get_id() << " getaddrinfo()" << '\n';
for (res = result; res != NULL; res = res->ai_next) {
if (res->ai_family == AF_INET) {
resolve = (struct sockaddr_in *)res->ai_addr;
//char *ip = inet_ntoa(resolve->sin_addr);
//printf("%s\n", ip);
server.sin_addr.s_addr = resolve->sin_addr.s_addr;
if (connect(fd_server, (struct sockaddr *)&server, sizeof (struct sockaddr_in)) < 0) {
fflush(stdout);
perror("connect()");
}
else {
cout << "(DEBUG) thread " << this_thread::get_id() << " connected to " << inet_ntoa(server.sin_addr) << '\n';
}
break;
}
}
// dealing with GET
if (what_request == GET) {
cout << "thread " << this_thread::get_id() << " dealing GET " << host_name <<
" sending to server " << buffer << '\n';
n_send = send(fd_server, buffer, strlen(buffer)+1, 0);
if (n_send < 0) {
cout << "thread " << this_thread::get_id() << " error sending GET request to server" << '\n';
perror("send()");
break;
}
do {
memset(buffer, 0, sizeof(buffer));
n_recv = recv(fd_server, buffer, sizeof(buffer), 0);
cout << "thread " << this_thread::get_id() << " GET: " << host_name << " read from recv() " << n_recv << " bytes, " <<
fd_client << "<->" << fd_server << '\n';
n_send = send(fd_client, buffer, n_recv, 0);
} while (n_recv > 0);
if (n_recv < 0) {
cout << RED << "thread " << this_thread::get_id() << " error sending GET response from server to client" << DEFAULTCOLOR<<'\n';
perror("send()");
break;
}
close(fd_client);
close(fd_server);
cout << "thread " << this_thread::get_id() <<
" done with GET request, quitting\n";
}
// dealing with CONNECT
else if (what_request == CONNECT) {
cout << "thread " << this_thread::get_id() << " dealing CONNECT " << host_name << '\n';
max = fd_server >= fd_client ? fd_server+1 : fd_client+1;
send_200_OK = send(fd_client, CONNECT_200_OK, sizeof(CONNECT_200_OK), 0);
if (send_200_OK < 0) {
perror("send() 200 OK to client");
break;
}
cout << "thread " << this_thread::get_id() << " SENT 200 OK to client " << '\n';
int tot_recvd;
int tot_sent;
// TCP tunnel
while(true) {
memset(buff, 0, sizeof(buff));
FD_ZERO(&fdset);
FD_SET(fd_client, &fdset);
FD_SET(fd_server, &fdset);
timeout.tv_sec = 15;
timeout.tv_usec = 0;
r = select(max, &fdset, NULL, NULL, &timeout);
if (r < 0) {
perror("select()");
close(fd_client);
close(fd_server);
break;
}
if (r == 0) { // select timed out
printf("tunnel(): select() request timeout 408\n");
close(fd_client);
close(fd_server);
break;
}
if (FD_ISSET(fd_client, &fdset)) {
tot_recvd = 0;
tot_sent = 0;
do {
read_from_client = recv(fd_client, &(buff[tot_recvd]), sizeof(buff), 0);
tot_recvd += read_from_client;
cout << "thread " << this_thread::get_id() <<
" select(), reading from client " << fd_client <<
" " << read_from_client << " bytes, " << fd_client<< " <-> " << fd_server<<'\n';
if (buff[tot_recvd-1] == '\0') {
break;
}
} while (read_from_client > 0);
if (read_from_client < 0) {
perror("recv()");
close(fd_client);
close(fd_server);
break;
}
if (read_from_client == 0) {
// this always happens!!!
}
send_to_server = send(fd_server, buff, read_from_client, 0);
if (send_to_server < 0) {
perror("send() to client");
close(fd_client);
close(fd_server);
break;
}
}
if (FD_ISSET(fd_server, &fdset)) {
tot_recvd = 0;
tot_sent = 0;
do {
read_from_server = recv(fd_server, &(buff[tot_recvd]), sizeof(buff), 0);
tot_recvd += read_from_server;
cout << "thread " << this_thread::get_id() <<
" select(), reading from server " << fd_client <<
" " << read_from_server << " bytes, " << fd_client<< " <-> " << fd_server<<'\n';
if (buff[tot_recvd-1] == '\0') {
break;
}
} while (read_from_server > 0);
if (read_from_server < 0) {
perror("read()");
close(fd_client);
close(fd_server);
break;
}
if (read_from_server == 0) {
cout << "thread " << this_thread::get_id() << " select(), server closed conn" << '\n';
close(fd_client);
close(fd_server);
break;
}
send_to_client = send(fd_client, buff, read_from_server, 0);
if (send_to_client < 0) {
perror("send() to client");
close(fd_client);
close(fd_server);
break;
}
}
}
cout << "thread " << this_thread::get_id() << " done with CONNECT request\n";
}
}
}
Environment: прокси работает на моем ноутбуке под управлением Ubuntu 14.04, x86_64; proxy тестируется в Chrome с плагином SwitchyOmega, который позволяет перенаправить трафик на определенный порт (тот же самый порт, который я передам моему прокси), скомпилированный с g++ -std=c++11 -pedantic -Wall -o funwithproxyfork funwithproxyfork.cpp -lpthread
.
Выход (пробовал для Netflix и YouTube, они оба имеют те же проблемы, т.е. client closed conn
, recv()
возвращает 0):
req: 1, hostname: www.netflix.com, priority: 1
thread 5611 accepting again
(CHILD 5627) is about to handle conn
(CHILD 5627) popped sock_client 4, request 1
req: 1, hostname: www2-ext-s.nflximg.net, priority: 4
thread 5611 accepting again
(CHILD 5628) is about to handle conn
(CHILD 5628) popped sock_client 4, request 1
req: 1, hostname: www2-ext-s.nflximg.net, priority: 4
thread 5611 accepting again
(CHILD 5629) is about to handle conn
(CHILD 5629) popped sock_client 4, request 1
(CHILD 5627) attempting to connect to 54.247.92.196 (www.netflix.com)
(CHILD 5628) attempting to connect to 54.247.125.40 (www.netflix.com)
(CHILD 5629) attempting to connect to 54.247.110.247 (www.netflix.com)
(CHILD 5627) connected to www.netflix.com, dealing CONNECT request
(CHILD 5628) connected to www.netflix.com, dealing CONNECT request
(CHILD 5628) client closed conn
(CHILD 5627) client closed conn
(CHILD 5628) done with CONNECT request
(CHILD 5627) done with CONNECT request
req: 1, hostname: www.netflix.com, priority: 1
thread 5611 accepting again
(CHILD 5630) is about to handle conn
(CHILD 5630) popped sock_client 4, request 1
(CHILD 5630) attempting to connect to 176.34.188.125 (www.netflix.com)
(CHILD 5629) connected to www.netflix.com, dealing CONNECT request
(CHILD 5629) client closed conn
(CHILD 5629) done with CONNECT request
(CHILD 5630) connected to www.netflix.com, dealing CONNECT request
Тогда он не говорит ничего.
Вы понимаете, что у вас есть серьезные проблемы с дизайном с кодом, кроме проблемы с recv()? 1.Если клиент подключается и не отправляет команду, но поддерживает соединение, ваш серверный процесс зависает и не принимает никаких новых подключений, пока не получит команду 2. fork() клонирует все пространство процесса. Детский процесс, удаляющий элементы из очереди, не влияет на очереди в родительском процессе. Ничто не удаляется из них. Этот код не будет работать, даже если проблема recv() исправлена. –
Я думаю, что у меня есть второй момент, поэтому я создал пул потоков с 4 потоками (один цикл для приема соединений и три для открытия и управления соединениями), так как у меня есть процессор Quadcore ARM. Что касается первого момента, я не понял, что вы имели в виду о том, что клиент отправляет команды. – elmazzun
И все же, даже с потоками и без вилки клиент по-прежнему отправляет 0 байтов моему прокси при обработке запроса CONNET. – elmazzun