У меня есть служба, которая потребляет от входящей очереди и производит некоторую исходящую очередь (где другой поток, созданный этой службой, берет сообщения и «переносит» их в пункт назначения).Как запустить потребитель блокирования опроса в Java?
В настоящее время я использую две простые Thread
s, как показано ниже в коде, но я знаю, что в целом вы больше не должны их использовать и вместо этого используйте абстракции более высокого уровня, такие как ExecutorService
.
Будет ли это иметь смысл в моем случае? В частности я имею в виду ->
- ли это сокращение кода?
- сделать код более надежным в случае отказа?
- разрешить более плавное прекращение резьбы? (что полезно при выполнении тестов)
Я пропустил что-то важное здесь? (Maybee некоторых других классов от java.util.concurrent)
// called on service startup
private void init() {
// prepare everything here
startInboundWorkerThread();
startOutboundTransporterWorkerThread();
}
private void startInboundWorkerThread() {
InboundWorkerThread runnable = injector.getInstance(InboundWorkerThread.class);
inboundWorkerThread = new Thread(runnable, ownServiceIdentifier);
inboundWorkerThread.start();
}
// this is the Runnable for the InboundWorkerThread
// the runnable for the transporter thread looks almost the same
@Override
public void run() {
while (true) {
InboundMessage message = null;
TransactionStatus transaction = null;
try {
try {
transaction = txManager.getTransaction(new DefaultTransactionDefinition());
} catch (Exception ex) {
// logging
break;
}
// blocking consumer
message = repository.takeOrdered(template, MESSAGE_POLL_TIMEOUT_MILLIS);
if (message != null) {
handleMessage(message);
commitTransaction(message, transaction);
} else {
commitTransaction(transaction);
}
} catch (Exception e) {
// logging
rollback(transaction);
} catch (Throwable e) {
// logging
rollback(transaction);
throw e;
}
if (Thread.interrupted()) {
// logging
break;
}
}
// logging
}
// called when service is shutdown
// both inbound worker thread and transporter worker thread must be terminated
private void interruptAndJoinWorkerThread(final Thread workerThread) {
if (workerThread != null && workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException e) {
// logging
}
}
}