2015-10-19 4 views
2

У нас есть приложение Apache Camel с высокой нагрузкой, которое использует logback/MDC для регистрации информации. Мы обнаруживаем, что некоторые данные MDC устаревают в потоках, как это было указано в документации журнала. Я нашел это так вопрос, который адресует эту проблему:Camel MDC Logback Stale Info Под томом

How to use MDC with thread pools?

Как мы должны применить это к нашему верблюжьего приложению, чтобы избежать несвежей информации? Существует ли простое глобальное изменение стандартного ThreadPoolExecutor по умолчанию, как это предлагается в связанном вопросе? Я вижу, вы можете сделать это для самих пулов, но не видели никаких примеров для исполнителя. Имейте в виду, что наше приложение достаточно велико и ежедневно обслуживает большой объем заказов - я бы хотел как можно меньше воздействовать на существующее приложение.

ответ

2

Я понял это и хотел опубликовать то, что я сделал, если это приносит пользу кому-то другому. Пожалуйста, обратите внимание, я использую JDK 6/camel2.13.2

  • верблюд имеет DefaultExecutorServiceManager, который использует DefaultThreadPoolFactory. Я продлил завод по умолчанию в MdcThreadPoolFactory

  • DefaultThreadPoolFactory имеет методы для генерации RejectableThreadPoolExecutor с и RejectableScheduledThreadPoolExecutor с. Я расширил оба из них в версии Mdc *, которые переопределяют метод execute() для переноса Runnable и передачи информации MDC между потоками (как указано ссылкой в ​​моем исходном вопросе).

  • Я создал экземпляр боб в MdcThreadPoolFactory в моей конфигурации приложения, автоматически подобран Camel и используется в ExecutorServiceManager

MdcThreadPoolExecutor:

package com.mypackage.concurrent 

import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor 
import org.slf4j.MDC; 

import java.util.Map; 
import java.util.concurrent.*; 

/** 
* A SLF4J MDC-compatible {@link ThreadPoolExecutor}. 
* <p/> 
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate 
* logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a 
* thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately. 
* <p/> 
* Created by broda20. 
* Date: 10/29/15 
*/ 
public class MdcThreadPoolExecutor extends RejectableThreadPoolExecutor { 

    @SuppressWarnings("unchecked") 
    private Map<String, Object> getContextForTask() { 
     return MDC.getCopyOfContextMap(); 
    } 

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
             BlockingQueue<Runnable> workQueue) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); 
    } 

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
             BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); 
    } 

    public MdcThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
             BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); 
    } 

    /** 
    * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) 
    * all delegate to this. 
    */ 
    @Override 
    public void execute(Runnable command) { 
     super.execute(wrap(command, getContextForTask())); 
    } 

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) { 
     return new Runnable() { 
      @Override 
      public void run() { 
       Map previous = MDC.getCopyOfContextMap(); 
       if (context == null) { 
        MDC.clear(); 
       } else { 
        MDC.setContextMap(context); 
       } 
       try { 
        runnable.run(); 
       } finally { 
        if (previous == null) { 
         MDC.clear(); 
        } else { 
         MDC.setContextMap(previous); 
        } 
       } 
      } 
     }; 
    } 
} 

MdcScheduledThreadPoolExecutor:

package com.mypackage.concurrent 

import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor 
import org.slf4j.MDC; 

import java.util.Map; 
import java.util.concurrent.*; 

/** 
* A SLF4J MDC-compatible {@link ThreadPoolExecutor}. 
* <p/> 
* In general, MDC is used to store diagnostic information (e.g. a user's session id) in per-thread variables, to facilitate 
* logging. However, although MDC data is passed to thread children, this doesn't work when threads are reused in a 
* thread pool. This is a drop-in replacement for {@link ThreadPoolExecutor} sets MDC data before each task appropriately. 
* <p/> 
* Created by broda20. 
* Date: 10/29/15 
*/ 
public class MdcScheduledThreadPoolExecutor extends RejectableScheduledThreadPoolExecutor { 

    @SuppressWarnings("unchecked") 
    private Map<String, Object> getContextForTask() { 
     return MDC.getCopyOfContextMap(); 
    } 

    public MdcScheduledThreadPoolExecutor(int corePoolSize) { 
     super(corePoolSize); 
    } 

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { 
     super(corePoolSize, threadFactory); 
    } 

    public MdcScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { 
     super(corePoolSize, handler); 
    } 

    public MdcScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { 
     super(corePoolSize, threadFactory, handler); 
    } 

    /** 
    * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) 
    * all delegate to this. 
    */ 
    @Override 
    public void execute(Runnable command) { 
     super.execute(wrap(command, getContextForTask())); 
    } 

    public static Runnable wrap(final Runnable runnable, final Map<String, Object> context) { 
     return new Runnable() { 
      @Override 
      public void run() { 
       Map previous = MDC.getCopyOfContextMap(); 
       if (context == null) { 
        MDC.clear(); 
       } else { 
        MDC.setContextMap(context); 
       } 
       try { 
        runnable.run(); 
       } finally { 
        if (previous == null) { 
         MDC.clear(); 
        } else { 
         MDC.setContextMap(previous); 
        } 
       } 
      } 
     }; 
    } 
} 

MdcThreadPoolFactory:

package com.mypackage.concurrent 

import org.apache.camel.impl.DefaultThreadPoolFactory 
import org.apache.camel.spi.ThreadPoolProfile 
import org.apache.camel.util.concurrent.SizedScheduledExecutorService 
import org.slf4j.MDC; 

import java.util.Map; 
import java.util.concurrent.*; 

public class MdcThreadPoolFactory extends DefaultThreadPoolFactory { 

    @SuppressWarnings("unchecked") 
    private Map<String, Object> getContextForTask() { 
     return MDC.getCopyOfContextMap(); 
    } 


    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit, int maxQueueSize, boolean allowCoreThreadTimeOut, 
              RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) throws IllegalArgumentException { 

      // the core pool size must be 0 or higher 
      if (corePoolSize < 0) { 
       throw new IllegalArgumentException("CorePoolSize must be >= 0, was " + corePoolSize); 
      } 

      // validate max >= core 
      if (maxPoolSize < corePoolSize) { 
       throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize); 
      } 

      BlockingQueue<Runnable> workQueue; 
      if (corePoolSize == 0 && maxQueueSize <= 0) { 
       // use a synchronous queue for direct-handover (no tasks stored on the queue) 
       workQueue = new SynchronousQueue<Runnable>(); 
       // and force 1 as pool size to be able to create the thread pool by the JDK 
       corePoolSize = 1; 
       maxPoolSize = 1; 
      } else if (maxQueueSize <= 0) { 
       // use a synchronous queue for direct-handover (no tasks stored on the queue) 
       workQueue = new SynchronousQueue<Runnable>(); 
      } else { 
       // bounded task queue to store tasks on the queue 
       workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize); 
      } 

      ThreadPoolExecutor answer = new MdcThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue); 
      answer.setThreadFactory(threadFactory); 
      answer.allowCoreThreadTimeOut(allowCoreThreadTimeOut); 
      if (rejectedExecutionHandler == null) { 
       rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); 
      } 
      answer.setRejectedExecutionHandler(rejectedExecutionHandler); 
      return answer; 
     } 

     @Override 
     public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) { 
      RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler(); 
      if (rejectedExecutionHandler == null) { 
       rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy(); 
      } 

      ScheduledThreadPoolExecutor answer = new MdcScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler); 
      //JDK7: answer.setRemoveOnCancelPolicy(true); 

      // need to wrap the thread pool in a sized to guard against the problem that the 
      // JDK created thread pool has an unbounded queue (see class javadoc), which mean 
      // we could potentially keep adding tasks, and run out of memory. 
      if (profile.getMaxPoolSize() > 0) { 
       return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize()); 
      } else { 
       return answer; 
      } 
     } 
} 

И, наконец, экземпляр компонента:

<bean id="mdcThreadPoolFactory" class="com.mypackage.concurrent.MdcThreadPoolFactory"/> 
+1

Чтобы получить эту работу в Camel 2.16.3 для новых потоков, запрашиваемых org.apache.camel.util.component.AbstractApiProducer. process (Exchange, AsyncCallback) Мне также пришлось переопределить java.util.concurrent.ScheduledThreadPoolExecutor.submit (Runnable) –

+1

cool. Полезно знать, когда я могу обновить наш верблюд. –

+1

Впоследствии я изменил это, чтобы переопределить общедоступное расписание ScheduledFuture (Runnable command, long delay, TimeUnit unit), который представляет собой делегат submit() & execute() bith (в JDK8 на наименее). Я думаю, что это сделало бы хорошее представление для верблюда. Если я найду время, чтобы обойти это, вы согласны с подписанием авторских прав на Apache (или какие-либо действия по лицензированию)? –

Смежные вопросы