2015-07-31 2 views
2

в моем проекте, я переношу данные из MongoDB в таблицу SparkSQL для запросов на основе SQL. Но Spark SQL позволяет мне создавать временные файлы. Когда я хочу что-то запросить, время выполнения очень велико, потому что операция передачи и отображения данных занимает слишком много времени.Как создать постоянный стол в spark sql

Итак, можно ли сократить время выполнения? Могу ли я создавать постоянные таблицы Spark SQL? Можно ли запрашивать постоянные таблицы с JDBC?

Я добавляю свой код и результаты выполнения. Я делаю все в автономном режиме.

package com.mongodb.spark.sql; 

import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.SQLContext; 
import org.bson.BSONObject; 

import com.mongodb.hadoop.MongoInputFormat; 
import com.mongodb.spark.demo.Observation; 
import com.mongodb.spark.demo.Sensor; 

import scala.Tuple2; 

public class SparkSqlMongo { 

public static void main(String[] args) { 

    Configuration conf = new Configuration(); 

    conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat"); 
    conf.set("mongo.input.uri", "mongodb://localhost:27017/test.observations"); 

    Configuration sensConf = new Configuration(); 

    sensConf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat"); 
    sensConf.set("mongo.input.uri", "mongodb://localhost:27017/test.sens"); 

    SparkConf sconf = new SparkConf().setMaster("local[2]").setAppName("SQL DENEME").set("nsmc.connection.host", 
      "mongodb:"); 

    JavaSparkContext sc = new JavaSparkContext(sconf); 
    SQLContext sql = new SQLContext(sc); 

    JavaRDD<Observation> obs = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class) 
      .map(new Function<Tuple2<Object, BSONObject>, Observation>() { 

       private static final long serialVersionUID = 1L; 

       @Override 
       public Observation call(Tuple2<Object, BSONObject> v1) throws Exception { 

        int id = (int) v1._2.get("_id"); 
        double value = (double) v1._2.get("Value"); 
        // Date time = (Date) v1._2.get("Time"); 
        int sensor = (int) v1._2.get("SensorId"); 
        int stream = (int) v1._2.get("DataStreamId"); 

        Observation obs = new Observation(id, value, sensor, stream); 
        return obs; 

       } 
      }); 

    DataFrame obsi = sql.createDataFrame(obs, Observation.class); 

    obsi.registerTempTable("obsi"); 

    JavaRDD<Sensor> sens = sc.newAPIHadoopRDD(sensConf, MongoInputFormat.class, Object.class, BSONObject.class) 
      .map(new Function<Tuple2<Object, BSONObject>, Sensor>() { 

       /** 
       * 
       */ 
       private static final long serialVersionUID = 1L; 

       @Override 
       public Sensor call(Tuple2<Object, BSONObject> v1) throws Exception { 

        int id = (int) v1._2.get("_id"); 
        String name = (String) v1._2.get("Name"); 
        String description = (String) v1._2.get("Description"); 

        Sensor s = new Sensor(id, name, description); 

        System.out.println(s.getName()); 
        return s; 

       } 
      }); 

    DataFrame sensi = sql.createDataFrame(sens, Sensor.class); 

    sensi.registerTempTable("sensi"); 

    sensi.show(); 

    long start = System.currentTimeMillis(); 

    DataFrame obser = sql 
      .sql("SELECT obsi.value, obsi.id, sensi.name FROM obsi, sensi WHERE obsi.sensorID = sensi.id and sensi.id = 107") 
      .cache(); 
    long stop = System.currentTimeMillis(); 

    // System.out.println("count ====>>> " + a.toString()); 
    System.out.println("toplam sorgu zamani : " + (stop - start)); 
    ; 
    // 
    // while(!obser.equals(null)){ 
    // System.out.println(obser); 
    // } 

    List<String> names = obser.javaRDD().map(new Function<Row, String>() { 

     private static final long serialVersionUID = 1L; 

     public String call(Row row) { 

      // System.out.println(row); 
      // System.out.println("value : " + row.getDouble(0) + " id : " + 
      // row.getInt(1) + " name : " + row.getString(0)); 
      return "Name: " + row; 
     } 
    }).collect(); 

} 

}

Все время выполнения составляет около 120 секунд в течение около 5M наблюдения и 1К SNS данных. Я присоединяюсь к этим таблицам, и это время исполнения очень велико и неприемлемо.

ответ

3
  1. Да, вы можете улучшить время выполнения программы на Caching your Table, Dataframe или Rdd.
  2. И, если вы хотите сохранить данные как постоянную таблицу, вы можете использовать метод df.saveAsTable, но dataframe должен быть создан через HiveContext.
  3. Для подключения JDBC необходимо запустить Thrift service, после чего вы можете выполнить команду Spark Sql на таблицах регистров.
+0

спасибо за ответы. Я исследовал Hive, и производительность запросов не удовлетворила меня. Итак, я ищу более эффективные способы. Наконец, я использовал паркетные файлы для хранения данных. Когда я запрашиваю, я извлекаю данные из запроса паркета и таким образом повышает производительность запросов примерно в 4-5 раз. – trallallalloo

1

Spark SQL не манипулирует данными базы данных, которые происходят внутри него, только до тех пор, пока доступен контекст искры, который их создал. Существует несколько реализаций сервера искровых заданий, которые позволят вам удерживать результат одного задания и отправлять другие задания против одного и того же набора данных. Он будет по-прежнему временным и должен быть перезагружен, если сервер (т.е. контекст искры) отключается

Это говорит о том, что вы можете сохранить результат вычисления и получить его позже (либо обратно в Монго, либо в файлы на Hadoop/other файловая система)

+0

благодарит за ответ. Я понимаю, что жизненный цикл таблицы Spark SQL ограничен жизненным циклом Spark Context. Итак, позвольте мне задать еще один вопрос: могу ли я сократить время выполнения запроса? Я работаю в автономном режиме, и позже мы будем использовать кластеры. Когда мы используем кластеры, время выполнения запроса будет уменьшаться до приемлемого времени выполнения? – trallallalloo