У меня есть сценарий, гдеСинхронизация Проблема при использовании ExecutorCompletionService
- текстовые файлы генерируются динамически на ежедневной основе. 0 до 8 в день. размер каждого файла может быть небольшим и большим. в зависимости от дня данные.
- Необходимо выполнить некоторые проверки (бизнес-проверки, правила) на них.
я реализовал в следующем образе, Его не ведет себя, как и следовало ожидать, кажется, я сделал что-то неправильно
для хранения результатов я следующий класс, будет 1 Результат класс для 1 файл
public class Result {
private String fileName;
private Map<RuleTypes, String> allResult = new HashMap<RuleTypes, String>();
// setter , getter , constructor .. POJO
}
Правила подобны
public class ValidateRule1 implements Rule {
private String fileName;
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
@Override
public void init() {
// TODO Auto-generated method stub
}
@Override
public void runRule() {
System.out.println("Start running ... Rule 1 for "+fileName);
try {
Random r = new Random();
int sleepRandomTime = r.nextInt(15-1) + 1;
Thread.sleep(sleepRandomTime) ; // simulate rule execution
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("End running ... Rule 1 for "+fileName);
}
@Override
public RuleTypes getRuleName() {
return RuleTypes.Rule1;
}
}
Правило Фабрика как
public static Rule getRule(RuleTypes ruleName) {
Rule result=null;
switch(ruleName) {
case Rule1 :
result = new ValidateRule1(); // todo singleton
break;
case Rule2 :
result = new ValidateRule2(); // todo singleton
break;
case Rule3 :
result = new ValidateRule3(); // todo singleton
break;
...
}
}
Я называю Правила по следующим образом, я использую RuleFactory создать правило (создает одноэлементные объекты для правил)
final ConcurrentLinkedQueue<Rule> rulesToExecuteForModel = new ConcurrentLinkedQueue<Rule>();
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule1));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule2));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule3));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule4));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule5));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule6));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule7));
rulesToExecuteForModel.add(RuleFactory.getRule(RuleTypes.Rule8));
// pick 1 file and run all rules for it , different threads can pick up different files concurrently ... dont think will need synchronization here
List<File> fileQueue = new LinkedList<File>();
fileQueue.add(new File("../test/files/File1.20140203"));
fileQueue.add(new File("../test/files/File2.20140203"));
fileQueue.add(new File("../test/files/File3.20140203"));
fileQueue.add(new File("../test/files/File4.20140203"));
fileQueue.add(new File("../test/files/File5.20140203"));
fileQueue.add(new File("../test/files/File6.20140203"));
// Results Display ... 1 Result obj for 1 File
ConcurrentLinkedQueue<Result> fileWiseResult = new ConcurrentLinkedQueue<Result>();
int maxNumOfFiles = fileQueue.size();
// TODO : how can i exploit the fact that this program runs on 8 core machine ? does 1 thread correspond to 1 CPU ? i kept 8 here because it will run on 8 core machine
final ExecutorService pool = Executors.newFixedThreadPool(8);
final ExecutorCompletionService<Result> completionService = new ExecutorCompletionService<Result>(pool);
for (final File file : fileQueue) {
System.out.println("picked file "+file.getName()+" running ALL rules for it");
final Future<Result> contentFuture = completionService.submit(new Callable<Result>() {
@Override
public Result call() throws Exception {
Result r = new Result(); // 1 file 1 Result object
r.setFileName(file.getName());
Iterator<Rule> itr=rulesToExecuteForModel.iterator();
// sequentially run different rules for same file
while (itr.hasNext()) {
Rule currentRule = itr.next();
currentRule.setFileName(file.getName());
currentRule.runRule();
// take fileName/File as parameter , String result for currentFile and currentRule
r.getFileResult().put(currentRule.getRuleName(), "result for "+currentRule.getRuleName().toString());
}
return r;
}
});
}
for(int i = 0; i <maxNumOfFiles; ++i) {
Future<Result> future;
try {
future = completionService.take();
Result currentResult=null;
try {
currentResult = future.get();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Result for file ["+currentResult.getFileName()+"] is ["+currentResult.getFileResult()+"]");
fileWiseResult.add(currentResult);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
выход является как
picked file File1.20140203 running rules for it
Start running ... Rule 1 for File1.20140203
End running ... Rule 1 for File1.20140203
E
Start running ... Rule 2 for File1.20140203
End running ... Rule 2 for File1.20140203
End running ... Rule 2 for File1.20140203
Start running ... Rule 3 for File1.20140203
End running ... Rule 3 for File1.20140203
Start running ... Rule 4 for File1.20140203
End running ... Rule 4 for File1.20140203
End running ... Rule 4 for File1.20140203
Start running ... Rule 5 for File1.20140203
End running ... Rule 5 for File1.20140203
End running ... Rule 5 for File1.20140203
End running ... Rule 5 for File1.20140203
End running ... Rule 5 for File1.20140203
Start running ... Rule 6 for File1.20140203
End running ... Rule 6 for File1.20140203
End running ... Rule 6 for File1.20140203
Start running ... Rule 7 for File1.20140203
End running ... Rule 7 for File1.20140203
End running ... Rule 7 for File1.20140203
Start running ... Rule 8 for File1.20140203
End running ... Rule 8 for File1.20140203
End running ... Rule 8 for File1.20140203
Result for file [File1.20140203] is [{Rule2=result for Rule2, Rule5=result for Rule5, Rule1=result for Rule1, Rule6=result for Rule6, Rule4=result for Rule4, Rule7=result for Rule7, Rule3=result for Rule3, Rule8=result for Rule8}]
Я ожидал ONE, как «Начало работы ... Правило 2 для файла1.20140203» и ОДНА ЛЮБОВЬ «Завершение работы ... Правило 2 для File1.20140203"
Но, как видно на выходе, число Таймс "End"> число Times "Start"
Также я наблюдать
Start running ... Rule1 for File5.20140203
Start running ... Rule1 for File6.20140203
Start running ... Rule1 for File6.20140203
Start running ... Rule1 for File4.20140203
Start running ... Rule1 for File5.20140203
Start running ... Rule1 for File4.20140203
Я ожидая 6 уникальных имен файлов в приведенном выше сообщении журнала
Первые вопросы: Что я делаю неправильно? как я могу исправить это?
Второй вопрос (оптимизация .. не актуальная проблема) Эта программа будет работать на 8 основной машине .... если я держать размер пула 8 означает 8 потоков будет работать паралельно ... один каждое ядро ... есть способ, которым я могу это обеспечить?
Не похоже, что вы используете 'ExecutorCompletionService' вообще. Как правило, вам не нужно управлять «Будущим», когда вы используете ECS. – Gray
спасибо Серый за быстрый ответ ..... я сейчас не управляю объектом Futrure (изменен код, о котором идет речь, все еще такая же проблема) ... ТАКЖЕ я выделил вывод (надеюсь, что вы сможете увидеть число END> количество запусков) .... Дайте мне знать, если мне нужно уточнить еще – Lav
Я думаю, что проблема связана с синхронизацией правил, мои правила не реализуют Runnable/Callable ... когда я контролирую выходной журнал, то вижу, что «Начните работать. .. Правило 1 для File1.20140203 «Я ожидал это сообщение для правила1 для 6 DIFFERENT файлов .... но я заметил, что имена файлов повторяются (добавление же к вопросу) – Lav