2013-10-13 4 views
0

Мой основной класс генерирует несколько потоков на основе некоторых правил. (20-40 потоков живут долгое время). Каждый поток создает несколько потоков (короткое время) -> Я использую executer для этого. Мне нужно работать над многоразмерными массивами в коротких временных потоках -> Я написал его, как в коде ниже ->, но я думаю, что он не эффективен, так как я передаю его столько раз, сколько столько потоков/задания --. Я попытался получить доступ к нему непосредственно из потоков (объявив его общедоступным -> без успеха) -> будет рад получить комментарии/советы о том, как его улучшить. Я также рассмотрю следующий шаг, чтобы вернуть массив измерения 1 (что может быть лучше, просто обновить его в классе Assetfactory) -> и я не знаю, как это сделать. см. Код ниже. благодаря PazДоступ к многопоточным массивам Java для больших массивов

import java.util.concurrent.*; 
import java.util.logging.Level; 

public class AssetFactory implements Runnable{ 
    private volatile boolean stop = false; 
    private volatile String feed ; 
    private double[][][] PeriodRates= new double[10][500][4]; 

    private String TimeStr,Bid,periodicalRateIndicator; 
    private final BlockingQueue<String> workQueue; 
    ExecutorService IndicatorPool = Executors.newCachedThreadPool(); 

    public AssetFactory(BlockingQueue<String> workQueue) { 
     this.workQueue = workQueue; 
    } 

    @Override 
    public void run(){ 
     while (!stop) { 
     try{ 
      feed = workQueue.take(); 
      periodicalRateIndicator = CheckPeriod(TimeStr, Bid) ; 
      if (periodicalRateIndicator.length() >0) { 
       IndicatorPool.submit(new CalcMvg(periodicalRateIndicator,PeriodRates)); 
      } 

      } 

      if ("Stop".equals(feed)) { 
       stop = true ; 
      } 

     } // try 
     catch (InterruptedException ex) { 
      logger.log(Level.SEVERE, null, ex); 
      stop = true; 
     } 


     } // while 
    } // run 

Вот класс CalcMVG

public class CalcMvg implements Runnable { 
    private double [][][] PeriodRates = new double[10][500][4]; 

    public CalcMvg(String Periods, double[][][] PeriodRates) { 
     System.out.println(Periods); 
     this.PeriodRates = PeriodRates ; 
    } 

    @Override 
    public void run(){ 
     try{ 
      // do some work with the data of PeriodRates array e.g. print it (no changes to array 
      System.out.println(PeriodRates[1][1][1]); 
      } 
     catch (Exception ex){ 
      System.out.println(Thread.currentThread().getName() + ex.getMessage()); 
      logger.log(Level.SEVERE, null, ex); 
     } 
     }//run 

    } // mvg class 
+0

Я не вижу конкретного вопроса в вашем сообщении выше. Просьба уточнить это для нас. –

+0

Здесь есть два вопроса, один из них - вопрос проверки кода, а другой - вопрос SO, но неясно. Эти вопросы не только не относятся к одному вопросу о SO, они принадлежат на разных * сайтах *. Вопросы обзора кода можно найти здесь: http://codereview.stackexchange.com/ –

+0

спасибо, мои вопросы: 1.Я ищу для лучшего доступа к массиву вместо отправки его в качестве параметра созданного потока. 2. Как получить доступ назад от созданной задачи к массиву в классе AssetFactory. – user2319608

ответ

4

Есть несколько вещей происходит здесь, которые, кажется, неправильно, но это трудно дать хороший ответ с ограниченным количеством представленный код.

Первые фактические вопросы кодирования:

  • Там нет необходимости определять переменную как летучий, если только один поток никогда не обращается к нему (остановка, подача)

  • Вы должны объявить переменные, используется только в локальном контексте (метод запуска) локально в этой функции, а не глобально для всего экземпляра (почти все переменные). Это позволяет JIT выполнять различные оптимизации.

  • Прерывание прерывания должно прерывать поток. Потому что он брошен как запрос прекратить работу потока.

  • В вашем примере кода workQueue, похоже, ничего не делает, кроме как отключить потоки или остановить их. Почему он не сразу загружает фактические рабочие потоки с требуемой рабочей нагрузкой?

И тогда код структура вопросы:

  • Вы используете темы кормить темы с работой. Это неэффективно, поскольку у вас есть только ограниченное количество ядер, которые действительно могут выполнять работу. Поскольку порядок выполнения потоков не определен, вполне вероятно, что IndicatorPool либо в основном бездействует, либо переполняет задачи, которые еще не выполнены.

  • Если у вас есть конечный набор выполняемых работ, ExecutorCompletionService может быть полезен для вашей задачи.

Я думаю, что вы получите лучшее увеличение скорости, изменив структуру кода. Представьте себе следующее (при условии, что я правильно понял ваш вопрос):

  • Существует очередь блокирования задач, которые подают какой-либо источник данных (например, файл поток, сети).

  • Набор рабочих нитей, равный количеству ядер, находится на этом источнике данных для ввода, который затем обрабатывается и помещается в очередь завершения.

  • Конкретный набор данных является «терминатором» для вашей работы (например, «null»). Если поток встречает этот терминатор, он заканчивает цикл и выключается.

Теперь следующий справедлив для этой конструкции:

Случая 1: Источник данных является узким местом. Невозможно ускорить использование нескольких потоков, так как ваш жесткий диск/сеть не будет работать быстрее, если вы попросите более часто.

Корпус 2: мощность обработки на вашей машине - это горлышко бутылки, так как вы не можете обрабатывать больше данных, чем могут обрабатывать рабочие нити/сердечники на вашей машине.

В обоих случаях делается вывод, что рабочие потоки должны быть теми, которые ищут новые данные, как только они будут готовы к его обработке. Поскольку либо они должны быть переведены на удержание, либо они должны дросселировать входящие данные. Это обеспечит максимальную пропускную способность.

Если все рабочие потоки прекращены, работа завершена. Это может быть i.E. отслеживается с использованием класса CyclicBarrier или Phaser.

Псевдо-код для рабочих потоков:

public void run() { 
    DataType e; 
    try { 
    while ((e = dataSource.next()) != null) { 
     process(e); 
    } 
    barrier.await(); 
    } catch (InterruptedException ex) { 
    } 
} 

Я надеюсь, что это полезно в вашем случае.

+0

Может ли подход, основанный на вилке, соответствовать его сценарию? Это позволило бы ему использовать встроенные классы Java ForkJoin вместо того, чтобы сворачивать свой собственный (всегда сложный). – user268396

+0

Зависит от объема данных и способа их создания. У ForkJoin есть две проблемы: одна из них заключается в том, что задача требует, чтобы задача была делящейся, а вторая - в StackOverflowException. – TwoThe

+0

thaks, о первых нескольких комментариях, которые написал u -> Я просто разместил здесь части кода и другие связанные темы (main и т. Д.), Поэтому мне нужен конкретный def. о решении -> в конце я его реализовал -> используя дополнительный blockqueue, который я передаю как ссылку на класс CalcMVG (задача), и он записывает свой результат в эту очередь -> Я собираюсь еще несколько задачи, которые будут записывать свои результаты в эту новую очередь -> в классе assetfactory, который отправляет эти задачи -> у меня есть доступ к очереди, и я использую эти результаты. – user2319608

0

Передача массива в качестве аргумента конструктору является разумным подходом, хотя, если вы не собираетесь копировать массив, нет необходимости инициализировать PeriodRates с большим массивом. Кажется расточительным выделить большой блок памяти, а затем переназначить его единственную ссылку сразу в конструкторе. Я хотел бы инициализировать его следующим образом:

private final double [][][] PeriodRates; 

public CalcMvg(String Periods, double[][][] PeriodRates) { 
    System.out.println(Periods); 
    this.PeriodRates = PeriodRates; 
} 

Другой вариант заключается в определении CalcMvg как внутренний класс AssetFactory и объявить PeriodRate окончательным. Это позволило бы экземплярам CalcMvg получить доступ к PeriodRate во внешнем экземпляре AssetFactory.

Возврат результата сложнее, поскольку он включает публикацию результата по потокам. Один из способов сделать это состоит в использовании синхронизированных методов:

private double[] result = null; 

private synchronized void setResult(double[] result) { 
    this.result = result; 
} 

public synchronized double[] getResult() { 
    if (result == null) { 
     throw new RuntimeException("Result has not been initialized for this instance: " + this); 
    } 

    return result; 
} 

Есть более продвинутые концепции Многопоточности доступны в библиотеках Java, например, Future, это может быть уместно в этом случае.

Что касается вашей озабоченности по поводу количества потоков, позволяющих библиотечному классу управлять распределением работы в пуле потоков, это может решить эту проблему. Что-то вроде Executor может помочь в этом.

+0

Я рассматриваю фьючерсы и вызываемые задачи -> но так как я не уверен, что и сколько это задерживает/останавливает процесс выполнения -> я использовал дополнительный blockingQueue, и это выглядит отлично, спасибо – user2319608

Смежные вопросы