1

У меня есть служба, которая потребляет от входящей очереди и производит некоторую исходящую очередь (где другой поток, созданный этой службой, берет сообщения и «переносит» их в пункт назначения).Как запустить потребитель блокирования опроса в Java?

В настоящее время я использую две простые Thread s, как показано ниже в коде, но я знаю, что в целом вы больше не должны их использовать и вместо этого используйте абстракции более высокого уровня, такие как ExecutorService.

Будет ли это иметь смысл в моем случае? В частности я имею в виду ->

  • ли это сокращение кода?
  • сделать код более надежным в случае отказа?
  • разрешить более плавное прекращение резьбы? (что полезно при выполнении тестов)

Я пропустил что-то важное здесь? (Maybee некоторых других классов от java.util.concurrent)

// called on service startup 
private void init() { 
    // prepare everything here 
    startInboundWorkerThread(); 
    startOutboundTransporterWorkerThread(); 
} 

private void startInboundWorkerThread() { 
    InboundWorkerThread runnable = injector.getInstance(InboundWorkerThread.class); 
    inboundWorkerThread = new Thread(runnable, ownServiceIdentifier); 
    inboundWorkerThread.start(); 
} 

// this is the Runnable for the InboundWorkerThread 
// the runnable for the transporter thread looks almost the same 
@Override 
public void run() {  
    while (true) { 
     InboundMessage message = null; 
     TransactionStatus transaction = null; 

     try { 
      try { 
       transaction = txManager.getTransaction(new DefaultTransactionDefinition()); 
      } catch (Exception ex) { 
       // logging 
       break; 
      } 

      // blocking consumer 
      message = repository.takeOrdered(template, MESSAGE_POLL_TIMEOUT_MILLIS); 
      if (message != null) {     
       handleMessage(message); 
       commitTransaction(message, transaction); 
      } else { 
       commitTransaction(transaction); 
      } 
     } catch (Exception e) { 
      // logging 
      rollback(transaction); 
     } catch (Throwable e) { 
      // logging 
      rollback(transaction); 
      throw e; 
     } 

     if (Thread.interrupted()) { 
      // logging 
      break; 
     } 
    } 

    // logging 
} 

// called when service is shutdown 
// both inbound worker thread and transporter worker thread must be terminated 
private void interruptAndJoinWorkerThread(final Thread workerThread) { 
    if (workerThread != null && workerThread.isAlive()) { 
     workerThread.interrupt(); 

     try { 
      workerThread.join(TimeUnit.SECONDS.toMillis(1)); 
     } catch (InterruptedException e) { 
      // logging 
     } 
    } 
} 

ответ

0

Основное преимущество для меня в использовании ThreadPools происходит от структурирования работы в отдельных, независимых и, как правило коротких рабочих местах и ​​улучшение абстракции потоков в ThreadPool s частный Worker s. Иногда вам может понадобиться более прямой доступ к ним, чтобы узнать, все ли они работают и т. Д. Но обычно лучше, , ориентированных на работу, способов сделать это.

Что касается неудачи обработки, вы можете отправить свой собственный ThreadFactory создавать темы с обычаем UncaughtExceptionHandler и вообще, ваши Runnable рабочих мест должны обеспечивать хорошую обработку исключений тоже для того, чтобы войти больше информации о конкретной работе, которая не смогли. Сделать эти задания неблокирующими, так как вы не хотите заполнять ThreadPool заблокированными рабочими. Перемещайте операции блокировки до того, как задание будет поставлено в очередь.

Как правило, shutdown и shutdownNow, как предусмотрено ExecutorService с, в сочетании с надлежащей обработкой прерываний на ваших рабочих местах, позволят добиться плавного завершения работы.