2017-02-22 7 views
3

Недавно у меня появились идеи для нескольких проектов, которые связаны с чтением IP-адресов из файла. Поскольку все они должны иметь возможность обрабатывать большое количество хостов, я попытался реализовать многопоточность или создать пул сокетов и выбрать() из них, чтобы достичь некоторой формы параллелизма для лучшей производительности , В нескольких случаях чтение из файла кажется узким местом в повышении производительности. То, как я это понимаю, чтение из файла с помощью fgets или аналогичного - это синхронная операция блокировки. Поэтому, даже если я успешно выполнил клиент, который подключается к нескольким хостам асинхронно, операция будет по-прежнему синхронной, потому что я могу читать только один адрес за раз из файла.Чтение асинхронного ввода-вывода из файла

/* partially pseudo code */ 

/* getaddrinfo() stuff here */ 

while(fgets(ip, sizeof(ip), file) { 
FD_ZERO(&readfds); 
/* create n sockets here in a for loop */ 
for (i = 0; i < socket_num; i++) { 
    if (newfd > fd[i]) newfd = fd[i]; 
    FD_SET(fd[i], &readfds); 
} 

/* here's where I think I should connect n sockets to n addresses from file 
* but I'm only getting one IP at a time from file, so I'm not sure how to connect to 
* n addresses at once with fgets 
*/ 

for (j = 0; j < socket_num; j++) { 
     if ((connect(socket, ai->ai_addr, ai->ai_addrlen)) == -1) 
     // error 
     else { 
      freeaddrinfo(ai);  
     FD_SET(socket, &master); 
      fdmax = socket; 
      if (select(socket+1, &master, NULL, NULL, &tv) == -1); 
     // error   
      if ((recvd = read(socket, banner, RECVD)) <= 0) 
     // error 
      if (FD_ISSET(socket, &master)) 
     // print success 
     } 
    /* clear sets and close sockets and stuff */ 
} 

Я указал, мои проблемы с комментариями, но только уточнить: я не уверен, как выполнять асинхронные операции ввода/вывода на нескольких целевых серверах, считанных из файла, так как чтение записей из файла кажется быть строго синхронным. Я столкнулся с аналогичными проблемами с многопоточным процессом, с незначительным успехом.

void *function_passed_to_pthread_create(void *opts) 
    { 
     while(fgets(ip_addr, sizeof(ip_addr), opts->file) { 
      /* speak to ip_addr and get response */ 
    } 
} 

main() 
{ 
    /* necessary stuff */ 
    for (i = 0; i < thread_num; i++) { 
     pthread_create(&tasks, NULL, above_function, opts) 
    } 
    for (j = 0; j < thread_num; j++) 
     /* join threads */ 
    return 0; 
} 

Это похоже на работу, но, поскольку несколько потоков обрабатывают один и тот же файл, результаты не всегда точны. Я предполагаю, что это связано с тем, что несколько потоков могут обрабатывать один и тот же адрес из файла одновременно.

Я рассмотрел загрузку всех записей из файла в массив/в память, но если файл был особенно большой, я думаю, что это может вызвать проблемы с памятью. Кроме того, я не уверен, что это даже имеет смысл делать в любом случае.

В качестве окончательного примечания; если файл, который я читаю, является очень большим файлом с огромным количеством IP-адресов, то я не считаю, что решение хорошо масштабируется. Что-то возможно с C, хотя, я полагаю, есть какой-то способ достичь того, что я надеюсь.

Чтобы суммировать это сообщение; Я хотел бы найти способ улучшить производительность приложений на стороне клиента, используя асинхронный ввод-вывод или многопоточность при чтении записей из файла.

+0

Я чувствую вашу боль, но есть много способов прочитать файл. Самый быстрый подход, я знаю, включает в себя чтение файла последовательно с помощью собственных функций API (то есть ReadFile). Затем вы можете открутить обработку до разных потоков и т. Д. – Mikhail

+1

Это бесполезно для чтения файла с несколькими потоками, прочитанного [this] (http://stackoverflow.com/a/10397184/7076153). У вас должен быть менеджер, который читает файл и передает каждому потоку строку файла. – Stargateur

+1

В типичной системе Linux жесткий предел для максимального количества открытых дескрипторов (файлов и сокетов) обычно составляет 65536. (Мягкий предел, то есть по умолчанию, обычно намного ниже, примерно как 1024.) Даже если вы принимаете чрезвычайно длинные имена хостов и имена сервисов (для соединений, а не номера портов) вам все равно потребуется не менее 100 символов на каждого клиента. 65536 × 100 - всего около 6,5 мегабайт. Чтение и токенизация того, что (для пар хостов и портов) требуется только на долю секунды. ** Это незначительно. ** –

ответ

5

Несколько человек намекнули на хорошее решение этого в своих комментариях, но, вероятно, стоит упомянуть об этом более подробно. полное решение имеет довольно много деталей и довольно сложный код, поэтому я собираюсь использовать псевдокод, чтобы объяснить, что я рекомендую.

Что вы имеете здесь, это действительно вариация на классическую проблему с производителем/потребителем: у вас есть одна вещь, производящая данные, и многие вещи, пытающиеся использовать эти данные. В вашем случае это должно быть быть «единственной вещью», производящей эти данные, поскольку длины каждой строки исходного файла неизвестны: вы не можете просто перепрыгнуть вперед «n» байты и как-то быть на следующем IP. В то время может быть только один актер, перемещающий указатель чтения к следующей неизвестной позиции \n, поэтому вы по определению имеете одного производителя.

Есть три основных способа атаковать это:

  • решение A подразумевает наличие каждой нити, потянув немного больше из общего буфера файла, и сбрасывая асинхронный (неблокирования) чтения каждый раз, когда последнее чтение завершается. Есть целый ряд головных болей, которые получают это решение правильно, так как он очень чувствителен к разнице во времени между файловой системой и выполняемой работой: если файл читается медленно, все будут ждать ожидания файла. Если рабочие работают медленно, программа чтения файлов либо остановится, либо заполнит память, ожидая, пока они будут потреблять данные. Это решение, скорее всего, является абсолютным самым быстрым, но это также невероятно сложный код синхронизации, чтобы получить право примерно с оговорками. Если вы не специалист в потоковом (или чрезвычайно умном злоупотреблении epoll_wait()), вы, вероятно, не хотите идти по этому маршруту.

  • Раствор В есть «мастер» нить, ответственная за чтение файла и заполнение какого-то потокобезопасное очереди с данными он читает, с одним IP-адресом (одна строки) на запись очереди , Каждый из рабочих потоков просто потребляет записи в очереди так быстро, как может, запрашивая удаленный сервер, а затем запрашивая другую запись в очереди. Это требует немного осторожности, чтобы получить право, но, как правило, намного безопаснее, чем Решение A, особенно если вы используете реализацию чужих очередей.

  • Решение C довольно hacktastic, но вы не должны уволить его вне стороны, в зависимости от того, что вы делаете. Это решение просто включает в себя использование чего-то вроде команды Un * x sed (см. Get a range of lines from a file given the start and end line numbers), чтобы обрезать исходный файл в кучу «коротких» исходных файлов заранее - скажем, двадцать из них. Затем вы просто запускаете двадцать экземпляров очень простой однопоточной программы параллельно, используя &, каждый на другом «срезе» файла. Мы объединились с небольшим сценарием оболочки, чтобы автоматизировать его, это может быть «достаточно хорошим» решением для множества потребностей.


Давайте повнимательнее посмотрим на решения B - мастер-нить с поточно-очереди. Я собираюсь обмануть и предположить, что вы можете построить реализацию рабочей очереди (если нет, есть статьи StackOverflow по реализации потоковой безопасности с использованием pthreads: pthread synchronized blocking queue).

В псевдокоде это решение, то что-то вроде этого:

main() 
{ 
    /* Create a queue. */ 
    queue = create_queue(); 

    /* Kick off the master thread to read the file, and give it the queue. */ 
    master_thread = pthread_create(master, queue); 

    /* Kick off a bunch of workers with access to the queue. */ 
    for (i = 0; i < 20; i++) { 
     worker_thread[i] = pthread_create(worker, queue); 
    } 

    /* Wait for everybody to finish. */ 
    pthread_join(master_thread); 
    for (i = 0; i < 20; i++) { 
     pthread_join(worker_thread[i]); 
    } 
} 

void master(queue q) 
{ 
    FILE *fp = fopen("ips.txt", "r"); 
    char buffer[BIGGER_THAN_ANY_IP]; 

    /* Inhale the file as fast as we can, and push each line we 
     read onto the queue. */ 
    while (fgets(fp, buffer) != NULL) { 
     char *next_ip = strdup(buffer); 
     enqueue(q, next_ip); 
    } 

    /* Add some final messages in the queue to let the workers 
     know that we're out of data. There are *much* better ways 
     of notifying them that we're "done", but in this case, 
     pushing a bunch of NULLs equal to the number of threads is 
     simple and probably good enough. */ 
    for (i = 0; i < 20; i++) { 
     enqueue(q, NULL); 
    } 
} 

void worker(queue q) 
{ 
    char *ip; 

    /* Inhale messages off the queue as fast as we can until 
     we get a "NULL", which means that it's time to stop. 
     The call to dequeue() *must* block if there's nothing 
     in the queue; the call should only return NULL if the 
     queue actually had NULL pushed into it. */ 
    while ((ip = dequeue(q)) != NULL) { 

     /* Insert code to actually do the work here. */ 
     connect_and_send_and_receive_to(ip); 
    } 
} 

Есть много предостережений и деталей в реальной реализации (например: как мы реализуем очереди, кольцевые буферы или связанный список? что если текст не все IP-адреса? Что, если буфер символов недостаточно велик, сколько потоков достаточно, как мы имеем дело с файловыми или сетевыми ошибками? будет ли производительность malloc стать узким местом? большой? можем ли мы лучше перекрывать сетевой ввод-вывод?).

Но, оговорки и подробности в сторону, представленный выше псевдокод, является достаточно хорошей отправной точкой, что вы, вероятно, можете расширить его в рабочее решение.

+0

Спасибо вам, идеально. Это определенно достаточно информации для меня, чтобы развить то, что у меня есть. – user3408678

+0

Вы также можете реализовать какой-то предел количества одновременных подключений одновременно. Вы можете использовать семафор/мьютекс в потребительском коде очереди –

+0

Извините, это означает, что награда за это ответ будет получена раньше. – user3408678

0

читать IP-адреса из файла, иметь рабочие потоки, поддерживать IP-адреса рабочих потоков. пусть вся связь сокетов происходит в рабочих потоках. Кроме того, если адреса IPv4 хранятся в шестнадцатеричном формате вместо ascii, возможно, они могут считывать кратность из них за один снимок, и это будет быстрее.

0

Если вы просто хотите асинхронно читать, вы можете использовать getch() из ncurses с задержкой, установленной на 0. Это часть posix, поэтому вам не нужны никакие дополнительные зависимости.Также у вас есть unlocked_stdio.

С другой стороны, я должен задаться вопросом, почему «fgets»() является узким местом. Пока у вас есть данные в файле, он не должен блокироваться. И даже если данные огромны (например, 1 МБ или 100 тыс. Ip-адресов), чтение его в список при запуске должно занимать менее 1 секунды.

И почему вы открываете подключения sockets_num к каждому ip в списке? У вас есть sockets_num, умноженное на количество IP-адресов одновременно. Поскольку каждый сокет является файлом в linux, вы будете сталкиваться с системными проблемами при попытке открыть более нескольких тысяч файлов (см. Ulimit -Sn). Можете ли вы подтвердить, что в этом случае проблема не связана с connect()?

Смежные вопросы