Добрый день,Запуск время работы отправленного на ExecutorService
Я пишу программу, в которой метод вызывается для каждой строки читать из текстового файла. Поскольку каждый вызов этого метода не зависит от какой-либо другой строки, я могу вызвать их параллельно. Чтобы максимизировать использование процессора, я использую ExecutorService, где я отправляю каждый вызов run(). Поскольку текстовый файл имеет 15 миллионов строк, мне нужно пошатнуть выполнение ExecutorService, чтобы не создавать слишком много заданий сразу (исключение OutOfMemory). Я также хочу отслеживать время выполнения каждого представленного запуска, поскольку я видел, что некоторые из них не заканчиваются. Проблема в том, что когда я пытаюсь использовать метод Future.get с таймаутом, тайм-аут относится к тому времени, когда он попал в очередь ExecutorService, а не с тех пор, как он начал работать, даже если он даже начался. Я хотел бы получить время с момента его запуска, а не с тех пор, как он попал в очередь.
код выглядит следующим образом:
ExecutorService executorService= Executors.newFixedThreadPool(ncpu);
line = reader.readLine();
long start = System.currentTimeMillis();
HashMap<MyFut,String> runs = new HashMap<MyFut, String>();
HashMap<Future, MyFut> tasks = new HashMap<Future, MyFut>();
while ((line = reader.readLine()) != null) {
String s = line.split("\t")[1];
final String m = line.split("\t")[0];
MyFut f = new MyFut(s, m);
tasks.put(executorService.submit(f), f);
runs.put(f, line);
while (tasks.size()>ncpu*100){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Iterator<Future> i = tasks.keySet().iterator();
while(i.hasNext()){
Future task = i.next();
if (task.isDone()){
i.remove();
} else {
MyFut fut = tasks.get(task);
if (fut.elapsed()>10000){
System.out.println(line);
task.cancel(true);
i.remove();
}
}
}
}
}
private static class MyFut implements Runnable{
private long start;
String copy;
String id2;
public MyFut(String m, String id){
super();
copy=m;
id2 = id;
}
public long elapsed(){
return System.currentTimeMillis()-start;
}
@Override
public void run() {
start = System.currentTimeMillis();
do something...
}
}
Как вы можете видеть, что я стараюсь следить за сколько рабочих мест я послал и если принят порог, я немного подождать, пока некоторые не закончили. Я также пытаюсь проверить, не слишком ли длится какое-либо из заданий, чтобы отменить его, имея в виду, что не удалось, и продолжить выполнение. Это не работает, как я надеялся. Выполнение 10 секунд для одной задачи намного больше, чем необходимо (я получаю 1000 строк, сделанных в 70-130 сек. В зависимости от машины и количества процессоров).
Что я делаю неправильно? Не следует ли вызывать метод запуска в моем классе Runnable только тогда, когда какой-либо поток в ExecutorService свободен и начинает работать над ним? Я получаю много результатов, которые занимают более 10 секунд. Есть ли лучший способ добиться того, что я пытаюсь?
Спасибо.
Спасибо за информацию. Теперь я внимательно читаю страницы javadoc «ThreadPoolExecutor» и «RejectedExecutionHandler». Я думаю, что они будут делать работу более грациозно, чем то, что я закодировал, хотя мне все еще нужно будет программировать логику, чтобы позаботиться о том, чтобы одна работа заходила слишком долго (повешена или какая-то другая проблема). И спасибо за разъяснение метода 'Future.get'. – Antonio
Имейте в виду, что отмена висячего задания требует, чтобы ваша задача могла реагировать на прерывание потока. Таким образом, тяжелые вычисления должны регулярно проверять ['Thread.interrupted()'] (http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.html#interrupted()) и блокировать (I/O) должны быть прерывистыми, например объявить, чтобы бросить 'InterruptedException'. Это сложно, если вы не знаете, почему некоторые задания занимают гораздо больше времени, чем ожидалось (другими словами, при какой операции они висят). – Holger