Я пытаюсь интегрировать Camel-Kafka.
У меня есть две очереди:
queue1
и queue2
.Проблема интеграции Camel-Kafka
Есть три пути:
- Route1 помещает список из двух сообщений в
queue1
(Он должен сделать это только один раз ). - route2 читает список из
queue1
, разбивает его, и помещает отдельные сообщения вqueue2
- Route3 читает сообщения из
queue2
и просто выводит его.
Код выглядит следующим образом:
import java.util.ArrayList;
import java.util.List;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class CamelListTest {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new CamelListRoute());
context.start();
Thread.sleep(30000);
context.stop();
}
}
class CamelListRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
//Route1, expected to run once
from("timer://timerName?repeatCount=1").process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
List<String> inOrderList = new ArrayList<String>();
inOrderList.add("1");
inOrderList.add("2");
exchange.getIn().setBody(inOrderList, ArrayList.class);
}
})
.to("kafka:<ip>:9092?topic=queue1");
//Route2
from("kafka:<ip>:9092?topic=queue1&groupId=testing&autoOffsetReset=latest&consumersCount=1")
.split()
.body().process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("2nd Route : " + (exchange.getIn().getBody().toString()));
}
})
.to("kafka:<ip>:9092?topic=queue2");
//Route3
from("kafka:<ip>:9092?topic=queue2&groupId=testing&autoOffsetReset=latest&consumersCount=1")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("3rd Route : " + (exchange.getIn().getBody().toString()));
}
});
}
}
Это не работает, как ожидалось, и есть несколько вопросов, наблюдавшиеся:
- Первый маршрут, который, как ожидается, запустить только один раз (repeatCount = 1), работает непрерывно, помещая то же сообщение в
queue1
снова и снова. - Второй маршрут читает сообщения из
queue1
, разбивает его, но не ставит его вqueue2
- Поскольку второй маршрут ничего не поставить в
queue2
, этот маршрут не получает никаких сообщений.
Может ли кто-нибудь помочь мне выяснить, что здесь не так?
Да, я правильно использовал URL. Проблема была решена после использования exchange.getOut(). SetBody() для передачи данных на следующий маршрут. – rvd
Ох, я думаю, вы можете опубликовать ответ. Это будет полезно для других. Благодарю. –