2016-02-18 3 views
0

Я новичок в Apache Camel. В hp nonstop есть приемник, который принимает события, генерируемые менеджером событий, как потоки. Моя цель - настроить конечную точку потребителя, которая получает входящее сообщение и обрабатывает его через Camel.Создание Apache Camel Потребительский компонент

Другая конечная точка Мне просто нужно написать ее в журналах. Из моего исследования я понял, что для конечной точки потребителя нужно создать собственный компонент и конфигурацию будет как

from("myComp:receive").to("log:net.javaforge.blog.camel?level=INFO") 

Вот мой фрагмент кода, который получает сообщение от системы событий.

Receive receive = com.tandem.ext.guardian.Receive.getInstance(); 
    byte[] maxMsg = new byte[500]; // holds largest possible request 
    short errorReturn = 0; 
    do { // read messages from $receive until last close 
     try { 
      countRead = receive.read(maxMsg, maxMsg.length); 
      String receivedMessage=new String(maxMsg, "UTF-8"); 
      //Here I need to handover receivedMessage to camel 

     } catch (ReceiveNoOpeners ex) { 
      moreOpeners = false; 
     } catch(Exception e) { 
      moreOpeners = false; 
     } 
    } while (moreOpeners); 

Может кто-нибудь посоветует с некоторыми подсказками, как сделать это как Потребителя.

ответ

1

10'000 футов взгляд это:

Вы должны начать с реализации компонента. Самый простой способ начать - это продлить org.apache.camel.impl.DefaultComponent. Единственное, что вам нужно сделать, это переопределить DefaultComponent::createEndpoint(..). Совершенно очевидно, что именно это создает вашу конечную точку.

Итак, следующее, что вам нужно, это реализовать свою конечную точку. Расширьте org.apache.camel.impl.DefaultEndpoint для этого. Переопределите минимум DefaultEndpoint::createConsumer(Processor), чтобы создать своего собственного потребителя.

Последнее, но не менее важное: вам необходимо реализовать потребителя. Опять же, лучше всего расширить org.apache.camel.impl.DefaultConsumer. Потребитель - это место, куда должен идти ваш код, который генерирует ваши сообщения. Через конструктор вы получаете ссылку на свою конечную точку. Используйте ссылку на конечную точку, чтобы создать новый Exchange, заполнить его и отправить по пути по маршруту. Что-то вдоль линий

Exchange ex = endpoint.createExchange(ExchangePattern.InOnly); 
setMyMessageHeaders(ex.getIn(), myMessagemetaData); 
setMyMessageBody(ex.getIn(), myMessage); 

getAsyncProcessor().process(ex, new AsyncCallback() { 
    @Override 
    public void done(boolean doneSync) { 
     LOG.debug("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously")); 
    } 
}); 

Я рекомендую вам выбрать простой компонент (DirectComponent?) В качестве примера для подражания.

+0

Спасибо за ваш ответ, я создал MessageComponent, MessageEndpoint, MessageProducer и MessageConsumer. 'MessageConsumer' расширяет' DefaultConsumer'. Не удалось найти способ для обработки моего сообщения. Нужно ли добавлять его в конструктор? – vels4j

+0

Переопределите 'doStart()' и 'doStop()' методы 'DefaultConsumer' для запуска/остановки вашей подписки/опроса внешних сообщений. В моем случае я применил метод обратного вызова у своего потребителя, который получает вызов, когда я получаю внешнее сообщение. В этом я создаю заголовки и тело и устанавливаю его на новый «обмен», как показано выше, и сообщение отправляется по пути вниз по маршруту. – Ralf

+0

Совершено, отлично работает, спасибо – vels4j

0

При этом добавление моего собственного потребительского компонента может помочь кому-то.

public class MessageConsumer extends DefaultConsumer { 

private final MessageEndpoint endpoint; 

private boolean moreOpeners = true; 

public MessageConsumer(MessageEndpoint endpoint, Processor processor) { 
    super(endpoint, processor); 
    this.endpoint = endpoint; 

} 


@Override 
protected void doStart() throws Exception { 

    int countRead=0; // number of bytes read 

    do { 
     countRead++; 
      String msg = String.valueOf(countRead)+" "+System.currentTimeMillis(); 
      Exchange ex = endpoint.createExchange(ExchangePattern.InOnly); 
      ex.getIn().setBody(msg); 
      getAsyncProcessor().process(ex, new AsyncCallback() { 
       @Override 
       public void done(boolean doneSync) { 
        log.info("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously")); 
       } 
      }); 
      // This is an echo server so echo request back to requester  

    } while (moreOpeners); 
} 

@Override 
protected void doStop() throws Exception { 
    moreOpeners = false; 
    log.debug("Message processor is shutdown"); 
} 

} 
Смежные вопросы