2017-02-22 12 views
0

У меня есть приложение интеграции Spring, которое обычно ежедневно проверяет файл через SFTP с помощью триггера cron. Но если он не найдет файл, который он ожидает, он должен опросить каждые x минут через периодический триггер до тех пор, пока не попытается выполнить y. Для этого я использую следующий компонент:Как установить переопределение сложного триггера?

@Component 
public class RetryCompoundTriggerAdvice extends AbstractMessageSourceAdvice { 

    private final static Logger logger = LoggerFactory.getLogger(RetryCompoundTriggerAdvice.class); 

    private final CompoundTrigger compoundTrigger; 

    private final Trigger override; 

    private final ApplicationProperties applicationProperties; 

    private final Mail mail; 

    private int attempts = 0; 

    public RetryCompoundTriggerAdvice(CompoundTrigger compoundTrigger, 
      @Qualifier("secondaryTrigger") Trigger override, 
      ApplicationProperties applicationProperties, 
      Mail mail) { 
     this.compoundTrigger = compoundTrigger; 
     this.override = override; 
     this.applicationProperties = applicationProperties; 
     this.mail = mail; 
    } 

    @Override 
    public boolean beforeReceive(MessageSource<?> source) { 
     return true; 
    } 

    @Override 
    public Message<?> afterReceive(Message<?> result, MessageSource<?> source) { 
     final int maxOverrideAttempts = applicationProperties.getMaxFileRetry(); 
     attempts++; 
     if (result == null && attempts < maxOverrideAttempts) { 
      logger.info("Unable to find load file after " + attempts + " attempt(s). Will reattempt"); 
      this.compoundTrigger.setOverride(this.override); 
     } else if (result == null && attempts >= maxOverrideAttempts) { 
      mail.sendAdminsEmail("Missing File"); 
      attempts = 0; 
      this.compoundTrigger.setOverride(null); 
     } 
     else { 
      attempts = 0; 
      this.compoundTrigger.setOverride(null); 
      logger.info("Found load file"); 
     } 
     return result; 
    } 

    public void setOverrideTrigger() { 
     this.compoundTrigger.setOverride(this.override); 
    } 

    public CompoundTrigger getCompoundTrigger() { 
     return compoundTrigger; 
    } 
} 

Если файл не существует, это отлично работает. То есть переопределение (т. Е. Периодический триггер) вступает в силу и опроса каждые x минут до тех пор, пока не будет предпринята попытка y.

Однако, если файл существует, но он не является ожидаемым файлом (например, данные не совпадают), другой класс (который читает файл) вызывает setOverrideTrigger класса RetryCompoundTriggerAdvice. Но afterReceive не вызывается через каждые x минут. Почему это должно быть?

Вот больше кода приложения:

SftpInboundFileSynchronizer:

@Bean 
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { 
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); 
    fileSynchronizer.setDeleteRemoteFiles(false); 
    fileSynchronizer.setRemoteDirectory(applicationProperties.getSftpDirectory()); 
    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<ChannelSftp.LsEntry>(); 
    compositeFileListFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(store, "sftp")); 
    compositeFileListFilter.addFilter(new SftpSimplePatternFileListFilter(applicationProperties.getLoadFileNamePattern())); 
    fileSynchronizer.setFilter(compositeFileListFilter); 
    fileSynchronizer.setPreserveTimestamp(true); 
    return fileSynchronizer; 
} 

Сессия Фабрика:

@Bean 
public SessionFactory<LsEntry> sftpSessionFactory() { 
    DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory(); 
    sftpSessionFactory.setHost(applicationProperties.getSftpHost()); 
    sftpSessionFactory.setPort(applicationProperties.getSftpPort()); 
    sftpSessionFactory.setUser(applicationProperties.getSftpUser()); 
    sftpSessionFactory.setPassword(applicationProperties.getSftpPassword()); 
    sftpSessionFactory.setAllowUnknownKeys(true); 
    return new CachingSessionFactory<LsEntry>(sftpSessionFactory); 
} 

SftpInboundFileSynchronizingMessageSource установлен на опрос с помощью триггера соединения.

@Bean 
@InboundChannelAdapter(autoStartup="true", channel = "sftpChannel", poller = @Poller("pollerMetadata")) 
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() { 
    SftpInboundFileSynchronizingMessageSource source = 
      new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); 
    source.setLocalDirectory(applicationProperties.getScheduledLoadDirectory()); 
    source.setAutoCreateLocalDirectory(true); 
    CompositeFileListFilter<File> compositeFileFilter = new CompositeFileListFilter<File>(); 
    compositeFileFilter.addFilter(new LastModifiedFileListFilter()); 
    compositeFileFilter.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(store, "dailyfilesystem")); 
    source.setLocalFilter(compositeFileFilter); 
    source.setCountsEnabled(true); 
    return source; 
} 

@Bean 
public PollerMetadata pollerMetadata(RetryCompoundTriggerAdvice retryCompoundTriggerAdvice) { 
    PollerMetadata pollerMetadata = new PollerMetadata(); 
    List<Advice> adviceChain = new ArrayList<Advice>(); 
    adviceChain.add(retryCompoundTriggerAdvice); 
    pollerMetadata.setAdviceChain(adviceChain); 
    pollerMetadata.setTrigger(compoundTrigger()); 
    pollerMetadata.setMaxMessagesPerPoll(1); 
    return pollerMetadata; 
} 

@Bean 
public CompoundTrigger compoundTrigger() { 
    CompoundTrigger compoundTrigger = new CompoundTrigger(primaryTrigger()); 
    return compoundTrigger; 
} 

@Bean 
public CronTrigger primaryTrigger() { 
    return new CronTrigger(applicationProperties.getSchedule()); 
} 

@Bean 
public PeriodicTrigger secondaryTrigger() { 
    return new PeriodicTrigger(applicationProperties.getRetryInterval()); 
} 

Update

Вот обработчик сообщений:

@Bean 
@ServiceActivator(inputChannel = "sftpChannel") 
public MessageHandler dailyHandler(SimpleJobLauncher jobLauncher, Job job, Mail mail) { 
    JobRunner jobRunner = new JobRunner(jobLauncher, job, store, mail); 
    jobRunner.setDaily("true"); 
    jobRunner.setOverwrite("false"); 
    return jobRunner; 
} 

JobRunner пинает Spring Batch работы. После обработки задания мое приложение смотрит, имеет ли файл данные, которые он ожидал в течение дня. Если нет, он устанавливает триггер переопределения.

ответ

1

Это срабатывает триггеры - вы получаете возможность изменять триггер при срабатывании триггера.

С тех пор, как вы сбрасываете триггер cron, следующая возможность изменения - это когда срабатывает триггер (если поток опроса освобождается потоком вниз по потоку перед сменой триггера).

Отправляете ли вы файл в другой поток (канал или исполнитель очереди)? Если нет, я бы ожидал, что любые изменения в триггере должны применяться, потому что nextExecutionTime() не будет вызываться до тех пор, пока поток нисходящего потока не будет возвращен.

Если есть передача передачи нити, у вас нет возможности сменить триггер.

+0

Спасибо. Да, это отличная тема из того, что я могу сказать. Итак, это объясняет это. Добавлен соответствующий код в разделе обновления OP. Не уверен, как выполнить мою задачу, не вызывая логики, которая решает повторить попытку из 'RetryCompoundTriggerAdvice'. Логика, однако, основана на нескольких факторах, включая чтение и сравнение всех строк в файле (именно поэтому я применил его после завершения задания Spring Batch). – James

+1

«CompoundTriggerAdvice» предоставляет возможности [smart poller] (http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#__smart_polling). Одна из возможностей заключается в том, что, пока сообщение обрабатывается, держите кратковременного опросчика на месте, пока не будет оценено состояние файла. Попросите сервис «подтвердить» советы, что все в порядке, и смените триггер на следующий опрос.Пока файл обрабатывается (и до подтверждения), используйте 'beforeReceive()' return 'false', который отменяет текущий опрос –

Смежные вопросы