2016-02-15 5 views
5

Недавно я начал многопользовательскую обработку в java. Поскольку я пишу программу для численных расчетов в своем университете, я решил сделать первые попытки, запрограммировав многопоточное матричное умножение.Умножение многопоточной матрицы

Это мой код. Пожалуйста, имейте в виду, что это было сделано только как первая попытка и не очень чистая.

public class MultithreadingTest{ 

     public static void main(String[] args) { 
      // TODO Auto-generated method stub 
      double[][] matrix1 = randomSquareMatrix(2000); 
      double[][] matrix2 = randomSquareMatrix(2000); 

      matrixMultiplication(matrix1,matrix2,true); 
      matrixMultiplicationSingleThread(matrix1, matrix2); 
      try { 
       matrixMultiplicationParallel(matrix1,matrix2, true); 
      } catch (InterruptedException | ExecutionException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      try { 
       matrixMultiplicationParallel2(matrix1,matrix2, true); 
      } catch (InterruptedException | ExecutionException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 

     } 

     public static double[][] randomSquareMatrix(int n){ 
      double[][] mat = new double[n][n]; 
      Random rand = new Random(); 
      for(int i=0; i<n; i++) for(int j=0; j<n; j++) mat[i][j]=rand.nextInt(10); 
      return mat; 
     } 
     public static void printSquareMat(double[][] mat){ 
      int n=mat.length; 
      for(int i=0; i<n; i++){ for(int j=0; j<n; j++) System.out.print(mat[i][j]+" "); System.out.print("\n");} 
      System.out.print("\n"); 
     } 

     public static void average(double[][] matrix) 
     { 
      int n=matrix.length; 
      double sum=0; 
      for(int i=0; i<n; i++) for(int j=0; j<n; j++) sum+=matrix[i][j]; 

      System.out.println("Average of all Elements of Matrix : "+(sum/(n*n))); 
     } 

     public static void matrixMultiplication(double[][] matrix1, double[][] matrix2, boolean printMatrix){ 

      int n=matrix1.length; 
      double[][] resultMatrix = new double[n][n]; 

      double startTime = System.currentTimeMillis(); 

      for(int i=0; i<n; i++)for(int j=0; j<n; j++)for(int k=0; k<n; k++) resultMatrix[i][j]+=matrix1[i][k]*matrix2[k][j]; 


      if (printMatrix && n<=5)for(int i=0; i<n; i++){for(int j=0; j<n; j++) System.out.print(resultMatrix[i][j]+" ");System.out.print("\n"); } 

      System.out.print("\n"); 
      System.out.println(((System.currentTimeMillis()-startTime)/1000)+ 
        " seconds for matrix of size "+n+" in main thread."); 
      average(resultMatrix); 
     } 

     public static void matrixMultiplicationSingleThread(double[][] m1, double[][] m2) 
     { 
      int n=m1.length; 
      double startTime = System.currentTimeMillis(); 
      Thread t = new Thread(new multiSingle(m1,m2)); 
      t.start(); 
      try { 
       t.join(); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       System.out.println("Error"); 
       e.printStackTrace(); 
      } 
      System.out.print("\n"); 
      System.out.println(((System.currentTimeMillis()-startTime)/1000)+ 
        " seconds for matrix of size "+n+" in external Thread."); 

     } 

     public static void matrixMultiplicationParallel(double[][] matrix1, double[][] matrix2, boolean printMatrix) throws InterruptedException, ExecutionException{ 

      int n=matrix1.length; 
      double[][] resultMatrix=new double[n][n]; 
      double tmp; 
      ExecutorService exe = Executors.newFixedThreadPool(2); 
      Future<Double>[][] result = new Future[n][n]; 
      double startTime = System.currentTimeMillis(); 
      for(int i=0; i<n; i++) 
      { 
       for(int j=0; j<=i; j++) 
       { 
        tmp=matrix2[i][j]; 
        matrix2[i][j]=matrix2[j][i]; 
        matrix2[j][i]=tmp; 
       } 
      } 

      for(int i=0; i<n; i++) 
      { 
       for(int j=0; j<n; j++) 
       { 
        result[i][j] = exe.submit(new multi(matrix1[i],matrix2[j])); 
       } 
      } 

      exe.shutdown(); 
      exe.awaitTermination(1, TimeUnit.DAYS); 

      for(int i=0; i<n; i++) 
      { 
       for(int j=0; j<n; j++) 
       { 
        resultMatrix[i][j] = result[i][j].get(); 
       } 
      } 
      for(int i=0; i<n; i++) 
      { 
       for(int j=0; j<=i; j++) 
       { 
        tmp=matrix2[i][j]; 
        matrix2[i][j]=matrix2[j][i]; 
        matrix2[j][i]=tmp; 
       } 
      } 
      if (printMatrix && n<=5)for(int i=0; i<n; i++){for(int j=0; j<n; j++) System.out.print(resultMatrix[i][j]+" ");System.out.print("\n"); } 

      System.out.print("\n"); 
      System.out.println(((System.currentTimeMillis()-startTime)/1000)+ 
        " seconds for matrix of size "+n+" multithreaded with algorithm 1."); 
      average(resultMatrix); 
     } 

     public static void matrixMultiplicationParallel2(double[][] matrix1, double[][] matrix2, boolean printMatrix) throws InterruptedException, ExecutionException{ 

      int n=matrix1.length; 
      double[][] resultMatrix=new double[n][n]; 
      double tmp; 
      ExecutorService exe = Executors.newFixedThreadPool(2); 
      Future<Double>[][] result = new Future[n][n]; 
      double startTime = System.currentTimeMillis(); 


      for(int i=0; i<n; i++) 
      { 
       for(int j=0; j<n; j++) 
       { 
        result[i][j] = exe.submit(new multi2(i,j,matrix1,matrix2)); 
       } 
      } 

      exe.shutdown(); 

      exe.awaitTermination(1, TimeUnit.DAYS); 


      for(int i=0; i<n; i++) 
      { 
       for(int j=0; j<n; j++) 
       { 
        resultMatrix[i][j] = result[i][j].get(); 
       } 
      } 

      if (printMatrix && n<=5)for(int i=0; i<n; i++){for(int j=0; j<n; j++) System.out.print(resultMatrix[i][j]+" ");System.out.print("\n"); } 

      System.out.print("\n"); 
      System.out.println(((System.currentTimeMillis()-startTime)/1000)+ 
        " seconds for matrix of size "+n+" multithreaded with algorithm 2."); 
      average(resultMatrix); 
     } 

     public static class multi implements Callable<Double>{ 

      multi(double[] vec1, double[] vec2){ 
       this.vec1=vec1; this.vec2=vec2; 
      } 
      double result; 
      double[] vec1, vec2; 

      @Override 
      public Double call() { 
       result=0; 
       for(int i=0; i<vec1.length; i++) result+=vec1[i]*vec2[i]; 
       return result; 
      } 
     } 

     public static class multi2 implements Callable<Double>{ 

      multi2(int a, int b, double[][] vec1, double[][] vec2){ 
       this.a=a; this.b=b; this.vec1=vec1; this.vec2=vec2; 
      } 
      int a,b; 
      double result; 
      double[][] vec1, vec2; 

      @Override 
      public Double call() { 
       result=0; 
       for(int i=0; i<vec1.length; i++) result+=vec1[a][i]*vec2[i][b]; 
       return result; 
      } 
     } 

     public static class multiSingle implements Runnable{ 

      double[][] matrix1, matrix2; 

      multiSingle(double[][] m1, double[][] m2){ 
       matrix1=m1; 
       matrix2=m2; 
      } 
      public static void matrixMultiplication(double[][] matrix1, double[][] matrix2, boolean printMatrix){ 

       int n=matrix1.length; 
       double[][] resultMatrix = new double[n][n]; 

       for(int i=0; i<n; i++)for(int j=0; j<n; j++)for(int k=0; k<n; k++) resultMatrix[i][j]+=matrix1[i][k]*matrix2[k][j]; 

       MultithreadingTest.average(resultMatrix); 
      } 

      @Override 
      public void run() { 
       matrixMultiplication(matrix1, matrix2, false); 
      } 
     } 

    } 

У меня есть два общих вопроса для многопоточности, я надеюсь, что это не открытие новой темы для этого.

  1. Есть ли способ написать код без дополнительных классов для потоков, реализующих runnable или callable? Я смотрел на подходы, используя анонимные внутренние классы и лямбды, но, поскольку у меня есть информация о фонте, я не могу передавать параметры в потоки таким образом, поскольку run() и call() не принимают никаких, то есть, если параметры являются окончательными. Но, предполагая, что я пишу класс для операций с матрицами, я бы предпочел не писать дополнительный класс для каждой операции, которую я хочу запустить в потоке.
  2. Предполагая, что мой класс делает много многопоточных операций, создавая новый пул потоков и закрывая его в каждом методе, я бы потратил много ресурсов. Поэтому я хотел бы создать пул потоков в качестве члена ob my class, создавая его при необходимости и используя invokeAll. Но что произойдет, если мой объект будет удален? Будут ли у меня проблемы, так как я никогда не закрываю пул потоков? В C++ я использовал бы деструктор для этого. Или gc заботится обо всем в этом случае?

Теперь concering моего кода непосредственно:

Я реализовал матричное умножение четырех различных способов, так как метод работает в моем главном потоке, как метод работает в новом потоке, но до сих пор не многопоточный (чтобы не было бы не будет никаких фоновых taks в моем основном потоке, замедляющих его) и двух разных способов многопоточного умножения матрицы. Первая версия переносит вторую матрицу, передает умножение как вектор-вектор-умножение и переносит матрицу обратно в ее исходную форму. Вторая версия принимает матрицы напрямую, а additinaly принимает два индекса для определения строки и столбца матриц для вектор-векторного умножения.

Для всех версий я измерил время, необходимое для умножения, и вычислил среднее значение в результирующих матрицах, чтобы увидеть, являются ли результаты одинаковыми.

Я запустил этот код на двух компьютерах, как JVM, так и Windows 10. Первым моим ноутбуком, i5 5-го поколения, 2,6 ГГц двухъядерным, а вторым на настольном ПК, i5 4-го поколения, 4,2 ГГц четырехъядерного ядра.

Я ожидал, что мой настольный ПК будет намного быстрее. Я также ожидал, что многопоточные версии будут занимать примерно половину/четверть времени для потоковой версии с надписью, но тем более, что есть дополнительная работа для создания потоков и т. Д. И, наконец, я ожидал вторую многопоточную версию, которая не переносит одну матрицу дважды, чтобы быть быстрее, поскольку операций меньше.

После запуска кода я немного запутался о результатах, я надеюсь, что кто-то может объяснить мне:

Для обоих однопоточных методов мой ноутбук должен примерно 340s (для размера матрицы 3000). Поэтому я предполагаю, что в моем основном потоке нет дорогих фоновых задач. С другой стороны, мой настольный ПК нуждается в 440s. Теперь вопрос в том, почему мой ноутбук, который работает медленнее, намного быстрее? Даже если пятое поколение работает быстрее, чем 4-го поколения, поскольку мой настольный ПК работает в 1,6 раза быстрее, чем у моего ноутбука, я бы все же ожидал, что он будет быстрее.Разница между этими поколениями маловероятна.

Для многопоточных методов моему ноутбуку требуется примерно 34 с. Если многопоточность будет идеальной, то она не должна занимать меньше половины. Почему это на два потока в десять раз быстрее? То же самое касается моего настольного ПК. Используя четыре потока, умножение выполняется в 16 с вместо 440. Это похоже на то, что мой настольный ПК работает с той же скоростью, что и мой ноутбук, только на четырех, а не на двух потоках.

Теперь для сравнения между двумя многопоточными методами версия, которая переносит одну матрицу дважды, занимает примерно 34 с на моем ноутбуке, версия, которая берет матрицы, занимает примерно 200 секунд. Это звучит реалистично, так как это более половины однопоточного метода. Но почему это намного медленнее, чем первая версия? Я бы предположил, что перенос матрицы дважды будет медленнее, чем дополнительное время для получения матричных элементов? Есть ли что-то, что я пропускаю или работает с матрицей, которая намного медленнее, чем работа с вектором?

Я надеюсь, что кто-то может ответить на эти вопросы. Извините за то, что написал такой длинный пост.

уважением Торстен

ответ

2

Ответ на эту большую загадку: время, необходимое для умножения матрицы, во многом зависит от времени, затрачиваемого на перенос данных из ОЗУ в кеш процессора. У вас может быть 4 ядра, но у вас только 1 RAM-шина, поэтому вы не получите никакой выгоды, используя больше ядер (многопоточность), если все они блокируют друг друга, ожидая доступа к памяти.

Первый эксперимент, который вы должны попробовать, заключается в следующем: Напишите однопоточную версию, используя матричное транспонирование и векторное умножение. Вы обнаружите, что он намного быстрее - вероятно, примерно так же быстро, как многопоточная версия с транспозицией.

Причина, по которой исходная однопоточная версия настолько медленная, заключается в том, что она должна загружать блок кеша для каждой ячейки в столбце, который умножается. Если вы используете матрицу transpose, то все эти ячейки являются последовательными в памяти, и загрузка одного блока дает вам кучу из них.

Итак, если вы хотите оптимизировать матричное умножение, FIRST оптимизирует доступ к памяти для эффективности кеша, THEN делят работу между несколькими потоками - не более чем в два раза больше потоков, чем у вас есть ядра. Что-то еще просто тратит время и ресурсы с помощью контекстных переключателей и т. Д.

относительно других ваших вопросов:

1) Это удобно использовать лямбды, которые захватывают переменные из области, которая их создает, как:

for(int i=0; i<n; i++) 
{ 
    for(int j=0; j<n; j++) 
    { 
     final double[] v1 = matrix1[i]; 
     final double[] v2 = matrix2[j]; 
     result[i][j] = exe.submit(() -> vecdot(v1,v2)); 
    } 
} 

2) ОЕ будут заботиться о нем. Вам не нужно явно закрывать пул потоков для освобождения любых ресурсов.

+0

Я пробовал однопоточную транспозицию. 3s медленнее, чем многопоточный на моем ноутбуке –

1

Вы должны быть осторожны, чтобы свести к минимуму накладные расходы на создание потоков. Хорошей оценкой является использование инфраструктуры ForkJoin для разделения проблемы с использованием пула потоков. Эта структура

  • повторно использует существующий пул потоков.
  • разбивает задачу до тех пор, пока не будет достаточно, чтобы бассейн был занят, но не более.

Существует только один блок с плавающей точкой на ядро, поэтому ваша масштабируемость будет основываться на количестве ядер, которые у вас есть.

Предлагаю вам прочитать Fork Join Matrix Multiplication in Java Не удалось найти исходный код этого кода.

http://gee.cs.oswego.edu/dl/papers/fj.pdf

http://gee.cs.oswego.edu/dl/cpjslides/fj.pdf об использовании рамки ForkJoin.

+1

Привет, Спасибо за информацию, я взгляну на эту структуру. Что касается процессоров, то, что я выяснил до сих пор, заключается в том, что 5-е поколение - это в основном сокращенная версия 4-го поколения (22 нм -> 14 нм) и что она должна быть на 5% лучше в производительности. Но оба имеют турбонаддув, а тактовые частоты, которые я дал, - это максимальные скорости с турбонаддувом. Они нормально работают на частотах 2 и 3,4 ГГц. –

+0

@ThorstenSchmitz Вы можете быть правы, я обнаружил, что Skylake (6-й ген) до 20% быстрее, чем Haswell для той же ГГц. Я не пробовал широкополосный доступ. –

Смежные вопросы