2015-02-13 4 views
1

Я хочу реализовать инфраструктуру pub-sub для нашей распределенной системы. Идея сети, которую вы можете видеть на картинке, заключается в том, что я хочу реализовать издателя и подписчика в java. Но в JZmq кривая шифрования еще не поддерживается. Так что я хочу реализовать прокси в C (++), где он доступен (на данный момент у меня есть это только в Java)ZeroMQ два PUB-SUB Proxies

вот мой код

Subscriber.java:.

import java.nio.charset.Charset; 

import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Context; 
import org.zeromq.ZMQ.Socket; 

public class Subscriber { 
    public static void main(String[] args) { 
     String address = args[0]; 
     String topic = args[1]; 

     Context context = ZMQ.context(1); 
     Socket subscriber = context.socket(ZMQ.SUB); 
     subscriber.connect(address); 
     subscriber.subscribe(topic.getBytes()); 

     while (!Thread.currentThread().isInterrupted()) { 
      String top = subscriber.recvStr(Charset.defaultCharset()); 
      String contents = subscriber.recvStr(Charset.defaultCharset()); 

      System.out.println(top + ": " + contents); 
     } 
     subscriber.close(); 
     context.term(); 
    }  
} 

Publisher.java:

import java.util.Random; 

import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Context; 
import org.zeromq.ZMQ.Socket; 

public class Publisher { 
    public static void main(String[] args) { 
     String url = args[0]; 
     String topic = args[1]; 
     int intervall = Integer.valueOf(args[2]); 

     Context context = ZMQ.context(1); 
     Socket publisher = context.socket(ZMQ.PUB); 

     Random rand = new Random(); 
     publisher.connect(url); 
     while (!Thread.currentThread().isInterrupted()) { 
      try { 
       Thread.sleep(intervall); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      int value = rand.nextInt(20) * (rand.nextBoolean() ? (-1) : 1); 
      publisher.sendMore(topic); 
      publisher.send(String.valueOf(value)); 
      System.out.println("PUB: " + topic + ":" + value); 
     } 

     publisher.close(); 
     context.term(); 
    } 
} 

PubSubProxy.java:

import java.io.PrintStream; 

import org.zeromq.ZContext; 
import org.zeromq.ZFrame; 
import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Context; 
import org.zeromq.ZMQ.Socket; 
import org.zeromq.ZThread; 
import org.zeromq.ZThread.IAttachedRunnable; 

public class PubSubProxy { 
    static Socket frontend; 
    static Socket backend; 

    public static void main(String[] args) { 
     String addressSubscriber = args[0]; 
     String modeSubscriber = args[1]; 
     String addressPublisher = args[2]; 
     String modePublisher = args[3]; 

     // Prepare our context and sockets 
     // ZContext context = ZMQ.context(1); 
     ZContext context = new ZContext(); 

     // This is where the weather server sits 
     frontend = context.createSocket(ZMQ.XSUB); 
     if (modeSubscriber.equals("client")) { 
      System.out.println("Subscriber connecting to: " + addressSubscriber); 
      frontend.connect(addressSubscriber); 
     } else if (modeSubscriber.equalsIgnoreCase("server")) { 
      System.out.println("Subscriber binding to: " + addressSubscriber); 
      frontend.bind(addressSubscriber); 
     } 
     // This is our public endpoint for subscribers 
     backend = context.createSocket(ZMQ.XPUB); 

     if (modePublisher.equals("client")) { 
      System.out.println("Publisher connecting to: " + addressPublisher); 
      backend.connect(addressPublisher); 
     } else if (modePublisher.equalsIgnoreCase("server")) { 
      System.out.println("Publisher binding to: " + addressPublisher); 
      backend.bind(addressPublisher); 
     } 

     // Subscribe on everything 
     // frontend.subscribe("".getBytes()); 

     // Run the proxy until the user interrupts us 
     IAttachedRunnable runnable = new Listener(); 
     Socket listener = ZThread.fork(context, runnable); 
     ZMQ.proxy(frontend, backend, listener); 

     frontend.close(); 
     backend.close(); 
     context.destroy(); 
    } 

    private static class Listener implements IAttachedRunnable { 
     @Override 
     public void run(Object[] args, ZContext ctx, Socket pipe) { 
      // Print everything that arrives on pipe 
      while (true) { 
       ZFrame frame = ZFrame.recvFrame(pipe); 
       if (frame == null) 
        break; // Interrupted 
       System.out.println(frame.toString()); 
       frame.destroy(); 
      } 
     } 
    } 
} 

, как вы можете увидеть, что я добавил Слушатель к прокси-серверу, чтобы увидеть, если я получаю сообщение. На прокси-сервере издателя (верхний на картинке) я получаю сообщения, но ничего на другом прокси.

это так, как я выполнить мои приложения

#beaglebone #1 
#proxy #1 
java -Djava.library.path=/usr/local/lib -jar proxy.jar ipc:///tmp/pub server tcp://*:5555 server 
#pub 
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub temperature 10000 
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub humidity 1000 
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub testvar 5000 

#beaglebone #2 
#proxy #2 
java -Djava.library.path=/usr/local/lib -jar proxy.jar tcp://192.168.0.192:5555 client ipc:///tmp/sub server 
#sub 
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub temperature 
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub humidity 
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub testvar 

pub-sub network

ответ

0

Вы, кажется, делает замешательства смешивания клиента шаблон/сервер с паба/суб шаблон.

В шаблоне pub/sub издатель уведомляет своего абонента (при его наличии). Издатель должен использовать bind (для прослушивания подписки), и абонент должен использовать connect (для запроса подписки).

Тогда ваши обмены становится: enter image description here

Для того, чтобы сделать это, вы могли бы:

  • изменить Publisher.java замена publisher.connect(url); с publisher.bind(url);
  • изменить PubSubProxy.java удаление бесполезных клиент/сервер аргумент
import java.io.PrintStream; 

import org.zeromq.ZContext; 
import org.zeromq.ZFrame; 
import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Context; 
import org.zeromq.ZMQ.Socket; 
import org.zeromq.ZThread; 
import org.zeromq.ZThread.IAttachedRunnable; 

public class PubSubProxy { 
    static Socket frontend; 
    static Socket backend; 

    public static void main(String[] args) { 
     String addressSubscriber = args[0]; 
     String addressPublisher = args[1]; 

     // Prepare our context and sockets 
     ZContext context = new ZContext(); 

     // This is where the weather server sits 
     frontend = context.createSocket(ZMQ.XSUB); 
     System.out.println("Subscriber connecting to: " + addressSubscriber); 
     frontend.connect(addressSubscriber); 

     // This is our public endpoint for subscribers 
     backend = context.createSocket(ZMQ.XPUB); 
     System.out.println("Publisher binding to: " + addressPublisher); 
     backend.bind(addressPublisher); 

     // Run the proxy until the user interrupts us 
     ZMQ.proxy(frontend, backend, null); 

     frontend.close(); 
     backend.close(); 
     context.destroy(); 
    } 
} 

Тогда вы должны быть в состоянии полученных данных с сервером для FRONTEND с помощью:

#beaglebone #1 
#proxy #1 
java -Djava.library.path=/usr/local/lib -jar proxy.jar ipc:///tmp/pub tcp://*:5555 
#pub 
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub temperature 10000 

#beaglebone #2 
#proxy #2 
java -Djava.library.path=/usr/local/lib -jar proxy.jar tcp://192.168.0.192:5555 ipc:///tmp/sub 
#sub 
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub temperature 
+0

идея была, что прокси-туп, не нужно ничего знать о издателей. – user2071938

+0

@ user2071938 в предлагаемом ответе, прокси не знает больше, чем ваш вопрос. Он использует те же самые параметры. – mpromonet