2011-12-18 3 views
0

У меня есть вычисление (шифрование CTR), которое требует результатов в точном порядке.Java: результаты заказа, полученные из асинхронных задач

Для этого я создал многопоточный дизайн, который вычисляет указанные результаты, в этом случае результатом является ByteBuffer. Само вычисление, конечно, выполняется асинхронно, поэтому результаты могут быть доступны в любое время и в любом порядке. «Пользователь» - однопоточное приложение, которое использует результаты, вызывая метод, после чего байтовые буферы возвращаются в пул ресурсов указанным методом - управление ресурсами уже обрабатывается (с использованием поточного безопасного стека).

Теперь вопрос: мне нужно что-то, что агрегирует результаты и делает их доступными в в правильном порядке. Если следующий результат недоступен, метод, который должен вызвать пользователь, должен блокироваться до тех пор, пока он не будет. Кто-нибудь знает хорошую стратегию или класс в java.util.concurrent, который может возвращать асинхронно рассчитанные результаты в порядке?

Решение должно быть потокобезопасным. Я хотел бы избежать сторонних библиотек, Thread.sleep()/Thread.wait() и тегов, связанных с другими словами, отличными от «synchronized». Более того, задачи могут быть заданы, например, Исполнитель в правильном порядке, если это требуется. Это для исследований, поэтому не стесняйтесь использовать Java 1.6 или даже 1.7 конструкции.

Примечание: Я отметил те запросы, которые я хочу сохранить в классах, определенных в JRE и [шифровании], поскольку кому-то, возможно, уже пришлось иметь дело с этим, но сам вопрос касается исключительно Java & многопоточность.

+0

Почему бы не создать массив ByteBuffer и передать Bytebuffer в потоки? Когда все потоки выполняются, просто напишите ByteBuffers в порядке массива – fge

+2

Я удалил тег 'encryption', поскольку это не имеет никакого отношения к шифрованию. –

ответ

4

Используйте executors framework:

ExecutorService executorService = Executors.newFixedThreadPool(5); 
List<Future> futures = executorService.invokeAll(listOfCallables); 
for (Future future : futures) { 
    //do something with future.get(); 
} 
executorService.shutdown(); 

listOfCallables будет List<Callable<ByteBuffer>>, что вы построили для работы с данными. Например:

list.add(new SubTaskCalculator(1, 20)); 
list.add(new SubTaskCalculator(21, 40)); 
list.add(new SubTaskCalculator(41, 60)); 

(произвольные диапазоны чисел, регулировать, что к вашей задаче)

.get() блоков, пока результат не является полным, но в то же время другие задачи, также работает, поэтому, когда вы достигните их, их .get() будет готов.

+0

Я боялся, что это будет один из первых ответов (как это было ранее описано в StackOverflow). К сожалению, ByteBuffers становятся доступными после использования, и следующая задача должна быть запущена, когда другие ресурсы станут доступными. Другими словами, я не могу запустить все задачи одновременно. –

+0

+1 для использования исполнителей. @owlstead, я не понимаю вашего ответа. Что означает «стать доступным после использования»? Вы можете ограничить количество задач в действии за один раз (в примере код установлен на 5), и есть CompletionServices, чтобы делать причудливые вещи по мере завершения заданий. – user949300

+0

Я тоже этого не понял. Если ваша следующая задача зависит от предыдущей, сделайте это синхронно - у вас нет другого варианта – Bozho

1

возвращение результата в нужном порядке тривиально. По мере того, как каждый результат приходит, храните его в arraylist, и как только у вас есть ВСЕ результаты, просто отберите arraylist. Вы можете использовать PriorityQueue, чтобы сохранить результаты отсортированы в любое время по мере их поступления, но нет смысла делать это, так как вы не будете использовать результаты, прежде чем все они придут в любом случае.

Итак, что вы могли бы сделать это:

Объявите класс «WorkItem», который содержит один из вашего ByteArray, и его порядкового номера, так что они могут быть отсортированы по порядковому номеру.

В ваших рабочих потоков, сделать что-то вроде этого:

...do work and produce a work_item... 

synchronized(LockObject) 
{ 
    ResultList.Add(work_item); 
    number_of_results++; 
    LockObject.notifyAll(); 
} 

В основном потоке, сделать что-то вроде этого:

synchronized(LockObject) 
    while(number_of_results != number_of_items) 
     LockObject.wait(); 
ResultList.Sort(); 
...go ahead and use the results... 
+0

Спасибо MikeNakis.Однако идея состоит в том, что я делаю результаты доступными как можно скорее, иначе пользователь будет блокировать, пока вычисления выполняются по результатам, отличным от того, что было необходимо в то время. –

+0

ОК, это не было указано в вопросе. Итак, вы говорите, что вызывающий будет работать над результатами параллельно с кодом, который дает результаты, и вы хотите, чтобы он работал, пока следующий результат в порядке, но подождите, если еще нет? –

+0

Точно! Ты понял. –

0

Мой новый ответ после получения лучшего понимания того, что вы хотите

Объявите класс «WorkItem», который содержит один из ваших bytearrays и его порядковый номер, чтобы их можно сортировать по порядковому номеру.

Используйте java.util.PriorityQueue, который сортируется по порядковому номеру. По существу, все, что нам нужно, это то, что первым элементом в очереди приоритетов в любой момент будет следующий элемент для обработки.

Каждый рабочий поток сохраняет свой результат в PriorityQueue и выдает NotifyAll на какой-либо объект блокировки.

Главный поток ждет объекта блокировки, а затем, если в очереди находятся элементы, и если порядковый номер первого (в порядке очереди, не вычеркнутого) элемента в очереди равен количеству обработанных элементов , то он удаляет объект и обрабатывает его. Если нет, он ждет. Если все предметы были произведены и обработаны, это делается.

+0

Это звучит как очень перспективный ответ, будет исследовать использование объекта блокировки и очереди приоритетов. –

+0

Я пробовал какой-то код, но я продолжаю работать в условиях гонки, помещая элемент в очередь и просматривая/удаляя элемент вместе с объектом блокировки. –

+0

Многопоточность не проста. Обратите внимание, что PriorityQueue не синхронизируется, поэтому вам нужно предоставить свою собственную синхронизацию. Вы должны быть очень осторожны, когда вы блокируете вещи, и когда вы обращаетесь к ним. Вам нужно проверить условие перед вводом wait(), иначе вы никогда не сможете получать уведомления. Пробуждение от wait() не означает, что условие, которое вы ожидали, произошло, поэтому вам обычно нужно ждать в цикле до тех пор, пока условие не будет выполнено. Если вы не можете его решить, вы можете опубликовать свой код, чтобы люди могли помочь вам выявить проблему. –

Смежные вопросы