2010-10-18 5 views
8

Я хотел бы разработать многопоточный UDP-сервер в C/Linux. Служба работает на одном порту x, поэтому есть возможность привязать к нему единственный UDP-сокет. Чтобы работать под большими нагрузками, у меня есть n потоков (статически определено), например 1 поток на процессор. Работа может быть доставлена ​​в поток с помощью epoll_wait, поэтому потоки разбужаются по требованию с помощью «EPOLLET | EPOLLONESHOT. Я приложил пример кода:Многопоточный UDP-сервер с epoll?

static int epfd; 
static sig_atomic_t sigint = 0; 

... 

/* Thread routine with epoll_wait */ 
static void *process_clients(void *pevents) 
{ 
    int rc, i, sock, nfds; 
    struct epoll_event ep, *events = (struct epoll_event *) pevents; 

    while (!sigint) { 
     nfds = epoll_wait(epfd, events, MAX_EVENT_NUM, 500); 

     for (i = 0; i < nfds; ++i) { 
      if (events[i].data.fd < 0) 
       continue; 

      sock = events[i].data.fd; 

      if((events[i].events & EPOLLIN) == EPOLLIN) { 
       printf("Event dispatch!\n"); 
       handle_request(sock); // do a recvfrom 
      } else 
       whine("Unknown poll event!\n"); 

      memset(&ep, 0, sizeof(ep)); 
      ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT; 
      ep.data.fd = sock; 

      rc = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ep); 
      if(rc < 0) 
       error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n"); 
     } 
    } 

    pthread_exit(NULL); 
} 

int main(int argc, char **argv) 
{ 
    int rc, i, cpu, sock, opts; 
    struct sockaddr_in sin; 
    struct epoll_event ep, *events; 
    char *local_addr = "192.168.1.108"; 
    void *status; 
    pthread_t *threads = NULL; 
    cpu_set_t cpuset; 

    threads = xzmalloc(sizeof(*threads) * MAX_THRD_NUM); 
    events = xzmalloc(sizeof(*events) * MAX_EVENT_NUM); 

    sock = socket(PF_INET, SOCK_DGRAM, 0); 
    if (sock < 0) 
     error_and_die(EXIT_FAILURE, "Cannot create socket!\n"); 

    /* Non-blocking */ 
    opts = fcntl(sock, F_GETFL); 
    if(opts < 0) 
     error_and_die(EXIT_FAILURE, "Cannot fetch sock opts!\n"); 
    opts |= O_NONBLOCK; 
    rc = fcntl(sock, F_SETFL, opts); 
    if(rc < 0) 
     error_and_die(EXIT_FAILURE, "Cannot set sock opts!\n"); 

    /* Initial epoll setup */ 
    epfd = epoll_create(MAX_EVENT_NUM); 
    if(epfd < 0) 
     error_and_die(EXIT_FAILURE, "Error fetching an epoll descriptor!\n"); 

    memset(&ep, 0, sizeof(ep)); 
    ep.events = EPOLLIN | EPOLLET | EPOLLONESHOT; 
    ep.data.fd = sock; 

    rc = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ep); 
    if(rc < 0) 
     error_and_die(EXIT_FAILURE, "Cannot add socket to epoll!\n"); 

    /* Socket binding */ 
    sin.sin_family = AF_INET; 
    sin.sin_addr.s_addr = inet_addr(local_addr); 
    sin.sin_port = htons(port_xy); 

    rc = bind(sock, (struct sockaddr *) &sin, sizeof(sin)); 
    if (rc < 0) 
     error_and_die(EXIT_FAILURE, "Problem binding to port! " 
         "Already in use?\n"); 

    register_signal(SIGINT, &signal_handler); 

    /* Thread initialization */ 
    for (i = 0, cpu = 0; i < MAX_THRD_NUM; ++i) { 
     rc = pthread_create(&threads[i], NULL, process_clients, events); 
     if (rc != 0) 
      error_and_die(EXIT_FAILURE, "Cannot create pthread!\n"); 

     CPU_ZERO(&cpuset); 
     CPU_SET(cpu, &cpuset); 

     rc = pthread_setaffinity_np(threads[i], sizeof(cpuset), &cpuset); 
     if (rc != 0) 
      error_and_die(EXIT_FAILURE, "Cannot create pthread!\n"); 

     cpu = (cpu + 1) % NR_CPUS_ON; 
    } 

    printf("up and running!\n"); 

    /* Thread joining */ 
    for (i = 0; i < MAX_THRD_NUM; ++i) { 
     rc = pthread_join(threads[i], &status); 
     if (rc != 0) 
      error_and_die(EXIT_FAILURE, "Error on thread exit!\n"); 
    } 

    close(sock); 
    xfree(threads); 
    xfree(events); 

    printf("shut down!\n"); 

    return 0; 
} 

Это правильный способ обработки этого сценария с помощью epoll? Должна ли функция _handle_request_ возвращаться как можно быстрее, потому что за это время блокировка eventqueue для сокета?

Спасибо за ответы!

ответ

9

Поскольку вы используете только один сокет UDP, нет смысла использовать epoll - вместо этого используйте блокировку recvfrom.

Теперь, в зависимости от протокола, который вам нужно обработать - если вы можете обрабатывать каждый пакет UDP по отдельности - вы можете фактически вызвать recvfrom одновременно из нескольких потоков (в пуле потоков). ОС позаботится о том, чтобы ровно один поток получал UDP-пакет. Затем этот поток может делать все, что ему нужно, в handle_request.

Однако, если вам нужно обрабатывать пакеты UDP в определенном порядке, вы, вероятно, не так уж много возможностей parallalise программу ...

+0

Точно то, что я собирался сказать :) – MarkR

-1

Нет, это не будет работать так, как вы хотите. Чтобы рабочие потоки обрабатывали события, поступающие через интерфейс epoll, вам нужна другая архитектура.

Пример конструкции (Есть несколько способов сделать это) Использование: SysV/POSIX семафоры.

  • Есть мастер резьбы икру н subthreads и семафор, а затем блокировать epolling свои гнезда (или любой другой).

  • Имейте каждый блок подпотока вниз по семафору.

  • Когда главный поток разблокируется, он хранит события в некоторой глобальной структуре и поднимает семафор один раз за событие.

  • В subthreads разблокированию, обрабатывать события, блок снова, когда семафор возвращается в 0.

Вы можете использовать трубу общего числа всех потоков для достижения очень схожую функциональность, что семафора. Это позволит вам заблокировать на select() вместо семафора, который вы можете использовать, чтобы разбудить потоки на каком-либо другом событии (тайм-ауты, другие трубки и т. Д.).

Вы также можете отменить этот элемент управления и получить главный поток просыпаться, когда его рабочие требуют задания. Я думаю, что вышеупомянутый подход лучше для вашего дела.

+1

Разве это не похоже на то, что Epoll делает внутри? Он имеет своего рода очередь событий и диспетчер событий. потоки просыпаются по запросу с epoll_wait ?! Я прочитал этот thead на LKML, где у разработчика PowerDNS был аналогичный вопрос (http://www.gossamer-threads.com/lists/linux/kernel/1197050) ... – Daniel

+0

Вы заставили меня сомневаться там ...Тем не менее, я уверен, что наличие нескольких потоков, ожидающих одного и того же дескриптора epoll, вызывает проблему Thundering Herd. Подожди, теперь я не совсем уверен. Проклятье. – slezica

+0

Вы получаете только громоподобное стадо, если вы не используете EPOLLET (с красным). Кстати, в зависимости от того, что вы делаете в handle_request, вы, вероятно, можете уйти без использования EPOLLONESHOT. Но, тем не менее, epoll не имеет большого смысла, если у вас есть только один сокет. – cmeerw

Смежные вопросы