У меня есть несколько клиентов, отправляющих файлы на сервер. Для одного набора данных есть два файла, которые содержат информацию об этих данных, каждая с тем же именем. Когда файл получен, сервер отправляет сообщение в мою очередь, содержащую путь к файлу, имя файла, идентификатор клиента и «тип» файла (он имеет одинаковое расширение файла, но есть два типа, «назовите их A и B).Использование верблюда для агрегирования сообщений одного и того же заголовка
Два файла для одного набора данных имеют одинаковое имя файла. Как только сервер получил оба файла, мне нужно запустить программу, которая объединяет эти два. В настоящее время у меня есть что-то, что выглядит следующим образом:
from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args=");
Где я застрял на заголовок («CamelFileName»), и более конкретно, как агрегатор работает.
С установленным значением CompleSize 2 оно просто всасывает все сообщения и сохраняет их в некоторой структуре данных до тех пор, пока не появится второе сообщение, соответствующее первому? Кроме того, отвечает ли заголовок() конкретное значение? У меня несколько клиентов, поэтому я думал о наличии идентификатора клиента и имени файла в заголовке, но опять же я не знаю, должен ли я давать определенное значение. Я также не знаю, могу ли я использовать регулярное выражение или нет.
Любые идеи или советы были бы очень полезны. Спасибо
EDIT: Вот код, который у меня есть. Основываясь на моем описании проблемы здесь и в комментариях к выбранному ответу, кажется ли он точным (помимо закрытых скобок, которые я не копировал)?
public static void main(String args[]) throws Exception{
CamelContext c = new DefaultCamelContext();
c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
//ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
//c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
c.addRoutes(new RouteBuilder() {
public void configure() {
from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success");
}
});
c.start();
while (true) {
System.out.println("Waiting on messages to come through for camel");
Thread.sleep(2 * 1000);
}
//c.stop();
}
private static class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null)
return newExchange;
// and here is where combo stuff goes
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
boolean oldSet = oldBody.contains("set");
boolean newSet = newBody.contains("set");
boolean oldFlow = oldBody.contains("flow");
boolean newFlow = newBody.contains("flow");
if ((oldSet && newFlow) || (oldFlow && newSet)) {
//they match so return new exchange with info so extractor can be started with exec
String combined = oldBody + "\n" + newBody + "\n";
newExchange.getIn().setBody(combined);
return newExchange;
}
else {
// no match so do something....
return null;
}
}
}
Так AggregationStrategy используется для объединения сообщений каким-то образом, которые затем могут быть переданы «exec»? Что вы подразумеваете под «обоими биржами»? Я только начал использовать Camel вчера, поэтому все по-прежнему для меня совершенно новое. – thaweatherman
np, добро пожаловать в Camel ... и Aggregator - довольно сложный/мощный инструмент ... короче говоря, Exchange обертывает сообщение (из вашей очереди и т. Д.), , поэтому, если вы ожидаете 2 сообщения (соотнесенные с именем файла), вы получите 2 обмена, которые сгруппированы вместе ... тогда вам нужно вытащить соответствующие данные из них, чтобы передать exec (имя файла, идентификаторы клиента, и т. д.) ... –
Хотя имена файлов могут быть одинаковыми от разных клиентов, так что я могу сопоставить ID и имя файла? – thaweatherman