2016-10-19 3 views
0

У меня есть следующий код, который работает нормально, извлекает файлы с FTP-сервера в поток, но мне нужно получить строку каждого файла, кажется, мне нужно использовать Transformer, передающий кодировку, но что мне не хватает? Как точно передать содержимое строки каждого файла?Получить строку из Spring FTP потокового адаптера входящего канала

спасибо заранее

@SpringBootApplication 
@EnableIntegration 
public class FtpinboundApp extends SpringBootServletInitializer implements WebApplicationInitializer { 

    final static Logger logger = Logger.getLogger(FtpinboundApp.class); 

    public static void main(String[] args) { 
     SpringApplication.run(FtpinboundApp.class, args); 
    } 

    @Bean 
    public SessionFactory<FTPFile> ftpSessionFactory() { 
     DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory(); 
     sf.setHost("X.X.X.X"); 
     sf.setPort(21); 
     sf.setUsername("xxx"); 
     sf.setPassword("XXX"); 
     return new CachingSessionFactory<FTPFile>(sf); 
    } 


    @Bean 
    @ServiceActivator(inputChannel = "stream") 
    public MessageHandler handler() { 
     return new MessageHandler() { 

      @Override 
      public void handleMessage(Message<?> message) throws MessagingException { 
       System.out.println("trasnferred file:" + message.getPayload()); 

      } 

     }; 
    } 

    @Bean 
    @InboundChannelAdapter(value = "stream", poller = @Poller(fixedRate = "1000")) 
    public MessageSource<InputStream> ftpMessageSource() { 

     FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(), null); 
     messageSource.setRemoteDirectory("/X/X/X"); 
     messageSource.setFilter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "streaming")); 
     return messageSource; 
    } 

    @Bean 
     @Transformer(inputChannel = "stream", outputChannel = "data") 
     public org.springframework.integration.transformer.Transformer transformer() { 
      return new StreamTransformer("UTF-8"); 
     } 

     @Bean 
     public FtpRemoteFileTemplate template() { 
      return new FtpRemoteFileTemplate(ftpSessionFactory()); 
     } 


    @Bean(name = PollerMetadata.DEFAULT_POLLER) 
    public PollerMetadata defaultPoller() { 
     PollerMetadata pollerMetadata = new PollerMetadata(); 
     pollerMetadata.setTrigger(new PeriodicTrigger(5000)); 
     return pollerMetadata; 
    } 

} 

ответ

1

Используйте StreamTransformer, чтобы получить весь файл в виде одной строки, или FileSplitter, чтобы получить сообщение для каждой строки.

EDIT (фильтр конфигурации)

@Bean 
@InboundChannelAdapter(channel = "stream") 
public MessageSource<InputStream> ftpMessageSource() { 
    FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(), null); 
    messageSource.setRemoteDirectory("ftpSource/"); 
    messageSource.setFilter(filter()); 
    return messageSource; 
} 

public FileListFilter<FTPFile> filter() { 
    CompositeFileListFilter<FTPFile> filter = new CompositeFileListFilter<>(); 
    filter.addFilter(new FtpSimplePatternFileListFilter("*.txt")); 
    filter.addFilter(acceptOnceFilter()); 
    return filter; 
} 

@Bean 
public FtpPersistentAcceptOnceFileListFilter acceptOnceFilter() { 
    FtpPersistentAcceptOnceFileListFilter filter = new FtpPersistentAcceptOnceFileListFilter(meta(), 
      "streaming"); // keys will be, e.g. "streamingfoo.txt" 
    filter.setFlushOnUpdate(true); 
    return filter; 
} 

@Bean 
public ConcurrentMetadataStore meta() { 
    PropertiesPersistingMetadataStore meta = new PropertiesPersistingMetadataStore(); 
    meta.setBaseDirectory("/tmp/foo"); 
    meta.setFileName("ftpStream.properties"); 
    return meta; 
} 

EDIT2 - удалить удаленный файл с советами

@ServiceActivator(inputChannel = "data", adviceChain = "after") 
@Bean 
public MessageHandler handle() { 
    return System.out::println; 
} 

@Bean 
public ExpressionEvaluatingRequestHandlerAdvice after() { 
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice(); 
    advice.setOnSuccessExpression(
      "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])"); 
    advice.setPropagateEvaluationFailures(true); 
    return advice; 
} 
+0

Гэри, спасибо. да на самом деле у меня есть: – viruskimera

+0

Гэри, спасибо да на самом деле у меня есть: \t @Bean \t @Transformer (inputChannel = "поток", outputChannel = "данные") \t общественного org.springframework.integration.transformer.Transformer трансформатор() { \t вернуть новый StreamTransformer («UTF-8»); \t}, но я не уверен, где его использовать в коде. – viruskimera

+1

В настоящее время у вашего активатора обслуживания и трансформатора подписаны «поток» - это означает, что сообщения будут поочередно передаваться каждому из них - вам нужно подписаться на активатор услуги на выходной канал трансформатора. –

Смежные вопросы