У меня есть приложение интеграции Spring, которое обычно ежедневно проверяет файл через SFTP с помощью триггера cron. Но если он не найдет файл, который он ожидает, он должен опросить каждые x минут через периодический триггер до тех пор, пока не попытается выполнить y. Для этого я использую следующий компонент:Как установить переопределение сложного триггера?
@Component
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice {
private final static Logger logger = LoggerFactory.getLogger(RetryCompoundTriggerAdvice.class);
private final CompoundTrigger compoundTrigger;
private final Trigger override;
private final ApplicationProperties applicationProperties;
private final Mail mail;
private int attempts = 0;
public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger,
@Qualifier("secondaryTrigger") Trigger override,
ApplicationProperties applicationProperties,
Mail mail) {
this.compoundTrigger = compoundTrigger;
this.override = override;
this.applicationProperties = applicationProperties;
this.mail = mail;
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return true;
}
@Override
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
final int maxOverrideAttempts = applicationProperties.getMaxFileRetry();
attempts++;
if (result == null && attempts < maxOverrideAttempts) {
logger.info("Unable to find load file after " + attempts + " attempt(s). Will reattempt");
this.compoundTrigger.setOverride(this.override);
} else if (result == null && attempts >= maxOverrideAttempts) {
mail.sendAdminsEmail("Missing File");
attempts = 0;
this.compoundTrigger.setOverride(null);
}
else {
attempts = 0;
this.compoundTrigger.setOverride(null);
logger.info("Found load file");
}
return result;
}
public void setOverrideTrigger() {
this.compoundTrigger.setOverride(this.override);
}
public CompoundTrigger getCompoundTrigger() {
return compoundTrigger;
}
}
Если файл не существует, это отлично работает. То есть переопределение (т. Е. Периодический триггер) вступает в силу и опроса каждые x минут до тех пор, пока не будет предпринята попытка y.
Однако, если файл существует, но он не является ожидаемым файлом (например, данные не совпадают), другой класс (который читает файл) вызывает setOverrideTrigger
класса RetryCompoundTriggerAdvice
. Но afterReceive
не вызывается через каждые x минут. Почему это должно быть?
Вот больше кода приложения:
SftpInboundFileSynchronizer
:
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(applicationProperties.getSftpDirectory());
CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<ChannelSftp.LsEntry>();
compositeFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(store, "sftp"));
compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter(applicationProperties.getLoadFileNamePattern()));
fileSynchronizer.setFilter(compositeFileListFilter);
fileSynchronizer.setPreserveTimestamp(true);
return fileSynchronizer;
}
Сессия Фабрика:
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
sftpSessionFactory.setHost(applicationProperties.getSftpHost());
sftpSessionFactory.setPort(applicationProperties.getSftpPort());
sftpSessionFactory.setUser(applicationProperties.getSftpUser());
sftpSessionFactory.setPassword(applicationProperties.getSftpPassword());
sftpSessionFactory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(sftpSessionFactory);
}
SftpInboundFileSynchronizingMessageSource
установлен на опрос с помощью триггера соединения.
@Bean
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata"))
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(applicationProperties.getScheduledLoadDirectory());
source.setAutoCreateLocalDirectory(true);
CompositeFileListFilter<File> compositeFileFilter = new CompositeFileListFilter<File>();
compositeFileFilter.addFilter(new LastModifiedFileListFilter());
compositeFileFilter.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(store, "dailyfilesystem"));
source.setLocalFilter(compositeFileFilter);
source.setCountsEnabled(true);
return source;
}
@Bean
public PollerMetadata pollerMetadata(RetryCompoundTriggerAdvice retryCompoundTriggerAdvice) {
PollerMetadata pollerMetadata = new PollerMetadata();
List<Advice> adviceChain = new ArrayList<Advice>();
adviceChain.add(retryCompoundTriggerAdvice);
pollerMetadata.setAdviceChain(adviceChain);
pollerMetadata.setTrigger(compoundTrigger());
pollerMetadata.setMaxMessagesPerPoll(1);
return pollerMetadata;
}
@Bean
public CompoundTrigger compoundTrigger() {
CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger());
return compoundTrigger;
}
@Bean
public CronTrigger primaryTrigger() {
return new CronTrigger(applicationProperties.getSchedule());
}
@Bean
public PeriodicTrigger secondaryTrigger() {
return new PeriodicTrigger(applicationProperties.getRetryInterval());
}
Update
Вот обработчик сообщений:
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler dailyHandler(SimpleJobLauncher jobLauncher, Job job, Mail mail) {
JobRunner jobRunner = new JobRunner(jobLauncher, job, store, mail);
jobRunner.setDaily("true");
jobRunner.setOverwrite("false");
return jobRunner;
}
JobRunner
пинает Spring Batch работы. После обработки задания мое приложение смотрит, имеет ли файл данные, которые он ожидал в течение дня. Если нет, он устанавливает триггер переопределения.
Спасибо. Да, это отличная тема из того, что я могу сказать. Итак, это объясняет это. Добавлен соответствующий код в разделе обновления OP. Не уверен, как выполнить мою задачу, не вызывая логики, которая решает повторить попытку из 'RetryCompoundTriggerAdvice'. Логика, однако, основана на нескольких факторах, включая чтение и сравнение всех строк в файле (именно поэтому я применил его после завершения задания Spring Batch). – James
«CompoundTriggerAdvice» предоставляет возможности [smart poller] (http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#__smart_polling). Одна из возможностей заключается в том, что, пока сообщение обрабатывается, держите кратковременного опросчика на месте, пока не будет оценено состояние файла. Попросите сервис «подтвердить» советы, что все в порядке, и смените триггер на следующий опрос.Пока файл обрабатывается (и до подтверждения), используйте 'beforeReceive()' return 'false', который отменяет текущий опрос –