2016-11-28 4 views
0

Я пытаюсь интегрировать Camel-Kafka.
У меня есть две очереди:
queue1 и queue2.Проблема интеграции Camel-Kafka

Есть три пути:

  1. Route1 помещает список из двух сообщений в queue1 (Он должен сделать это только один раз ).
  2. route2 читает список из queue1, разбивает его, и помещает отдельные сообщения в queue2
  3. Route3 читает сообщения из queue2 и просто выводит его.

Код выглядит следующим образом:

import java.util.ArrayList; 
import java.util.List; 

import org.apache.camel.CamelContext; 
import org.apache.camel.Exchange; 
import org.apache.camel.Processor; 
import org.apache.camel.builder.RouteBuilder; 
import org.apache.camel.impl.DefaultCamelContext; 

public class CamelListTest { 
    public static void main(String[] args) throws Exception { 
     CamelContext context = new DefaultCamelContext(); 
     context.addRoutes(new CamelListRoute()); 
     context.start(); 
     Thread.sleep(30000); 
     context.stop(); 
    } 
} 

class CamelListRoute extends RouteBuilder { 
    @Override 
    public void configure() throws Exception { 


     //Route1, expected to run once 
     from("timer://timerName?repeatCount=1").process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       List<String> inOrderList = new ArrayList<String>(); 
       inOrderList.add("1"); 
       inOrderList.add("2"); 
       exchange.getIn().setBody(inOrderList, ArrayList.class); 
      } 
     }) 
     .to("kafka:<ip>:9092?topic=queue1"); 


     //Route2 
     from("kafka:<ip>:9092?topic=queue1&groupId=testing&autoOffsetReset=latest&consumersCount=1") 
     .split() 
     .body().process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       System.out.println("2nd Route : " + (exchange.getIn().getBody().toString())); 
      } 
     }) 
     .to("kafka:<ip>:9092?topic=queue2"); 


     //Route3 
     from("kafka:<ip>:9092?topic=queue2&groupId=testing&autoOffsetReset=latest&consumersCount=1") 
     .process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       System.out.println("3rd Route : " + (exchange.getIn().getBody().toString())); 
      } 
     }); 
    } 
} 

Это не работает, как ожидалось, и есть несколько вопросов, наблюдавшиеся:

  1. Первый маршрут, который, как ожидается, запустить только один раз (repeatCount = 1), работает непрерывно, помещая то же сообщение в queue1 снова и снова.
  2. Второй маршрут читает сообщения из queue1, разбивает его, но не ставит его в queue2
  3. Поскольку второй маршрут ничего не поставить в queue2, этот маршрут не получает никаких сообщений.

Может ли кто-нибудь помочь мне выяснить, что здесь не так?

ответ

0

Я вижу несколько вещей:

  1. Я надеюсь, что вы даете Кафка Url как это: "Кафка: // локальный: 9092 тема = queue1"

Примечание: Кафка: //

  1. Предоставление URL-адресов zookeeper для потребителей, например: kafka: //?тема = queue1 & zookeeperConnect = & consumerStreams = 1 & идентификатор_группа = тестирование & autoOffsetReset = крупнейшего

  2. Обратите внимание на предыдущем значении autoOffsetReset точки будет по величине или маленького вместо последней.

+0

Да, я правильно использовал URL. Проблема была решена после использования exchange.getOut(). SetBody() для передачи данных на следующий маршрут. – rvd

+0

Ох, я думаю, вы можете опубликовать ответ. Это будет полезно для других. Благодарю. –

0

Я думаю, что вы обменяете сообщение.

процессор сделать что-то вроде:

exchng.getOut() SetHeader ("тип", "очередь");. exchng.getOut(). SetBody (exchng.getIn(). GetBody());

Тогда можно было бы добавить выбор на втором маршруте, не требуя третьего маршрута.

Смежные вопросы