2014-04-07 3 views
0

Я пытаюсь выполнить дорогостоящую задачу переменной длины для элементов, полученных последовательным образом. Крайне важно, чтобы порядок элементов поддерживался при быстрой обработке каждого элемента.Поддержание FIFO при использовании потоков в Java

Ниже SSCWE (W для неверного!) И моя попытка распараллеливания обработки. Есть ли способ гарантировать, что каждый вызов processSomething() выполняется в своем потоке, сохраняя при этом FIFO, когда я смотрю на ExecutorCompletionService?

import java.util.concurrent.Callable; 
import java.util.concurrent.CompletionService; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorCompletionService; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

public class ThreadingHelper { 

public static void main(String[] args) { 

    ExecutorService pool = Executors.newFixedThreadPool(20); 
    ExecutorCompletionService<String> compService 
     = new ExecutorCompletionService<String>(pool); 

    // process some data 
    processSomething(compService, "1."); 
    processSomething(compService, "2.."); 
    processSomething(compService, "3..."); 
    processSomething(compService, "4...."); 
    processSomething(compService, "5....."); 

    // print out the processed data 
    try { 
     System.out.println(compService.take().get()); 
     System.out.println(compService.take().get()); 
     System.out.println(compService.take().get()); 
     System.out.println(compService.take().get()); 
     System.out.println(compService.take().get()); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } catch (ExecutionException e) { 
     e.printStackTrace(); 
    } 

} 

public static synchronized void processSomething(
     CompletionService<String> compService, final String x) { 
    Callable<String> c = new Callable<String>() { 

     @Override 
     public String call() throws Exception { 
      // this represents the variable and expensive 
          // amount of time it takes to process x 
      long rand = (long) (Math.random() * 100); 
      Thread.sleep(rand); 

      // this represents the processing of x 
      String xProcessed = x.replace(".", "!"); 

      return xProcessed; 
     } 
    }; 
    compService.submit(c); 
} 
} 

Типичный выход

4!!!! 
2!! 
1! 
5!!!!! 
3!!! 

, но я хочу

1! 
2!! 
3!!! 
4!!!! 
5!!!!! 
+0

Метод 'processSomething' синхронизирован, но ваш критический раздел находится в' call'. –

+0

Если вы хотите, чтобы результат был в порядке, вам нужно сделать ваш конвейер более асинхронным. Служба завершения предназначена точно для того, чтобы обрабатывать вещи не по порядку - чтобы получить все, как они сделаны. –

+0

@BoristheSpider, вы имеете в виду более синхронный? Кажется, что там должен быть класс, который помогает мне идти полным ходом вперед, но в конце убедитесь, что все фигуры все еще в порядке. – chessofnerd

ответ

2

Используйте Futures вместо CompletionService, чтобы получить результаты в определенном порядке, и по-прежнему пользоваться параллельно Исполнение:

ExecutorService pool = Executors.newFixedThreadPool(20); 
List<Future<String>> futures = new ArrayList<>(); 
futures.add(pool.submit(makeCallable("1."))); 
// ... 
for (Future<String> future : futures) { 
    try { 
     System.out.println(future.get()); 
    } catch (...) { 
     ... 
    } 
} 

public static Callable<String> makeCallable(String x) { 
    Callable<String> c = ...; 
    return c; 
} 
+0

Большое спасибо! Я тестировал это с помощью System.currentTimeMillis(), насколько я могу судить, он удовлетворяет как требованиям порядка, так и распараллеливанию. – chessofnerd

+0

Если входящий msg является непрерывным, означает ли это, что будущий список является кольцевым буфером? – ying

+0

@ying: в этой ситуации вы можете использовать 'Queue' вместо' List'. Буфер _ring - это специальная реализация 'Queue' с фиксированным максимальным размером. – nosid

Смежные вопросы