2014-10-09 3 views
4

Основная проблема заключается в том, что я могу обрабатывать только 7000 сообщений из одной из моих очередей (через все машины) в течение одного часа. Я не вижу способа сделать это с помощью верблюда или activemq, поэтому я прибегал к реализации собственной логики остановки маршрута/запуска. Я вижу несколько способов сделать это, и я попробовал несколько из них (только для того, чтобы столкнуться с проблемами).Каков правильный способ «приостановить» маршрут с помощью верблюда с activemq?

  1. camelContext.stopRoute(route): Это работает в том, что сообщения перестают быть обработано, но когда я называю camelContext.startRoute(route), это просачивается соединение TCP, который в конечном итоге приводит к тому, ActiveMQ сервера, чтобы ударить свой предел и умереть.
  2. camelContext.suspendRoute(route): Это также предотвращает обработку сообщений и не вызывает утечки соединений, но, похоже, он убивает активных потребителей (видимых на панели администратора), которые не активируются при вызове camelContext.resumeRoute(route). Я думаю, что в конечном итоге это может привести к тому, что никакие сообщения не будут обработаны из этой очереди вообще, даже если я возобновляю.
  3. Осуществление пользовательских RoutePolicy. Справедливости ради, я еще не пробовал этого, но кажется, что он станет жертвой тех же проблем, которые у меня были в соответствии с методом паузы, который я выбрал выше.

Есть ли способ решения этой проблемы, с которым я еще не сталкивался?

+0

Политика маршрутизации клиентов будет такой, как я бы это сделал. Когда вы говорите «в соответствии с методом паузы, который я выбрал выше», я подумал о политике маршрута, которую вы только что назвали stopConsumer() и startConsumer(), как это делает ThrottlingInflightRoutePolicy. –

+0

Помог бы «троллей»? http://camel.apache.org/throttler.html – vikingsteve

+0

@vikingsteve Мне нужно дросселировать уровень очереди на всех машинах, которые обрабатывают эту очередь. Указанное дросселирование ограничивает обработку только одной машины. – Denise

ответ

3

Вместо того, чтобы останавливать маршрут, я бы рекомендовал использовать Throttler EIP.

from("jms:queue:inbox") 
    .throttle(7000) 
    .timePeriodMillis(1000*60*60) 
    .to("log:result", "mock:result"); 

В приведенном выше примере будет дросселировать сообщения, полученные на jms:queue:inbox перед отправкой mock:result гарантируя, что максимум 7000 сообщений посылается в любом окне 1 часов.

Кроме того, для более мелкозернистого управления вы можете определить политику дросселирования маршрута, как показаны на throttling example верблюжьих:

<route routePolicyRef="myPolicy"> 
    <from uri="jms:queue:inbox"/> 
    <transacted/> 
    <to uri="log:+++JMS +++?groupSize=100"/> 
    <to ref="foo"/> 
</route> 

Дроссельная полиция определяются следующим образом:

<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy"> 
    <property name="scope" value="Context"/> 
    <!-- when we hit > 20 inflight exchanges then kick in and suspend the routes --> 
    <property name="maxInflightExchanges" value="20"/> 
    <!-- when we hit lower than 10% of the max = 2 then kick in and resume the routes the default percentage is 70% but in this demo we want a low value --> 
    <property name="resumePercentOfMax" value="10"/> 
    <!-- output throttling activity at WARN level --> 
    <property name="loggingLevel" value="WARN"/> 
</bean> 

EDIT 1:

Если вам требуется глобальное регулирование, вы можете сначала позволить один пользователь читает сообщения, регулируя все сообщения, как описано выше, а затем пересылает их в другую очередь и позволяет перечитывать и обрабатывать их по > = 1 распределенных потребителей.

EDIT 2:

В качестве альтернативы, вы можете реализовать свой собственный ThrottlingInflightRoutePolicy доступ к центральной базе данных, проведение обработки информации. Таким образом, вам не нужен «главный дроссель» с одним узлом. Однако также БД может быть одной точкой отказа.

+0

Можно ли применить политику дросселирования в контекстах/машинах? В противном случае существует потенциально одна точка отказа. – vikingsteve

+0

@vikingsteve Спасибо за вопрос. См. Мой * EDIT 2 *, как это можно было бы обработать. –

+0

@Peter Интересно. Я не считал, что один из потребителей делает дросселирование, но это приведет к большим накладным расходам, чтобы управлять асимметричными потребителями. Я не знаю, как ThrottlingInflightRoutePolicy реализует маршруты остановки/запуска, которые отличаются от camelContext.startRoute() или camelContext.resumeRoute(), которые я упомянул в моем вопросе, но поскольку все выступили так сильно в пользу этого, я Я сделаю это! – Denise

0

У Петя есть лучший ответ (ы), но в итоге я получил ThrottlingInflightRoutePolicy, и нет никакого объяснения, как это работает, поэтому я подумал, что немного подкрепил бы этот вопрос и покажу, как я действительно решил проблему.

public class MyRoutePolicy extends RoutePolicySupport implements CamelContextAware { 

    private CamelContext camelContext; 
    private final Lock lock = new ReentrantLock(); 
    private ContextScopedEventNotifier eventNotifier; 

    @Override 
    public final void setCamelContext(final CamelContext camelContext) { 
     this.camelContext = camelContext; 
    } 

    @Override 
    public final CamelContext getCamelContext() { 
     return this.camelContext; 
    } 

    @Override 
    public final void onExchangeDone(final Route route, final Exchange exchange) { 
     throttle(route); 
    } 

    private void throttle(final Route route) { 
     // this works the best when this logic is executed when the exchange is done 
     Consumer consumer = route.getConsumer(); 

     boolean stop = isRouteMarkedForSuspension(route.getId()) && ((JmsConsumer) route.getConsumer()).isStarted(); 
     if (stop) { 
      try { 
       lock.lock(); 
       stopConsumer(consumer); 
      } catch (Exception e) { 
       handleException(e); 
      } finally { 
       lock.unlock(); 
      } 
     } 

     // reload size in case a race condition with too many at once being invoked 
     // so we need to ensure that we read the most current size and start the consumer if we are already to low 
     boolean start = !isRouteMarkedForSuspension(route.getId()) && ((JmsConsumer) route.getConsumer()).isSuspended(); 
     if (start) { 
      try { 
       lock.lock(); 
       startConsumer(consumer); 
      } catch (Exception e) { 
       handleException(e); 
      } finally { 
       lock.unlock(); 
      } 
     } 
    } 

    @Override 
    protected final void doStart() throws Exception { 
     ObjectHelper.notNull(camelContext, "CamelContext", this); 
     eventNotifier = new ContextScopedEventNotifier(); 
     // must start the notifier before it can be used 
     ServiceHelper.startService(eventNotifier); 
     // we are in context scope, so we need to use an event notifier to keep track 
     // when any exchanges is done on the camel context. 
     // This ensures we can trigger accordingly to context scope 
     camelContext.getManagementStrategy().addEventNotifier(eventNotifier); 
    } 

    @Override 
    protected final void doStop() throws Exception { 
     ObjectHelper.notNull(camelContext, "CamelContext", this); 
     camelContext.getManagementStrategy().removeEventNotifier(eventNotifier); 
    } 

    private class ContextScopedEventNotifier extends EventNotifierSupport { 

     @Override 
     public void notify(final EventObject event) throws Exception { 
      for (Route route : camelContext.getRoutes()) { 
       throttle(route); 
      } 
     } 

     @Override 
     public boolean isEnabled(final EventObject event) { 
      return event instanceof ExchangeCompletedEvent; 
     } 

     @Override 
     protected void doStart() throws Exception { 
      // noop 
     } 

     @Override 
     protected void doStop() throws Exception { 
      // noop 
     } 

     @Override 
     public String toString() { 
      return "ContextScopedEventNotifier"; 
     } 
    } 
} 

Так я добавил RoutePolicy выше все мои маршруты, например:

from(uri).routePolicy(routePolicy).process(runner); 

MyRoutePolicy является внутренним классом и isRouteMarkedForSuspension определяется в основном классе.

throttle ударил в двух точках:

  • после обмена (сообщение) в обработке. Это полезно для выяснения, следует ли приостанавливать работу потребителя.
  • на уведомлении о событии через ContextScopedEventNotifier. Это полезно для выяснения, следует ли возобновлять работу потребителя.
Смежные вопросы