2013-12-05 2 views
2

Добрый день,Запуск время работы отправленного на ExecutorService

Я пишу программу, в которой метод вызывается для каждой строки читать из текстового файла. Поскольку каждый вызов этого метода не зависит от какой-либо другой строки, я могу вызвать их параллельно. Чтобы максимизировать использование процессора, я использую ExecutorService, где я отправляю каждый вызов run(). Поскольку текстовый файл имеет 15 миллионов строк, мне нужно пошатнуть выполнение ExecutorService, чтобы не создавать слишком много заданий сразу (исключение OutOfMemory). Я также хочу отслеживать время выполнения каждого представленного запуска, поскольку я видел, что некоторые из них не заканчиваются. Проблема в том, что когда я пытаюсь использовать метод Future.get с таймаутом, тайм-аут относится к тому времени, когда он попал в очередь ExecutorService, а не с тех пор, как он начал работать, даже если он даже начался. Я хотел бы получить время с момента его запуска, а не с тех пор, как он попал в очередь.

код выглядит следующим образом:

ExecutorService executorService= Executors.newFixedThreadPool(ncpu); 
line = reader.readLine(); 
long start = System.currentTimeMillis(); 
HashMap<MyFut,String> runs = new HashMap<MyFut, String>(); 
HashMap<Future, MyFut> tasks = new HashMap<Future, MyFut>(); 
while ((line = reader.readLine()) != null) { 

String s = line.split("\t")[1]; 
final String m = line.split("\t")[0]; 
MyFut f = new MyFut(s, m); 
tasks.put(executorService.submit(f), f); 

runs.put(f, line); 

while (tasks.size()>ncpu*100){ 
    try { 
     Thread.sleep(100); 
    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 

    Iterator<Future> i = tasks.keySet().iterator(); 
    while(i.hasNext()){ 
     Future task = i.next(); 
     if (task.isDone()){ 
      i.remove(); 

     } else { 
      MyFut fut = tasks.get(task); 
      if (fut.elapsed()>10000){ 
       System.out.println(line); 
       task.cancel(true); 
       i.remove(); 
      } 
     } 
    } 
} 
} 

private static class MyFut implements Runnable{ 

private long start; 
String copy; 
String id2; 

public MyFut(String m, String id){ 
    super(); 

    copy=m; 
    id2 = id; 
} 

public long elapsed(){ 
    return System.currentTimeMillis()-start; 
} 



@Override 
public void run() { 
    start = System.currentTimeMillis(); 
    do something... 
} 

} 

Как вы можете видеть, что я стараюсь следить за сколько рабочих мест я послал и если принят порог, я немного подождать, пока некоторые не закончили. Я также пытаюсь проверить, не слишком ли длится какое-либо из заданий, чтобы отменить его, имея в виду, что не удалось, и продолжить выполнение. Это не работает, как я надеялся. Выполнение 10 секунд для одной задачи намного больше, чем необходимо (я получаю 1000 строк, сделанных в 70-130 сек. В зависимости от машины и количества процессоров).

Что я делаю неправильно? Не следует ли вызывать метод запуска в моем классе Runnable только тогда, когда какой-либо поток в ExecutorService свободен и начинает работать над ним? Я получаю много результатов, которые занимают более 10 секунд. Есть ли лучший способ добиться того, что я пытаюсь?

Спасибо.

ответ

0

Вы делаете свою работу более трудной, как и должно быть. Рамка Java обеспечивает все, что вы хотите, вам только нужно ее использовать.

Ограничения количества ожидающих рабочих элементов работает с использованием ограниченной очереди, но ExecutorService возвращаемого Executors.newFixedThreadPool() использует несвязанные очереди. Политика ожидания после завершения полной очереди может быть реализована через RejectedExecutionHandler. Все это выглядит так:

static class WaitingRejectionHandler implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
    try { 
     executor.getQueue().put(r);// block until capacity available 
    } catch(InterruptedException ex) { 
     throw new RejectedExecutionException(ex); 
    } 
    } 
} 
public static void main(String[] args) 
{ 
    final int nCPU=Runtime.getRuntime().availableProcessors(); 
    final int maxPendingJobs=100; 
    ExecutorService executorService=new ThreadPoolExecutor(nCPU, nCPU, 1, TimeUnit.MINUTES, 
    new ArrayBlockingQueue<Runnable>(maxPendingJobs), new WaitingRejectionHandler()); 

    // start flooding the `executorService` with jobs here 

Вот и все.

Измерение истекшего времени в работы довольно легко, как это не имеет ничего общего с многопоточностью:

long startTime=System.nanoTime(); 
// do your work here 
long elpasedTimeSoFar = System.nanoTime()-startTime; 

Но, возможно, вам не нужно больше, когда вы используете ограниченную очередь ,


Кстати Future.get метод с тайм-аутом делает не относятся к тому времени, так как он попал в очередь на ExecutorService, это относится ко времени вызова метода get самих. Другими словами, это говорит о том, как долго метод get может ждать, не более того.

+0

Спасибо за информацию. Теперь я внимательно читаю страницы javadoc «ThreadPoolExecutor» и «RejectedExecutionHandler». Я думаю, что они будут делать работу более грациозно, чем то, что я закодировал, хотя мне все еще нужно будет программировать логику, чтобы позаботиться о том, чтобы одна работа заходила слишком долго (повешена или какая-то другая проблема). И спасибо за разъяснение метода 'Future.get'. – Antonio

+0

Имейте в виду, что отмена висячего задания требует, чтобы ваша задача могла реагировать на прерывание потока. Таким образом, тяжелые вычисления должны регулярно проверять ['Thread.interrupted()'] (http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.html#interrupted()) и блокировать (I/O) должны быть прерывистыми, например объявить, чтобы бросить 'InterruptedException'. Это сложно, если вы не знаете, почему некоторые задания занимают гораздо больше времени, чем ожидалось (другими словами, при какой операции они висят). – Holger

1

Если вы используете Future, я бы порекомендовал изменить Runnable to Callable и вернуть общее время выполнения исполнения в качестве результата. Ниже приведен пример кода:

import java.util.concurrent.Callable; 

public class MyFut implements Callable<Long> { 

    String copy; 
    String id2; 

    public MyFut(String m, String id) { 
     super(); 

     copy = m; 
     id2 = id; 
    } 

    @Override 
    public Long call() throws Exception { 
     long start = System.currentTimeMillis(); 
     //do something... 
     long end = System.currentTimeMillis(); 
     return (end - start); 
    } 
} 
+0

Это будет то же самое, но более прямолинейное, как мое. Кажется, что ОП происходит после чего-то другого. –

+0

Да, мне нужно проверить во время какой-то части, как долго это происходит. Дело в том, что какая-то линия, похоже, блокируется бесконечно и никогда не закончится. Я хотел бы знать, когда это происходит, когда-либо, часть времени, с тех пор, как она началась. – Antonio

+0

@ Антонио. Теперь я вижу вашу точку. Вы хотите вызвать 'истек ', если задача все еще запущена. Удаление моего ответа. –

Смежные вопросы