2014-02-02 2 views
0

У меня есть сценарий, гдеСинхронизация Проблема при использовании ExecutorCompletionService

  1. текстовые файлы генерируются динамически на ежедневной основе. 0 до 8 в день. размер каждого файла может быть небольшим и большим. в зависимости от дня данные.
  2. Необходимо выполнить некоторые проверки (бизнес-проверки, правила) на них.

я реализовал в следующем образе, Его не ведет себя, как и следовало ожидать, кажется, я сделал что-то неправильно

для хранения результатов я следующий класс, будет 1 Результат класс для 1 файл

public class Result { 

    private String fileName; 
    private Map<RuleTypes, String> allResult = new HashMap<RuleTypes, String>(); 

     // setter , getter , constructor .. POJO 
} 

Правила подобны

public class ValidateRule1 implements Rule { 

    private String fileName; 

    public String getFileName() { 
     return fileName; 
    } 

    public void setFileName(String fileName) { 
     this.fileName = fileName; 
    } 

    @Override 
    public void init() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void runRule() { 
     System.out.println("Start running ... Rule 1 for "+fileName); 
     try { 
      Random r = new Random(); 
      int sleepRandomTime = r.nextInt(15-1) + 1; 
      Thread.sleep(sleepRandomTime) ; // simulate rule execution 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     System.out.println("End running ... Rule 1 for "+fileName); 

    } 

    @Override 
    public RuleTypes getRuleName() { 
     return RuleTypes.Rule1; 
    } 

} 

Правило Фабрика как

public static Rule getRule(RuleTypes ruleName) { 
     Rule result=null; 

     switch(ruleName) { 

      case Rule1 : 
       result = new ValidateRule1(); // todo singleton 
       break; 

      case Rule2 : 
       result = new ValidateRule2(); // todo singleton 
       break; 

      case Rule3 : 
       result = new ValidateRule3(); // todo singleton 
       break; 
      ... 
      } 
      } 

Я называю Правила по следующим образом, я использую RuleFactory создать правило (создает одноэлементные объекты для правил)

final ConcurrentLinkedQueue<Rule> rulesToExecuteForModel = new ConcurrentLinkedQueue<Rule>(); 
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule1)); 
     rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule2)); 
     rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule3)); 
     rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule4)); 
     rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule5)); 
     rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule6)); 
     rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule7)); 
     rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule8)); 


     // pick 1 file and run all rules for it , different threads can pick up different files concurrently ... dont think will need synchronization here 
     List<File> fileQueue = new LinkedList<File>(); 
     fileQueue.add(new File("../test/files/File1.20140203")); 
     fileQueue.add(new File("../test/files/File2.20140203")); 
     fileQueue.add(new File("../test/files/File3.20140203")); 
     fileQueue.add(new File("../test/files/File4.20140203")); 
     fileQueue.add(new File("../test/files/File5.20140203")); 
     fileQueue.add(new File("../test/files/File6.20140203")); 

     // Results Display ... 1 Result obj for 1 File 
     ConcurrentLinkedQueue<Result> fileWiseResult = new ConcurrentLinkedQueue<Result>(); 
     int maxNumOfFiles = fileQueue.size(); 

     // TODO : how can i exploit the fact that this program runs on 8 core machine ? does 1 thread correspond to 1 CPU ? i kept 8 here because it will run on 8 core machine 
     final ExecutorService pool = Executors.newFixedThreadPool(8); 
     final ExecutorCompletionService<Result> completionService = new ExecutorCompletionService<Result>(pool); 

     for (final File file : fileQueue) { 
      System.out.println("picked file "+file.getName()+" running ALL rules for it"); 
      final Future<Result> contentFuture = completionService.submit(new Callable<Result>() { 
       @Override 
       public Result call() throws Exception { 
        Result r = new Result(); // 1 file 1 Result object 
        r.setFileName(file.getName()); 
        Iterator<Rule> itr=rulesToExecuteForModel.iterator(); 
        // sequentially run different rules for same file 
        while (itr.hasNext()) { 
         Rule currentRule = itr.next(); 
         currentRule.setFileName(file.getName()); 
         currentRule.runRule(); 
         // take fileName/File as parameter , String result for currentFile and currentRule 
         r.getFileResult().put(currentRule.getRuleName(), "result for "+currentRule.getRuleName().toString()); 
        } 
        return r; 
       } 
      }); 
     } 

     for(int i = 0; i <maxNumOfFiles; ++i) { 
     Future<Result> future; 
     try { 
      future = completionService.take(); 
      Result currentResult=null; 
      try { 
       currentResult = future.get(); 
      } catch (ExecutionException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      System.out.println("Result for file ["+currentResult.getFileName()+"] is ["+currentResult.getFileResult()+"]"); 
      fileWiseResult.add(currentResult); 
     } catch (InterruptedException e1) { 
      e1.printStackTrace(); 
     } 
    } 

выход является как

picked file File1.20140203 running rules for it 
Start running ... Rule 1 for File1.20140203 
End running ... Rule 1 for File1.20140203 

E 
Start running ... Rule 2 for File1.20140203 
End running ... Rule 2 for File1.20140203 
End running ... Rule 2 for File1.20140203 

Start running ... Rule 3 for File1.20140203 
End running ... Rule 3 for File1.20140203 

Start running ... Rule 4 for File1.20140203 
End running ... Rule 4 for File1.20140203 
End running ... Rule 4 for File1.20140203 

Start running ... Rule 5 for File1.20140203 
End running ... Rule 5 for File1.20140203 
End running ... Rule 5 for File1.20140203 
End running ... Rule 5 for File1.20140203 
End running ... Rule 5 for File1.20140203 

Start running ... Rule 6 for File1.20140203 
End running ... Rule 6 for File1.20140203 
End running ... Rule 6 for File1.20140203 

Start running ... Rule 7 for File1.20140203 
End running ... Rule 7 for File1.20140203 
End running ... Rule 7 for File1.20140203 

Start running ... Rule 8 for File1.20140203 
End running ... Rule 8 for File1.20140203 
End running ... Rule 8 for File1.20140203 
Result for file [File1.20140203] is [{Rule2=result for Rule2, Rule5=result for Rule5, Rule1=result for Rule1, Rule6=result for Rule6, Rule4=result for Rule4, Rule7=result for Rule7, Rule3=result for Rule3, Rule8=result for Rule8}] 

Я ожидал ONE, как «Начало работы ... Правило 2 для файла1.20140203» и ОДНА ЛЮБОВЬ «Завершение работы ... Правило 2 для File1.20140203"

Но, как видно на выходе, число Таймс "End"> число Times "Start"

Также я наблюдать

Start running ... Rule1 for File5.20140203 
Start running ... Rule1 for File6.20140203 
Start running ... Rule1 for File6.20140203 
Start running ... Rule1 for File4.20140203 
Start running ... Rule1 for File5.20140203 
Start running ... Rule1 for File4.20140203 

Я ожидая 6 уникальных имен файлов в приведенном выше сообщении журнала

Первые вопросы: Что я делаю неправильно? как я могу исправить это?

Второй вопрос (оптимизация .. не актуальная проблема) Эта программа будет работать на 8 основной машине .... если я держать размер пула 8 означает 8 потоков будет работать паралельно ... один каждое ядро ​​... есть способ, которым я могу это обеспечить?

+0

Не похоже, что вы используете 'ExecutorCompletionService' вообще. Как правило, вам не нужно управлять «Будущим», когда вы используете ECS. – Gray

+0

спасибо Серый за быстрый ответ ..... я сейчас не управляю объектом Futrure (изменен код, о котором идет речь, все еще такая же проблема) ... ТАКЖЕ я выделил вывод (надеюсь, что вы сможете увидеть число END> количество запусков) .... Дайте мне знать, если мне нужно уточнить еще – Lav

+0

Я думаю, что проблема связана с синхронизацией правил, мои правила не реализуют Runnable/Callable ... когда я контролирую выходной журнал, то вижу, что «Начните работать. .. Правило 1 для File1.20140203 «Я ожидал это сообщение для правила1 для 6 DIFFERENT файлов .... но я заметил, что имена файлов повторяются (добавление же к вопросу) – Lav

ответ

1

Но, как видно на выходе, число Таймс "End"> число Times "Start"

Ваша ошибка на этой линии:

currentRule.setFileName (файл .getName());

Несколько потоков используют один и тот же набор правил. Поэтому правила не должны иметь постоянного состояния.Вы должны передать имя файла каждому вызову метода правила.

Вы должны изменить свой метод runRule() взять fileName аргумент и не иметь один в качестве поля для ваших классов правил.

Эта программа будет работать на 8-ядерном компьютере .... если я сохраню размер пула 8, это означает, что 8 потоков будут выполняться параллельно ... по одному каждому ядру ... есть способ, которым я могу обеспечить это?

Должно быть да, но нет способа обеспечить его. Другие процессы, выполняемые на ОС, также должны обслуживаться. Это также зависит от того, сколько операций ввода-вывода и других операций блокировки происходит в вашем приложении, независимо от того, все ли они работают параллельно. Правильнее всего это изменить количество потоков в пуле, пока не получите оптимальную скорость вашего приложения.

+0

Спасибо, что он сработал отлично – Lav

+0

есть способ, который я могу написать тестовые примеры для тестирования «syncronization», а не логики проверки правильности ... I хотеть тестовый пример, который проверяет отсутствие проблемы синхронизации. – Lav

+0

Это чрезвычайно сложно @ Lav. Вы можете написать тесты, которые выполняют код, но они никогда не смогут «проверить» это точно. К сожалению, хороший обзор кода - лучшая практика. Будьте очень осторожны с общими объектами. – Gray

Смежные вопросы