2014-09-04 4 views
0

Чтобы упростить мой случай, предположим, что я реализую двоичный поиск с использованием инфраструктуры виджета Java. Моя цель - найти определенное целочисленное значение (целевое целое число) в массиве целых чисел. Это можно сделать, разбив массив на половину, пока он не станет достаточно маленьким, чтобы выполнить серийный поиск. Результатом алгоритма должно быть логическое значение, указывающее, было ли целевое целое найдено в массиве или нет.Добавление условия остановки к рекурсии fork-join

Аналогичная проблема исследуется в Klaus Kreft's presentation в слайде 28 дальше. Однако целью Kreft является найти наибольшее количество в массиве, поэтому все записи должны быть отсканированы. В моем случае нет необходимости сканировать полный массив, поскольку, как только целевое целое было найдено, поиск можно остановить.

Моя проблема в том, что как только я столкнулся с целым целым числом, многие задачи уже вставлены в пул потоков, и мне нужно их отменить, так как нет смысла продолжать поиск. Я попытался вызвать getPool(). Terminate() изнутри RecursiveTask, но это не помогло, поскольку многие задачи уже поставлены в очередь, и я даже заметил, что новые onces также поставлены в очередь даже после выключения.

My текущее решение - использовать статическое volatile boolean, которое инициируется как «false» и проверять его значение в начале задачи. Если он все еще «ложный», тогда задача начинает свои работы, если это «истина», задача немедленно возвращается. На самом деле я могу использовать для этого рекурсивную атаку.

Поэтому я думаю, что это решение должно работать, но мне интересно, предлагает ли структура стандартный способ обработки таких случаев - то есть определение условия остановки для рекурсии, которая, следовательно, отменяет все задачи в очереди.

Обратите внимание, что если я хочу немедленно остановить все запущенные задачи, когда целевое целое найдено (по одной из запущенных задач), я должен проверить логическое значение после каждой строки в этих задачах и это может повлиять на производительность, поскольку значение что boolean не может быть кэширован (он определяется как изменчивый).

Так что, действительно, я думаю, что требуется некоторое стандартное решение и может быть предоставлено в виде очистки очереди и перехвата выполняемых задач. Но я не нашел такого решения, и мне интересно, знает ли кто-нибудь еще об этом или имеет лучшую идею.

Спасибо за ваше время, Ассаф

EDIT: Вот мой код тестирования:

package xxx; 

import java.util.Arrays; 
import java.util.Random; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ForkJoinPool; 
import java.util.concurrent.RecursiveAction; 

public class ForkJoinTest { 

    static final int ARRAY_SIZE = 1000; 
    static final int THRESHOLD = 10; 

    static final int MIN_VALUE = 0; 
    static final int MAX_VALUE = 100; 

    static Random rand = new Random(); 


    // a function for retrieving a random int in a specific range 
    public static int randInt(int min, int max) { 
     return rand.nextInt((max - min) + 1) + min; 
    } 

    static volatile boolean result = false; 
    static int[] array = new int[ARRAY_SIZE]; 
    static int target; 

    @SuppressWarnings("serial") 
    static class MyAction extends RecursiveAction { 

     int startIndex, endIndex; 

     public MyAction(int startIndex, int endIndex) { 
      this.startIndex = startIndex; 
      this.endIndex = endIndex; 
     } 

     // if the target integer was not found yet: we first check whether 
     // the entries to search are too few. In that case, we perform a 
     // sequential search and update the result if the target was found. 
     // Otherwise, we break the search into two parts and invoke the 
     // search in these two tasks. 
     @Override 
     protected void compute() { 
      if (!result) { 
       if (endIndex-startIndex<THRESHOLD) { 
        // 
        for (int i=startIndex ; i<endIndex ; i++) { 
         if (array[i]==target) { 
          result = true; 
         } 
        } 
       } else { 
        int middleIndex = (startIndex + endIndex)/2; 
        RecursiveAction action1 = new MyAction(startIndex, middleIndex); 
        RecursiveAction action2 = new MyAction(middleIndex+1, endIndex); 
        invokeAll(Arrays.asList(action1,action2)); 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     for (int i=0 ; i<ARRAY_SIZE ; i++) { 
      array[i] = randInt(MIN_VALUE, MAX_VALUE); 
     } 
     target = randInt(MIN_VALUE, MAX_VALUE); 
     ForkJoinPool pool = new ForkJoinPool(); 
     pool.invoke(new MyAction(0,ARRAY_SIZE)); 
     System.out.println(result); 
    } 

} 
+0

Можете ли вы разместить код? Вы можете использовать определенную очередь, которую вы можете очистить, и, возможно, прервать выполняемые потоки, но просмотр кода проще дать вам правильные предложения. –

+0

Я поддерживаю фреймворк fork/join с открытым исходным кодом, который обеспечивает параллельный последовательный поиск, который обрабатывает вашу потребность в «поиске первой». Вы можете использовать его как есть или использовать код в качестве примера того, как сделать это самостоятельно. Ссылка sourceForge: http://sourceforge.net/projects/tymeacdse/?source=navbar – edharned

+0

Спасибо @edharned, я посмотрю. Это зависит от платформы fork/join от Java? Вы также используете volatile boolean/AtomicBoolean, чтобы остановить поиск? – Assaf

ответ

0

Я думаю, вы можете изобрести барьер для правильного решения.

Вы говорите, что ваш boolean stop флаг должен быть volatile и так будет мешать скорости решения - ну, да и нет - доступ к volatile действительно сделать кэш гиперемии, но вы рассмотрели AtomicBoolean?

Я считаю правильным решение использовать флаг AtomicBoolean, чтобы остановить все процессы. Вы должны проверить, как мелкозернистый способ, так как разумно заставить вашу систему быстро остановиться.

Было бы ошибкой пытаться очистить все очереди и прервать все потоки - это привело бы к ужасному беспорядку.

static AtomicBoolean finished = new AtomicBoolean(); 
    .... 

     protected void compute() { 
      if (!finished.get()) { 
       if (endIndex - startIndex < THRESHOLD) { 
        // 
        for (int i = startIndex; i < endIndex && !finished.get(); i++) { 
         if (array[i] == target) { 
          finished.set(true); 
          System.out.print("Found at " + i); 
         } 
        } 
       } else { 
        ... 
       } 
      } 
     } 
+0

Спасибо, поэтому вы предлагаете переключиться на AtomicBoolean и добавить проверку на его значение как часть цикла. Я должен был добавить этот тип проверки также к моему исходному коду с изменчивым. Но можете ли вы объяснить, почему AtomicBoolean предпочтительнее более неустойчивого в этом случае? с точки зрения производительности, я думал, что они почти одинаковы, поскольку оба они блокируются и не кэшируются. – Assaf

+0

@Assaf - Существует небольшая разница - доступ к 'volatile' сбрасывает все кеши при доступе к' AtomicBoolean' * должен * быть гораздо менее навязчивым - это не всегда лучше, но это не будет хуже. В вашем сценарии мало различий [Volatile boolean vs AtomicBoolean] (http://stackoverflow.com/questions/3786825/volatile-boolean-vs-atomicboolean) - я хочу сказать, что вы не должны * брать другой маршрут очистки очереди и прерывания потоков. – OldCurmudgeon

0

Я оставил комментарий выше о том, как сделать это, глядя на продукт с открытым исходным кодом, что делает это во многих встроенных-функций.Позвольте мне подробно остановиться здесь.

Если вы хотите отменить задачи, которые начинаются или в настоящее время выполняются, каждая задача должна знать о каждой другой задаче. Когда одна задача находит то, что она хочет, эта задача должна информировать каждую другую задачу для остановки. Вы не можете сделать это с помощью двоично-рекурсивного разделения (RecursiveTask и т. Д.), Так как вы рекурсивно создаете новые задачи, а старые задачи никогда не узнают о новых. Я уверен, что вы могли бы передать ссылку на поле стоп-я для каждой новой задачи, но это будет очень грязно, и отладка будет «интересной».

Вы можете сделать это с помощью Java8 CountedCompleter(). Рамки были заблокированы для поддержки этого класса, поэтому все, что должно выполняться каркасом, нужно делать вручную, но оно может работать.

Для каждой задачи требуется volatile boolean и метод, чтобы установить значение true. Каждой задаче нужен массив ссылок на все остальные задачи. Создайте все задачи впереди, каждый из которых имеет пустой массив текущих ссылок на другие задачи. Заполните массив ссылок на каждую другую задачу. Теперь отправьте каждую задачу (см. Документ для этого класса, fork() addPendingCount() и т. Д.)

Когда одна задача находит то, что она хочет, она использует массив ссылок на другие задачи, чтобы установить их логическое значение в true. Если есть условие гонки с несколькими потоками, это не имеет значения, поскольку все потоки заданы как «true». Вам также нужно будет обрабатывать tryComplete(), onCompletion() и т. Д. Этот класс очень запутан. Он используется для обработки потока Java8, которая сама по себе является историей.

То, что вы не можете сделать, это очистить ожидающие задания от требований до их начала. Вам нужно подождать, пока начнется задание, и проверьте значение boolean для true. Если выполнение является длительным, то вы также можете периодически проверять логическое значение true. Накладные расходы волатильного чтения не так уж плохи, и нет другого пути.

+0

Еще раз спасибо за ссылки и за подробное объяснение. Я заметил, что Java 8 имеет некоторые новые разработки, но немного сложно понять, когда нужно что-то делать. Вам нужно сделать еще немного чтения. Одна вещь, которую я не понимаю в вашем описании, - это то, почему использование массива флагов (который требует, чтобы все задачи должны были быть созданы заранее) вместо одного глобального флага (либо volatile, либо AtomicBoolean, как это было предложено @OldCurmudgeon выше). Как массив, так и решения с одним флагом аннулируют задачи, которые собираются начать, и оба могут «останавливать» задачи посередине, если флажок установлен. – Assaf

+0

@Assaf Точно, как вы это делаете, зависит от вас. Если у вас есть один volatile boolean в одном объекте, вам нужен указатель на эту ссылку: pointer.isTrue(); что больше накладных расходов, а затем просто проверить вашу собственную локальную переменную: if (stop-me) ... Когда вы только проверяете в начале задачи, кого это волнует. Но когда вы периодически проверяете, то накладные вопросы имеют значение. – edharned

+0

В примере поиска на странице https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountedCompleter.html показан пример создания AtomicReference/Atomic , а затем передача этого атома в подзадачи , Подзадачи должны вызвать atomic.get() и ничего не делать, если результат не равен нулю. Я не понимаю, почему вам нужно CountedCompleter, потому что RecursiveAction будет делать то же самое. – snaran

Смежные вопросы