2016-02-26 2 views
0

Это сценарий. Я должен опросить ftp-сервер в промежутке времени и получить файлы csv. Затем эти файлы CSV необходимо разобрать и отправить в качестве входных данных для некоторой бизнес-логики. Я сделал это таким образом.Модель потребительской модели, подобная сценарию, не работает

FTPClientPolling (продюсер)

public class FTPClientPolling { 

    private static FTPClientPolling instance = null; 

    private FTPClientPolling() { 
    } 

    public synchronized static FTPClientPolling getInstance() { 
     if (instance == null) { 
      logger.info("Object created for Client Polling"); 
      instance = new FTPClientPolling(); 
      initializeFTPClient(); 
     } 
     return instance; 
    } 

    public static void initializeFTPClient() { 
     // initialize the values from properties file 
    } 

    public void startPolling() { 

     FTPClient ftpClient = null; 
     try { 
      //connecting to ftp server 

      //iterating the files in it 
      FTPFile[] filesList = ftpClient.listFiles(); 
      for (FTPFile tmpFile : filesList) { 
       //.. 
       File tempFile = File.createTempFile(tmpFile.getName(), null); 
       FileOutputStream fileOut = new FileOutputStream(tempFile); 
       ftpClient.retrieveFile(tmpFile.getName(), fileOut); 

       //adding the file to the Queue of the file processor 
       FileProcessor.getInstance().getFilesToBeProcessedQueue().add(tempFile); 
      } 

      if (ftpClient.isConnected()) 
       ftpClient.disconnect(); 
     } catch (Exception e) { 
      //logging 
     } finally { 
      //closing ftpclient 
     } 

    } 
} 

FTPClientPollingTasker (Производитель Tasker)

public class FTPClientTasker extends TimerTask { 
    private static Long timeInterval = 10000l; 

    @Override 
    public void run() { 
     FTPClientPolling.getInstance().startPolling(); 
    } 

    public static void start() { 
     TimerTask timerTask = new FTPClientTasker(); 
     Timer timer = new Timer(); 
     timer.scheduleAtFixedRate(timerTask, timeInterval, timeInterval); 
    } 

    public static void main(String[] args) { 
     start(); 
    } 
} 

FileProcessor (Consumer)

public final class FileProcessor { 

    private static FileProcessor instance = null; 
    private Queue<File> filesToBeProcessedQueue = new ArrayBlockingQueue<File>(10); 

    private FileProcessor() { 
    } 

    public synchronized static FileProcessor getInstance() { 
     if (instance == null) { 
      instance = new FileProcessor(); 
     } 
     return instance; 
    } 

    public void run() { 
     while (!filesToBeProcessedQueue.isEmpty()) { 
      processSyncFiles(filesToBeProcessedQueue.poll()); 
     } 
    } 

    private void processSyncFiles(File inputFile) { 
     try { 
      HashMap<String, Boolean> outputConsolidation = new HashMap<String, Boolean>(); 

      FileReader fileReader = new FileReader(inputFile); 
      List<InputBean> csvContentsList = CSVParser.readContentsFromCSV(fileReader, new InputBean()); 
      for (InputBean inputBean : csvContentsList) { 
       boolean output = false; 
       // some business logic 
       outputConsolidation.put(inputBean.toString(), output); 
      } 
     } catch (Exception e) { 
      //logging 
     } 
    } 

    public synchronized Queue<File> getFilesToBeProcessedQueue() { 
     return filesToBeProcessedQueue; 
    } 
} 

FileProcessor Tasker (Consumer Scheduler) Этот класс создает Tasker для FileProcessor и запускает его в запланированный интервал.

public final class FileProcessorTasker extends TimerTask { 

    private static Long timeInterval = 5000l; 

    @Override 
    public void run() { 
     FileProcessor.getInstance().run(); 
    } 

    public static void start() { 
     TimerTask timerTask = new FileProcessorTasker(); 
     Timer timer = new Timer(); 
     timer.schedule(timerTask, timeInterval, timeInterval); 
    } 

    public static void main(String[] args) { 
     FileProcessorTasker.start(); 
    } 
} 

Обе программы работают хорошо индивидуально. Но когда они связаны между собой через filesToBeProcessedQueue, похоже, что это не работает. Проблема FTPClientPolling создает объект FileProcessor и добавляет файл в очередь. НоFileProcessorTasker создает еще один объект FileProcessor, который имеет размер очереди как ноль. Это два разных объекта - проблема. Как создать два объекта, когда класс singleton. Я что-то упустил в реализации singleton?

ответ

1

Прежде всего, не используйте Timer и TimerTask. Для многопоточности используйте ExecutorService.

И используйте Eager Initialization в ваших классах Singleton. Или дважды проверили блокировку нуля, чтобы сделать вас синглтонным синглтоном.

FTPClientPolling.java

public class FTPClientPolling { 

    private static FTPClientPolling instance = new FTPClientPolling(); 

    private FTPClientPolling() { 
     logger.info("Object created for Client Polling"); 
     initializeFTPClient(); 
    } 

    public static FTPClientPolling getInstance() { 
     return instance; 
    } 

    public static void initializeFTPClient() { 
     // initialize the values from properties file 
    } 

    public void startPolling() { 

     FTPClient ftpClient = null; 
     try { 
      //connecting to ftp server 

      //iterating the files in it 
      FTPFile[] filesList = ftpClient.listFiles(); 
      for (FTPFile tmpFile : filesList) { 
       //.. 
       File tempFile = File.createTempFile(tmpFile.getName(), null); 
       FileOutputStream fileOut = new FileOutputStream(tempFile); 
       ftpClient.retrieveFile(tmpFile.getName(), fileOut); 

       //adding the file to the Queue of the file processor 
       FileProcessor.getInstance().getFilesToBeProcessedQueue().add(tempFile); 
      } 

      if (ftpClient.isConnected()) 
       ftpClient.disconnect(); 
     } catch (Exception e) { 
      //logging 
     } finally { 
      //closing ftpclient 
     } 

    } 
} 

FileProcessor.java

public final class FileProcessor { 

    private static FileProcessor instance = new FileProcessor(); 
    private Queue<File> filesToBeProcessedQueue = new ArrayBlockingQueue<File>(10); 

    private FileProcessor() { 
    } 

    public static FileProcessor getInstance() { 
     return instance; 
    } 

    public void run() { 
     while (!filesToBeProcessedQueue.isEmpty()) { 
      processSyncFiles(filesToBeProcessedQueue.poll()); 
     } 
    } 

    private void processSyncFiles(File inputFile) { 
     try { 
      HashMap<String, Boolean> outputConsolidation = new HashMap<String, Boolean>(); 

      FileReader fileReader = new FileReader(inputFile); 
      List<InputBean> csvContentsList = CSVParser.readContentsFromCSV(fileReader, new InputBean()); 
      for (InputBean inputBean : csvContentsList) { 
       boolean output = false; 
       // some business logic 
       outputConsolidation.put(inputBean.toString(), output); 
      } 
     } catch (Exception e) { 
      //logging 
     } 
    } 

    public synchronized Queue<File> getFilesToBeProcessedQueue() { 
     return filesToBeProcessedQueue; 
    } 
} 

Прочитайте это post для получения дополнительной информации.

Смежные вопросы