Я использовал входные данные с ниже формате:Параллельная работа Исполнение в Спарк
0
1
2
3
4
5
…
14
Input Location: hdfs://localhost:9000/Input/datasource
Я использовал следующий фрагмент кода, чтобы сохранить RDD в виде текстового файла с использованием нескольких потоков:
package org.apache.spark.examples;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
class RunnableDemo implements Runnable
{
private Thread t;
private String threadName;
private String path;
private JavaRDD<String> javaRDD;
// private JavaSparkContext javaSparkContext;
RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path)
{
this.threadName=threadName;
this.javaRDD=javaRDD;
this.path=path;
// this.javaSparkContext=javaSparkContext;
}
@Override
public void run() {
System.out.println("Running " + threadName);
try {
this.javaRDD.saveAsTextFile(path);
// System.out.println(this.javaRDD.count());
Thread.sleep(50);
} catch (InterruptedException e) {
System.out.println("Thread " + threadName + " interrupted.");
}
System.out.println("Thread " + threadName + " exiting.");
// this.javaSparkContext.stop();
}
public void start()
{
System.out.println("Starting " + threadName);
if (t == null)
{
t = new Thread (this, threadName);
t.start();
}
}
}
public class SparkJavaTest {
public static void main(String[] args) {
//Spark Configurations:
SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest");
JavaSparkContext ctx=new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(ctx);
JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource");
List<StructField> fields= new ArrayList<StructField>();
fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true));
JavaRDD<Row> rowRDD =dataCollection.map(
new Function<String, Row>() {
@Override
public Row call(String record) throws Exception {
String[] fields = record.split("\u0001");
return RowFactory.create(Integer.parseInt(fields[0].trim()));
}
});
StructType schema = DataTypes.createStructType(fields);
DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);
dataFrame.registerTempTable("data");
long recordsCount=dataFrame.count();
long splitRecordsCount=5;
long splitCount =recordsCount/splitRecordsCount;
List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>();
for(int i=0;i<splitCount;i++)
{
long start = i*splitRecordsCount;
long end = (i+1)*splitRecordsCount;
DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);
list1.add(temp.toJavaRDD());
}
long length =list1.size();
int split=0;
for (int i = 0; i < length; i++) {
JavaRDD rdd1 =list1.get(i);
JavaPairRDD rdd3=rdd1.cartesian(rdd1);
JavaPairRDD<Row,Row> rdd4=rdd3.filter(
new Function<Tuple2<Row,Row>,Boolean>()
{
public Boolean call(Tuple2<Row,Row> s)
{
Row line1=s._1;
Row line2=s._2;
long app1 = Integer.parseInt(line1.get(0).toString());
long app2 = Integer.parseInt(line2.get(0).toString());
if(app1<app2)
{
return true;
}
return false;
}
});
JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
@Override
public String call(Tuple2<Row, Row> s)
throws Exception {
Row data1=s._1;
Row data2=s._2;
int x =Integer.parseInt(data1.get(0).toString());
int y =Integer.parseInt(data2.get(0).toString());
String result =x +","+ y+","+(x+y);
return result;
}
});
RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);
R.start();
split++;
R.start();
int index =i;
while(index<length)
{
JavaRDD rdd2 =list1.get(index);
rdd3=rdd1.cartesian(rdd2);
rdd4=rdd3.filter(
new Function<Tuple2<Row,Row>,Boolean>()
{
public Boolean call(Tuple2<Row,Row> s)
{
Row line1=s._1;
Row line2=s._2;
long app1 = Integer.parseInt(line1.get(0).toString());
long app2 = Integer.parseInt(line2.get(0).toString());
if(app1<app2)
{
return true;
}
return false;
}
});
test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
@Override
public String call(Tuple2<Row, Row> s)
throws Exception {
Row data1=s._1;
Row data2=s._2;
int x =Integer.parseInt(data1.get(0).toString());
int y =Integer.parseInt(data2.get(0).toString());
String result =x +","+ y+","+(x+y);
return result;
}
});
R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);
R.start();
split++;
index++;
}
}
}
}
В этот случай я столкнулся следующее исключение
Я попробовал решение, предложенное в следующей ссылке
How to run concurrent jobs(actions) in Apache Spark using single spark context
Но все-таки, я не могу решить эту проблему.
Не могли бы вы направить меня, чтобы решить эту проблему?
Hi Chetandalal, уже я пробовал с этим свойством. Не работает для меня – Raja