2014-12-27 5 views
0

я преподавал в университете, к простейшим способом обработки процессов в фоновом режиме, чтобы повесить ход на детей/родителей Мультипроцессный процессов с трубой read() функции. Честно говоря, я уже 2 недели работаю над своей домашней работой, и я не могу решить асинхронную обработку процесса. Я минимизировал свой код, чтобы написать одну дочернюю обработку с двумя трубами и заблокировать родительский и дочерний процессы с помощью функции read(). Вы можете найти текущее состояние моего кода ниже:GNU C манипуляции с трубами

#include <sys/types.h> 
#include <unistd.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 

char pipeMessage[100]; 

char* readFromPipe(int pipe) 
{ 
    pipeMessage[0] = '\0'; 
    read(pipe, pipeMessage, sizeof(pipeMessage)); 
    fflush(NULL); 
    return pipeMessage; 
} 

void writeToPipe(int pipe, char *input) 
{ 
    char text[strlen(input) + 1]; 
    strncpy(text, input, (int)strlen(input)); 
    strncat(text, "\0", 1); 
    printf("TEXT: %s\n", text); 
    write(pipe, text, sizeof(text)); 
    fflush(NULL); 
} 

int main(void) 
{ 
    int pipes[2][2]; 
    pid_t pid; 

    if(pipe(pipes[0]) < 0 || pipe(pipes[1]) < 0) 
    { 
     printf("[ERROR] create pipes\n"); 
     return -1; 
    } 

    printf("[PARENT] Create child1\n"); 
    if((pid = fork()) < 0) 
    { 
     printf("[ERROR] Fork error\n"); 
     return -1; 
    } 

    if(pid == 0) 
    { 
     // Child code 
     close(pipes[0][0]); 
     writeToPipe(pipes[0][1], "TEST MESSAGE"); 
     printf("[CHILD1] pipe message: %s\n", readFromPipe(pipes[1][0])); 
     writeToPipe(pipes[0][1], "-1"); 
    } 
    else if(pid > 0) 
    { 
     // Parent code 
     close(pipes[0][1]); 
     close(pipes[1][0]); 
     char *message; 
     do 
     { 
      message = readFromPipe(pipes[0][0]); 
      printf("[PARENT] pipe message: %s\n", message); 
      writeToPipe(pipes[1][1], "-1"); 
     } 
     while(atoi(message) != -1); 
    } 
    return 0; 
} 

ошибка является следующее:

[PARENT] Create child1 
TEXT: TEST MESSAGE 
[PARENT] pipe message: TEST MESSAGE 
TEXT: -1 
[CHILD1] pipe message: -1 
TEXT: -1�po 
[PARENT] pipe message: -1�T MESSAGE 
TEXT: -1�po 

Я пытался осуществить эту обработку с сигналами процесса, но в конечном приложении мне потребуется 3 различные дочерние процессы, а асинхронный запуск вызвал проблемы с обработкой сигнала. Я также попытался найти учебное пособие в сети, но каждый многопроцессорный раздел охватывает простое решение одного сообщения от родителя к ребенку и наоборот. Но в родительском процессе мне нужно меню, основанное на символах, поэтому дети должны постоянно ждать родительского сигнала/сообщения, а родителям также нужно дождаться, когда ребенок закончит фактическую задачу. Пожалуйста, помогите мне, потому что я действительно застрял. Если у вас есть нормальное решение для обработки процессов, сообщите мне, потому что я знаю, что этот код ужасен. Единственная причина - отсутствие правильных учебников. Спасибо заранее.

+0

Я бы использовал некоторое время (правда) с помощью выбора вызова, работающего с файловыми дескрипторами, созданными с помощью socketpair (двунаправленная связь). –

+0

Можете ли вы дать мне отправную точку, пожалуйста? как ссылка или простой пример. заранее заранее – MontyX

+0

Дайте мне 5 мин. Если я не могу найти ссылку, я даю вам простой пример –

ответ

1

Вы логика записи сомнительна, и расположение ваших труб явно сложнее. Ниже приведен ваш код с учетом всего простого случая, который я могу собрать. Комментарии включены, чтобы помочь вам. Я считаю, что проще при работе с скованными труб (что во всех отношениях, что именно это), чтобы выложить один массив дескрипторов, индексированную с помощью макросов, которые показывают цепное:

// some hand macros for access the correct pipes 
#define P_READ 0 
#define C_WRITE 1 
#define C_READ 2 
#define P_WRITE 3 
#define N_PIPES 4 

выше будет в конечном источнике список. Прозвища должны быть само собой разумеющимися, но в случае их отсутствия, P_XXX отмечает, что трубы родительский использует процессы, C_XXX отмечает, что трубы у ребенка использует процесс. Имейте это в виду, когда видите код:

#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <assert.h> 
#include <unistd.h> 
#include <sys/types.h> 

// some hand macros for access the correct pipes 
#define P_READ 0 
#define C_WRITE 1 
#define C_READ 2 
#define P_WRITE 3 
#define N_PIPES 4 

// reads a buffer up-to len size. 
ssize_t readFromPipe(int pipe, char *buff, size_t len) 
{ 
    buff[0] = 0; 
    ssize_t res = read(pipe, buff, len); 
    assert(res >= 0 && "Failed to read from pipe"); 
    return res; 
} 

ssize_t writeToPipe(int pipe, const char *input) 
{ 
    size_t len = strlen(input)+1; 
    ssize_t res = write(pipe, input, len); 
    assert(res == len && "Failed to write to pipe"); 
    return res; 
} 

int main(void) 
{ 
    int pipes[N_PIPES]; 
    char msg[128] = {0}; 
    pid_t pid; 

    if(pipe(pipes) < 0 || pipe(pipes+2) < 0) 
    { 
     printf("[ERROR] create pipes\n"); 
     return EXIT_FAILURE; 
    } 

    if((pid = fork()) < 0) 
    { 
     printf("[ERROR] Fork error\n"); 
     return EXIT_FAILURE; 
    } 

    // parent code 
    if(pid > 0) 
    { 
     // Parent code. close down the child pipes; don't need them 
     printf("parent(%d) create: child(%d)\n", getpid(), pid); 
     close(pipes[C_WRITE]); 
     close(pipes[C_READ]); 

     do 
     { 
      if (readFromPipe(pipes[P_READ], msg, sizeof(msg)) > 0) 
      { 
       printf("parent(%d) read : %s\n", getpid(), msg); 
       writeToPipe(pipes[P_WRITE], "-1"); 
      } 
      else break; 
     } 
     while(atoi(msg) != -1); 

     // close remaining pipes. no longer needed 
     close(pipes[P_READ]); 
     close(pipes[P_WRITE]); 
    } 

    else if(pid == 0) 
    { 
     // Child code. don't need parent write or child-read lines 
     close(pipes[P_READ]); 
     close(pipes[P_WRITE]); 

     // write message 
     writeToPipe(pipes[C_WRITE],"test message"); 

     // read test message 
     if (readFromPipe(pipes[C_READ], msg, sizeof(msg)) > 0) 
      printf("child(%d) read : %s\n", getpid(), msg); 

     // write another message 
     writeToPipe(pipes[C_WRITE], "-1"); 

     // close remaining pipes. no longer needed 
     close(pipes[C_READ]); 
     close(pipes[C_WRITE]); 
    } 

    return EXIT_SUCCESS; 
} 

Дескриптор массива трубы не выдержать, самое большое изменение в это упрощенная writeToPipe логика, которая просто записывает строку терминатор C к трубе, нуль-терминатор знак.

ssize_t writeToPipe(int pipe, const char *input) 
{ 
    size_t len = strlen(input)+1; 
    ssize_t res = write(pipe, input, len); 
    assert(res == len && "Failed to write to pipe"); 
    return res; 
} 

Вызывающий проверяет результат, чтобы убедиться, что написал все запрошенные данные, и встроенный утверждают() макрос будет срабатывать ваш отладчик в случае неудачи. Аналогичная логика существует для функции чтения.

Выход (в зависимости от идентификатора процесса)

parent(2067) create: child(2068) 
parent(2067) read : test message 
child(2068) read : -1 
parent(2067) read : -1 

Я надеюсь, что это помогает. При работе с трубами и, в частности, с перенаправлением stdio, с которым вы, скорее всего, столкнетесь в не слишком отдаленном будущем (see spoiler here), это очень помогает иметь значимые мнемоники для вашего кода, такие как макросы, которые я использовал выше для индексации массив труб.

Удачи.

+1

Почему вы используете макросы вместо простого ENUM? – MontyX

+0

@MontyX либо будет работать, но перечисление, безусловно, безопаснее. Спасибо за ввод. – WhozCraig

+0

enum кажется более гибким для меня, если мне нужно добавить еще дочерний процесс – MontyX

1

Хорошо, у меня есть шаблон, который я использую профессионально для встроенных решений. Используя это, я создал рабочее решение, которое содержит цикл клиента, но цикл сервера объявлен. Должно быть ясно, как можно было бы сделать решение.

Должно быть ясно, как несколько читателей/писателей могут быть добавлены к выбранному вызову. Примечание. Используется SOCK_DGRAM. В дополнение к socketpairs могут быть добавлены трубы (что предпочтительнее для потоковой передачи необработанных данных от процесса к процессу). Я надеюсь, что вы сможете чему-то научиться. Мы должны удалить его (когда-нибудь). Это требует немного больше работы, чтобы быть полным (петля сервера)

#include <stdlib.h> 
#include <stdio.h> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <sys/wait.h> 
#include <errno.h> 
#include <cstring> // strerror 
#include <unistd.h> // pipe 
#include <fcntl.h> 

#include <stdint.h> 
#include <algorithm> 

#define CallErrExit(fun, arg, retval) { \ 
    if ((fun arg)<0) {    \ 
     FailErr(#fun);    \ 
     return retval;    \ 
    }      \ 
    } 

#define FailErr(msg) {    \ 
    (void)fprintf(stderr, "%s, %s(%d)\n", \ 
      msg, strerror(errno), errno); \ 
    (void)fflush(stderr);} 

const int parentsocket = 0; 
const int childsocket = 1; 

int disk_main(int comm_server[]); 
int server_main(int comm_disk[]); 

int main(int argc, char* argv[]) { 

    int status; // Status parameter used waitpid 

    // Communication sockets 
    int comm_server_disk[2]; 

    // Process Id's 
    pid_t proc_server; 
    pid_t proc_disk; 

    if (argc < 2) { 
    fprintf(stderr,"ERROR, no port provided\n"); 
    return EXIT_FAILURE; 
    } 

    proc_server = getpid(); 

    int socket; 

    // SOCK_DGRAM are connectionless as opposed to SOCK_STREAM or SOCK_SEQPACKET 
    CallErrExit(socketpair, (AF_UNIX, SOCK_DGRAM, 0, 
        comm_server_disk), EXIT_FAILURE); 

    CallErrExit(proc_disk = fork,(),EXIT_FAILURE); 

    if (proc_disk == 0) { 
    // Disk process 
    proc_disk = getpid(); 

    // TODO: Try the alternative, use the names '/proc/<process id>/fd/ 
    printf("disk_main started with sockets:\n" 
      "\tserver->disk: /proc/%d/fd/%d\n" 
      "\tdaup->disk: /proc/%d/fd/%d\n", 
      proc_disk, socket+1, 
      proc_disk, socket+3); 

    /* 
    * Closing comm_server_disk[1] before function entry 
    */ 
    close(comm_server_disk[childsocket]); // Write to server 

    return disk_main(comm_server_disk); 
    } 
    else { 

    // Server process, closes comm_server_disk[0] 
    server_main(comm_server_disk); 

    // Never reached 

    // calling process sets SIGCHLD to SIG_IGN, ECHILD is set 
    CallErrExit(waitpid, (proc_disk,&status,0), EXIT_FAILURE); 
    return EXIT_SUCCESS; 
    } 
} 

typedef struct internal_cmd { 
    char cmd[32]; 
} internal_cmd_t; 

enum e_cmd { 
    eExit = 0x01, 
}; 

// TODO: Some map between enums and cmd[32] 

int disk_main(int comm_server[]) { 

    char buf[1024]; 

    int select_width; 
    fd_set rfds, wfds, efds; 

    int next_select_width; 
    fd_set next_rfds, next_wfds, next_efds; 

    int n_fds; 

    struct timeval timeout = { 10, 0 }; 

    FD_ZERO(&next_rfds); 
    FD_ZERO(&next_wfds); 
    FD_ZERO(&next_efds); 

    FD_SET(comm_server[0], &next_rfds); 

    // Default: is blocking, but be sure 
    fcntl(comm_server[0], 
     F_SETFL, fcntl(comm_server[0], F_GETFL, 0) & ~O_NONBLOCK); 

    // Add other file descriptors 
    int ofd = comm_server[0]; 
    next_select_width = std::max(comm_server[0],ofd) + 1; 

    do { 
    // Update read/write state 
    select_width = next_select_width; 
    rfds = next_rfds; 
    wfds = next_wfds; 
    efds = next_efds; 

    // Wait for interrupt 
    n_fds = select(select_width,&rfds, &wfds, &efds, &timeout); 

    if (n_fds < 0) { 
     fprintf(stderr,"ERROR\n"); 
     exit(1); 
    } 

    if (FD_ISSET(comm_server[0], &wfds)) 
     printf("Disk process can write to server\n"); 

    if (FD_ISSET(comm_server[0], &rfds)) { 
     printf("Disk process received message from server\n"); 
     int rv = recv(comm_server[0], buf, sizeof(buf), MSG_DONTWAIT); 
     if (rv < 0) { 
     printf("Disk process - %s(%d)\n", strerror(errno), errno); 
     exit (1); 
     } 
     printf("Disk process received %d bytes:\n", rv); 
     // Interpret command 
     if (rv == sizeof(internal_cmd_t)) { 

     // Here interpret command 
     e_cmd cmd; 
     if (cmd == eExit) { 
      printf("Exiting\n"); 
      close(comm_server[0]); 
      return EXIT_SUCCESS; 
     } 
     } 
    } 

    FD_ZERO(&next_rfds); FD_ZERO(&next_wfds); FD_ZERO(&next_efds); 
    FD_SET(comm_server[0], &next_rfds); 

    fcntl(comm_server[0], F_SETFL, fcntl(comm_server[0], F_GETFL, 0) & ~O_NONBLOCK); 

    int ofd = comm_server[0]; 
    next_select_width = std::max(comm_server[0],ofd) + 1; 
    } 
    while (true); 

    close(comm_server[0]); 

    return EXIT_SUCCESS; 
} 
+0

большое спасибо. Я собираюсь проанализировать ваш код. Я еще не могу его продвинуть (у меня недостаточно репутации), но я думаю, что это будет поддержано другими. – MontyX

+0

Добро пожаловать. Это очень хорошо известный шаблон для многопроцессорной связи с использованием socketpair и выбор –

Смежные вопросы