2013-03-11 3 views
0

Я не могу получить сообщение, сериализованное в protobuf через сокеты ZeroMQ, используя C. У меня есть сериализованное сообщение, введенное клиентом, и отправьте этот буфер на сервер, используя функцию s_send(), определенную в zhelpers.h. Код сервера - это тот же тестовый код, который поставляется вместе с пакетом zeromq в качестве примера.Общайтесь с zeromq с серийными сообщениями protobuf

Вот мой клиент сторона:

#include "amessage.pb-c.h" 
#include "zhelpers.h" 

int main (void) 
{ 

    AMessage msg = AMESSAGE__INIT; // AMessage 
    void *buf;      // Buffer to store serialized data 
    unsigned len; 

    printf ("Connecting to server...\n"); 
    void *context = zmq_ctx_new(); 
    void *requester = zmq_socket (context, ZMQ_REQ); 

    char buffer[256] = ""; 

    printf("[client] :"); 
    scanf("%s", buffer); 

    msg.csmsg = buffer; 
    len = amessage__get_packed_size(&msg);   
    buf = malloc(len); 

    printf("[client]: pack msg len : %d\n ", len); 
    printf("Sent msg : %d\n", buf); 

    amessage__pack(&msg,buf); 

    s_send(requester, buf); 

    zmq_close (requester); 
    zmq_ctx_destroy (context); 
    return 0; 
} 

И на стороне сервера:

#include "zhelpers.h" 
#include <pthread.h> 
#include <stdlib.h> 
#include "amessage.pb-c.h" 

#define MAX_MSG_SIZE 256 

static size_t read_buffer (unsigned max_length, unsigned char *out) 
{ 
    size_t cur_len = 0, nread; 
    uint8_t c; 
    while ((nread=fread(out + cur_len, 1, max_length - cur_len, stdin)) != 0) 
    { 
     cur_len += nread; 
     if (cur_len == max_length) 
     { 
      fprintf(stderr, "[server]: max message length exceeded\n"); 
      exit(1); 
     } 
    } 
    return cur_len; 
} 


static void * worker_routine (void *context) 
{ 

    AMessage *msg; 

    uint8_t buf[MAX_MSG_SIZE]; 
    char buffer[256]; 

    // Socket to talk to dispatcher 
    void *receiver = zmq_socket (context, ZMQ_REP); 
    zmq_connect (receiver, "inproc://workers"); 

    while (1) { 

      uint8_t *string = s_recv (receiver); 

     if(string == 0) 
      printf("[server]: Error: In receiving msg.\n"); 
     else 
     { 

     size_t msg_len = read_buffer (MAX_MSG_SIZE, string); 
     printf("[server]: client msg len is: %d.\n", msg_len); 
     msg = amessage__unpack(NULL, msg_len, string); 
     if (msg == NULL) 
     { 
      fprintf(stderr, "[server]: error unpacking incoming message\n"); 
      exit(1); 
     } 
     printf ("[client]: %s \n", msg->csmsg); 

    } 
    amessage__free_unpacked(msg, NULL); 
    free (string); 
    // Do some 'work' 
    sleep (1); 

    } 
    zmq_close (receiver); 
    return NULL; 
} 

    int main (void) 
    { 
    void *context = zmq_ctx_new(); 
    void *clients = zmq_socket (context, ZMQ_ROUTER); 
    zmq_bind (clients, "tcp://*:5555"); 

    void *workers = zmq_socket (context, ZMQ_DEALER); 
    zmq_bind (workers, "inproc://workers"); 

    // Launch pool of worker threads 
    int thread_nbr; 
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) { 
     pthread_t worker; 
     pthread_create (&worker, NULL, worker_routine, context); 
    } 
    // Connect work threads to client threads via a queue proxy 
    zmq_proxy (clients, workers, NULL); 

    zmq_close (clients); 
    zmq_close (workers); 

    zmq_ctx_destroy (context); 
    return 0; 
    } 

Любая идея, что я делаю неправильно?

ответ

2

Вы используете s_send(), который ожидает строку C в качестве аргумента и вызывает strlen(), чтобы определить его размер. Однако данные буферов протокола являются двоичными данными и могут содержать нулевые байты в любом месте сообщения.

Вместо этого используйте zmq_send() и укажите длину сообщения функции zmq_msg_init_size().

+0

: Спасибо за ответ. Тем не менее, я получаю исключение на сервере на memcpy(). 'zmq_msg_recv (& msg, receiver, 0); memcpy (rBuf, zmq_msg_data (& msg), zmq_msg_size (&msg));), однако zmq_msg_size одинаково на обоих концах. Любая идея, что я делаю неправильно в получении данных сообщения? – Sam

+0

Какое исключение? Напишите в неверную память? Проверьте, на что указывает rBuf. В противном случае задайте новый вопрос для новой проблемы, так как ваш код, похоже, немного изменился. – jpa

Смежные вопросы