2015-02-08 1 views
1

Весной интеграция - я пытаюсь читать файл с FTP и обрабатывать его. Как мы можем достичь метастора и обработать его без опроса. В приведенной ниже конфигурации, чтобы избежать чтения одного и того же файла, если перезагрузка сервера произойдет, я представил хранилище сообщений в ftpChannel. Теперь процессор файла является активатором службы, который нуждается в опросе. Как я могу избежать опроса в активаторе службы и немедленно прочитать файл из очереди ftpChannel. Если я использую int: dispatcher, тогда я не мог бы использовать хранилище сообщений.Весенняя интеграция - прочитайте файл с FTP и обработайте его. Как мы можем достичь метастора и обработать его без опроса

Как мы можем решить это?

<int:channel id="ftpChannel"> 
    <int:queue message-store="mongoDbMessageStore" /> 
    <!-- <int:dispatcher task-executor="taskExecutor"/> --> 
</int:channel> 

<bean id="mongoDbMessageStore" 
    class="org.springframework.integration.mongodb.store.MongoDbMessageStore"> 
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory" /> 
    <constructor-arg name="collectionName" value="ftpInfo" /> 
</bean> 

<int-ftp:inbound-channel-adapter id="ftpInbound" 
    channel="ftpChannel" session-factory="ftpClientFactory" charset="UTF-8" 
    auto-create-local-directory="true" delete-remote-files="false" 
    filename-pattern="*.gz" remote-directory="/myfilerepo/#{istDate.getISTDate()}" 
    remote-file-separator="/" local-filename-generator-expression="#this.toUpperCase()" 
    temporary-file-suffix=".writing" preserve-timestamp="true" 
    local-directory="/temp/spring/#{istDate.getISTDate()}"> 

    <int:poller cron="0-5 0/5 * * * ?" max-messages-per-poll="-1"/> 
</int-ftp:inbound-channel-adapter> 

<int:service-activator id="jobServiceActivator" 
    input-channel="ftpChannel" ref="triggerJobLauncher" method="launch"> 
    <int:poller fixed-delay="10" /> 
</int:service-activator> 

<!-- job context --> 
<bean id="jobRepository" 
    class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> 
    <property name="transactionManager" ref="transactionManager" /> 
</bean> 

<bean id="transactionManager" 
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> 

<bean id="jobLauncher" 
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> 
    <property name="jobRepository" ref="jobRepository" /> 
</bean> 
<!-- job context --> 

ответ

2

Вам не нужен message-store на канале; вам необходимо использовать FtpPersistentAcceptOnceFileListFilter в filter и/или FileSystemPersistentAcceptOnceFileListFilter в local-filter, чтобы избежать повторной обработки файлов после перезагрузки системы.

Им нужен MetadataStore; , если вы хотите использовать монго, вам нужно будет его реализовать; рамки в настоящее время не имеют реализации mongo.

EDIT:

В версии 4.2, структура now has a mongo MetadataStore.

+0

Я пытаюсь эту технику., –

+0

'<идентификатор = класс компонента "metadataStore"= "org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/> <боб ID = "acceptOnceFilter" класс =" org.springframework. integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter "> \t \t <конструктор-Arg имя =" магазин»исх = "metadataStore"/> \t \t <конструктор-Arg имя = "" значение = "префикс тест-"/> \t ' –

+0

Его работа .. !! Спасибо –

0

В соответствии с вашей озабоченностью to avoid reading the same file from the QueueChannel, я бы сказал, что вы беспокоитесь, что приложение Spring всегда обрабатывает одно и то же сообщение после запуска приложения. Но это неправда. Сообщение удаляется из очереди (и, конечно, из MessageStore), когда это polled оттуда. Если вы MessageStore является транзакционным ресурсом (например, JDBC), запись для сообщений помечена для удаления до тех пор, пока не будет выполнена передача или откат TX. Это не случай для MongoDB, хотя любое сообщение из очереди опрошено только один раз, даже если у вас есть кластер такого приложения.

Итак, я не понимаю, почему вы уверены, что ваш <service-activator> принимает тот же файл (как payload), потому что вы перезапустите приложение.

С другой стороны, если вы хотите получить доступ к MessageGroup поддержке вашего ftpChannel, вы можете сделать это:

mongoDbMessageStore.getMessageGroup("mongoDbMessageStore:ftpChannel"); 

С другой стороны, вы всегда можете purgeQueueChannel вручную, инъекции ftpChannel в какой-то службы, как QueueChannelOperations ,

+0

Я согласился с логикой выше, обработаю сообщение только один раз, но мне нужно опросить очередь (от активатора службы), здесь период опроса составляет 10 мс, что является задержкой для меня. Неужели мы все равно читаем сообщение в очереди без опроса? –

+0

'fidex-delay' переводится в' PeriodicTrigger', так как у вас нет 'initialDelay' (и мы не выставляем его в' '), поэтому не будет задержки для первого' poll' после запуска приложения. –

+0

нас беспокоит периодическая задержка в обработке очереди, вместо этого она должна быть в сообщении на основе оповещения. –

Смежные вопросы