2013-06-21 5 views
1

У меня есть несколько клиентов, отправляющих файлы на сервер. Для одного набора данных есть два файла, которые содержат информацию об этих данных, каждая с тем же именем. Когда файл получен, сервер отправляет сообщение в мою очередь, содержащую путь к файлу, имя файла, идентификатор клиента и «тип» файла (он имеет одинаковое расширение файла, но есть два типа, «назовите их A и B).Использование верблюда для агрегирования сообщений одного и того же заголовка

Два файла для одного набора данных имеют одинаковое имя файла. Как только сервер получил оба файла, мне нужно запустить программу, которая объединяет эти два. В настоящее время у меня есть что-то, что выглядит следующим образом:

from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args="); 

Где я застрял на заголовок («CamelFileName»), и более конкретно, как агрегатор работает.

С установленным значением CompleSize 2 оно просто всасывает все сообщения и сохраняет их в некоторой структуре данных до тех пор, пока не появится второе сообщение, соответствующее первому? Кроме того, отвечает ли заголовок() конкретное значение? У меня несколько клиентов, поэтому я думал о наличии идентификатора клиента и имени файла в заголовке, но опять же я не знаю, должен ли я давать определенное значение. Я также не знаю, могу ли я использовать регулярное выражение или нет.

Любые идеи или советы были бы очень полезны. Спасибо

EDIT: Вот код, который у меня есть. Основываясь на моем описании проблемы здесь и в комментариях к выбранному ответу, кажется ли он точным (помимо закрытых скобок, которые я не копировал)?

public static void main(String args[]) throws Exception{ 
     CamelContext c = new DefaultCamelContext(); 
     c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false")); 
     //ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
     //c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); 
     c.addRoutes(new RouteBuilder() { 
      public void configure() { 
       from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success"); 
      } 
     }); 
     c.start(); 
     while (true) { 
      System.out.println("Waiting on messages to come through for camel"); 
      Thread.sleep(2 * 1000); 
     } 
     //c.stop(); 
    } 

    private static class MyAggregationStrategy implements AggregationStrategy { 

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
      if (oldExchange == null) 
       return newExchange; 
      // and here is where combo stuff goes 
      String oldBody = oldExchange.getIn().getBody(String.class); 
      String newBody = newExchange.getIn().getBody(String.class); 
      boolean oldSet = oldBody.contains("set"); 
      boolean newSet = newBody.contains("set"); 
      boolean oldFlow = oldBody.contains("flow"); 
      boolean newFlow = newBody.contains("flow"); 
      if ((oldSet && newFlow) || (oldFlow && newSet)) { 
       //they match so return new exchange with info so extractor can be started with exec 
       String combined = oldBody + "\n" + newBody + "\n"; 
       newExchange.getIn().setBody(combined); 
       return newExchange; 
      } 
      else { 
       // no match so do something.... 
       return null; 
      } 
     } 
    } 

ответ

3

вы должны поставлять AggregationStrategy, чтобы определить, как вы хотите совместить куплю ...

, если вы заинтересованы только в FileName и получать ровно 2 обменов, то вы можете просто использовать UseLatestAggregationStrategy для всего передайте новейший Exchange через один раз, «2» были «агрегированы» ...

, который сказал, что вам нужно сохранить оба обмена (по одному для каждого clientId), чтобы вы могли передать эту информацию на этап «exec» ... если это так, вы можете просто объединить биржи в держатель GroupedExchange, используя встроенную стратегию агрегации enab управляемый с помощью опции groupExchanges ... или специфической пользовательской AggregationStrategy, чтобы объединить их, как вам хотелось бы. просто нужно иметь в виду, что ваш «Exec» шаг должен обрабатывать любой агрегированная структура вы решили использовать ...

увидеть эти модульные тесты для примеров:

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

+0

Так AggregationStrategy используется для объединения сообщений каким-то образом, которые затем могут быть переданы «exec»? Что вы подразумеваете под «обоими биржами»? Я только начал использовать Camel вчера, поэтому все по-прежнему для меня совершенно новое. – thaweatherman

+0

np, добро пожаловать в Camel ... и Aggregator - довольно сложный/мощный инструмент ... короче говоря, Exchange обертывает сообщение (из вашей очереди и т. Д.), , поэтому, если вы ожидаете 2 сообщения (соотнесенные с именем файла), вы получите 2 обмена, которые сгруппированы вместе ... тогда вам нужно вытащить соответствующие данные из них, чтобы передать exec (имя файла, идентификаторы клиента, и т. д.) ... –

+0

Хотя имена файлов могут быть одинаковыми от разных клиентов, так что я могу сопоставить ID и имя файла? – thaweatherman