2016-06-13 4 views
1

Мне действительно нужен ответ на этот вопрос, поэтому я его редактирую.Java Qpid Proton - брокер ActiveMQ Невозможно назначить запрошенный адрес: bind

У меня есть брокер Apache ActiveMQ построен в моей связи с использованием этого кода

Broker.java

общественного класса Брокер {

private BrokerService broker; 

public Broker(String connector) { 
    this.broker = new BrokerService(); 
    this.broker.setUseJmx(true); 
    try { 
     this.broker.addConnector(connector); 

    } catch (URISyntaxException e) { 
     e.printStackTrace(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public void addConnector(String connector){ 
    try { 
     this.broker.addConnector(connector); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public void start() { 
    try { 
     this.broker.start(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public BrokerService getBroker() { 
    return broker; 
} 

public void setBroker(BrokerService broker) { 
    this.broker = broker; 
} 

}

Вот моя проблема

Я использую библиотеку Qpid Proton (см. Здесь: Qpid Proton). У меня есть один класс, чтобы прочитать данные, которые почти пример они дают вам на qpid Webiste

package messaging; 

import java.io.IOException; 

import org.apache.qpid.proton.Proton; 
import org.apache.qpid.proton.amqp.messaging.AmqpValue; 
import org.apache.qpid.proton.engine.BaseHandler; 
import org.apache.qpid.proton.engine.Delivery; 
import org.apache.qpid.proton.engine.Event; 
import org.apache.qpid.proton.engine.Receiver; 
import org.apache.qpid.proton.message.Message; 
import org.apache.qpid.proton.reactor.FlowController; 
import org.apache.qpid.proton.reactor.Handshaker; 

public class AMQPSubscriber extends BaseHandler { 

    private String broker; 
    private String topic; 
    private String port; 

    public AMQPSubscriber(String broker, String port, String topic) { 
     this.broker = broker; 
     this.port = port; 
     this.topic = topic; 
     this.add(new Handshaker()); 
     this.add(new FlowController()); 

    } 

    @Override 
    public void onReactorInit(Event event) { 
     try { 
      event.getReactor().acceptor(broker, Integer.parseInt(port), new AMQPSubscriber(broker, port, topic)); 
     } catch (NumberFormatException e) { 
      e.printStackTrace(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void onDelivery(Event event) { 
     System.out.println("---------Message Received--------"); 
     Receiver recv = (Receiver) event.getLink(); 
     Delivery delivery = recv.current(); 
     if (delivery.isReadable() && !delivery.isPartial()) { 
      int size = delivery.pending(); 
      byte[] buffer = new byte[size]; 
      int read = recv.recv(buffer, 0, buffer.length); 
      recv.advance(); 

      Message msg = Proton.message(); 
      msg.decode(buffer, 0, read); 
      System.out.println("Subject : " + msg.getProperties().getSubject()); 
      System.out.println("Text : " + ((AmqpValue) msg.getBody()).getValue()); 
     } 
    } 

} 

Этот класс называется в главном:

public static void main (String[]args) throws IOException, TimeoutException, InterruptedException{ 

    Broker broker = new Broker("amqp://" + host + ":" + AMQPport); 
    broker.start(); 

AMQPSubscriber receiv = new AMQPSubscriber(host, "5672", topic); 
     Reactor r; 
     try { 
      r = Proton.reactor(receiv); 
      r.run(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 


} 

Но когда я исполняю этот код, я получаю

INFO | Loaded the Bouncy Castle security provider. 
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB] 
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi 
INFO | KahaDB is version 6 
INFO | Recovering from the journal @1:61115 
INFO | Recovery replayed 11 operations from the journal in 0.014 seconds. 
INFO | PListStore:[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\tmp_storage] started 
INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) is starting 
INFO | Listening for connections at: amqp://127.0.0.1:5672 
INFO | Connector amqp://127.0.0.1:5672 started 
INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) started 
INFO | For help or more information please see: http://activemq.apache.org 
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb 
WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb 
java.net.BindException: Address already in use: bind 
    at sun.nio.ch.Net.bind0(Native Method) 
    at sun.nio.ch.Net.bind(Unknown Source) 
    at sun.nio.ch.Net.bind(Unknown Source) 
    at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source) 
    at java.nio.channels.ServerSocketChannel.bind(Unknown Source) 
    at org.apache.qpid.proton.reactor.impl.AcceptorImpl.<init>(AcceptorImpl.java:102) 
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.acceptor(ReactorImpl.java:477) 
    at messaging.AMQPSubscriber.onReactorInit(AMQPSubscriber.java:33) 
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:209) 
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) 
    at org.apache.qpid.proton.engine.impl.EventImpl.delegate(EventImpl.java:129) 
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:114) 
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:307) 
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:275) 
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.run(ReactorImpl.java:343) 
    at messaging.Main.main(Main.java:98) 

Этот брокер отлично работает, когда я использую MQTT и Paho, мне бы хотелось, чтобы он также работал с AMQP. Я знаю, что bind означает, что порт уже используется, но я не могу понять, как я мог бы прослушивать порт, где не отправляются данные.

Спасибо, что помогли мне.

Alexi

+0

«192.168.100.47» должен быть вашим местным IP-адресом? Если да, то уверены ли вы? Также проверьте файл 'hosts', чтобы узнать, есть ли какая-либо запись для' localhost', кроме '127.0.0.1' – Bohemian

+0

. Этот IP-адрес - это другой адрес компьютера, который является сетью. –

ответ

0

Я нашел решение.

Когда я добавляю соединитель к встроенному брокеру activeMQ, он добавляет его как TCP, который позволяет только одно соединение одновременно.

Так создать соединитель как UDP, как это: broker.addConnector("udp://"+host+":"+AMQPport);

Это решение работает для меня, я надеюсь, что это может помочь другим разработчикам в будущем.

Cheers, Alexi

0

Вы получаете эту ошибку, когда есть другое приложение, которое создает сокет-сервер прослушивает порт назначения вы запросили. Вам нужно проверить, что вы не используете другой экземпляр брокера на этом порту или не используете какой-либо брандмауэр, блокирующий доступ.

+0

Я пробовал использовать 'netstat -a -n -o | найти «5672» '' и когда моя программа будет завершена, у меня ничего не осталось. Я также проверил свой брандмауэр, который разрешает соединения на нем. И единственный процесс, использующий этот порт в моем приложении, - это мой встроенный брокер, который, я думаю, является обязательным для транспорта AMQP. Я предполагаю, что ошибка где-то еще –