2015-06-16 2 views
5

Я использовал входные данные с ниже формате:Параллельная работа Исполнение в Спарк

0 
1 
2 
3 
4 
5 
… 
14 

Input Location: hdfs://localhost:9000/Input/datasource 

Я использовал следующий фрагмент кода, чтобы сохранить RDD в виде текстового файла с использованием нескольких потоков:

package org.apache.spark.examples; 

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Collections; 
import java.util.Comparator; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

import org.apache.avro.ipc.specific.Person; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 

import scala.Tuple2; 

class RunnableDemo implements Runnable 
{ 

    private Thread t; 
    private String threadName; 
    private String path; 
    private JavaRDD<String> javaRDD; 
// private JavaSparkContext javaSparkContext; 

    RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path) 
    { 
     this.threadName=threadName; 
     this.javaRDD=javaRDD; 
     this.path=path; 
//  this.javaSparkContext=javaSparkContext; 
    } 


    @Override 
    public void run() { 
     System.out.println("Running " + threadName);  
     try { 
      this.javaRDD.saveAsTextFile(path); 
//   System.out.println(this.javaRDD.count()); 
      Thread.sleep(50); 
      } catch (InterruptedException e) { 
       System.out.println("Thread " + threadName + " interrupted."); 
       } 
     System.out.println("Thread " + threadName + " exiting."); 
//  this.javaSparkContext.stop(); 
    } 

    public void start() 
     { 
      System.out.println("Starting " + threadName); 
      if (t == null) 
      { 
      t = new Thread (this, threadName); 
      t.start(); 
      } 
     } 

} 

public class SparkJavaTest { 



    public static void main(String[] args) { 

     //Spark Configurations: 

     SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest"); 

     JavaSparkContext ctx=new JavaSparkContext(sparkConf); 

     SQLContext sqlContext = new SQLContext(ctx);   

     JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource"); 




     List<StructField> fields= new ArrayList<StructField>(); 
     fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true)); 

     JavaRDD<Row> rowRDD =dataCollection.map(
       new Function<String, Row>() { 
        @Override 
        public Row call(String record) throws Exception { 
         String[] fields = record.split("\u0001");      
         return RowFactory.create(Integer.parseInt(fields[0].trim())); 
        }     
       });  

     StructType schema = DataTypes.createStructType(fields); 

     DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);   
     dataFrame.registerTempTable("data"); 

     long recordsCount=dataFrame.count();   
     long splitRecordsCount=5; 
     long splitCount =recordsCount/splitRecordsCount; 
     List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>(); 

     for(int i=0;i<splitCount;i++) 
     { 
      long start = i*splitRecordsCount; 
      long end = (i+1)*splitRecordsCount;   
      DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);   
      list1.add(temp.toJavaRDD()); 
     }  

     long length =list1.size(); 

     int split=0; 

     for (int i = 0; i < length; i++) { 

      JavaRDD rdd1 =list1.get(i); 

      JavaPairRDD rdd3=rdd1.cartesian(rdd1); 

      JavaPairRDD<Row,Row> rdd4=rdd3.filter(
        new Function<Tuple2<Row,Row>,Boolean>() 
        { 
         public Boolean call(Tuple2<Row,Row> s) 
         { 
          Row line1=s._1; 
          Row line2=s._2; 

          long app1 = Integer.parseInt(line1.get(0).toString()); 

          long app2 = Integer.parseInt(line2.get(0).toString()); 


          if(app1<app2) 
          { 
           return true; 
          } 
          return false; 
         } 
        }); 

      JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() { 
       @Override 
       public String call(Tuple2<Row, Row> s) 
         throws Exception { 

        Row data1=s._1; 
        Row data2=s._2; 

        int x =Integer.parseInt(data1.get(0).toString()); 
        int y =Integer.parseInt(data2.get(0).toString()); 

        String result =x +","+ y+","+(x+y); 
        return result; 
       } 
      }); 

      RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split); 

      R.start(); 
      split++;    
      R.start(); 

      int index =i; 

      while(index<length) 
      { 
       JavaRDD rdd2 =list1.get(index); 
       rdd3=rdd1.cartesian(rdd2); 

       rdd4=rdd3.filter(
         new Function<Tuple2<Row,Row>,Boolean>() 
         { 
          public Boolean call(Tuple2<Row,Row> s) 
          { 
           Row line1=s._1; 
           Row line2=s._2; 

           long app1 = Integer.parseInt(line1.get(0).toString()); 

           long app2 = Integer.parseInt(line2.get(0).toString()); 


           if(app1<app2) 
           { 
            return true; 
           } 
           return false; 
          } 
         });   

       test=rdd4.map(new Function<Tuple2<Row,Row>, String>() { 
        @Override 
        public String call(Tuple2<Row, Row> s) 
          throws Exception { 

         Row data1=s._1; 
         Row data2=s._2; 

         int x =Integer.parseInt(data1.get(0).toString()); 
         int y =Integer.parseInt(data2.get(0).toString()); 

         String result =x +","+ y+","+(x+y); 
         return result; 
        } 
       });   

       R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split); 

       R.start(); 
       split++;    
       index++;     
      } 
     } 
    } 

} 

В этот случай я столкнулся следующее исключение

enter image description here

Я попробовал решение, предложенное в следующей ссылке

How to run concurrent jobs(actions) in Apache Spark using single spark context

Но все-таки, я не могу решить эту проблему.

Не могли бы вы направить меня, чтобы решить эту проблему?

ответ

0

Не могли бы вы попробовать использовать

SparkConf sparkConf=new SparkConf() 
.setAppName("SparkJavaTest") 
.set("spark.driver.allowMultipleContexts", "true"); 

Я также новый Спарк программирования, но это помогло мне при выполнении нескольких заданий Spark, в планировщике. По моему мнению, нам не нужно думать о потоках или логике M/R/плане выполнения, поскольку Spark делает это за вас.

+0

Hi Chetandalal, уже я пробовал с этим свойством. Не работает для меня – Raja

4

Прежде всего, вы пытаетесь выполнить всю работу над узлом драйвера несколькими потоками. Это не в духе Spark, так как каждая единица работы в вашем случае не зависит от других и может выполняться на разных машинах. У вас есть пример с игрушкой, но это станет действительно важным с большими объемами данных.

Лучшим подходом было бы использовать что-то вроде mapPartitions, чтобы отправить диапазон ключей каждому работнику и позволить им выполнять соответствующие SQL-запросы, а затем сохранить результаты с одним потоком на одного работника. Это сделает код более понятным и понятным (как только вы привыкнете к работе RDD). Вам, очевидно, необходимо будет установить level of parallelism и количество разделов (о here) для ваших входных данных.

Непосредственная проблема с кодом заключается в том, что основной поток запускает другие потоки, но не ждет их завершения. Обычно это приводит к тому, что порожденные потоки заканчиваются вместе с родителем (см. javadoc). Обратите внимание, как в answer связанному вопросу основная функция делает get() на порожденных фьючерсах перед возвратом.

Смежные вопросы