2015-03-31 2 views
12

Предположим, у меня есть DataFrame (который я читал из csv на HDFS), и я хочу обучать некоторые алгоритмы на нем через MLlib. Как преобразовать строки в LabeledPoints или иным образом использовать MLlib в этом наборе данных?Использование DataFrame с MLlib

+1

Вы не упомянули тип данных столбцов, но если они являются числовыми (целым числом, дважды, и т.д.), вы можете использовать [VectorAssembler] (HTTP: //spark.apache .org/docs/latest/ml-features.html # vectorassembler), чтобы преобразовать столбцы функций в один столбец из [Вектор] (http://spark.apache.org/docs/latest/mllib-data-types.html) , – Ben

ответ

5

Предполагая, что вы используете Scala:

Допустим, ваш получить DataFrame следующим образом:

val results : DataFrame = sqlContext.sql(...) 

Шаг 1: вызов results.printSchema() - это покажет вам не только столбцы в DataFrame и (это важно) их порядок, но и то, что Spark SQL считает их типами. Как только вы видите этот выход, все становится намного менее загадочным.

Шаг 2: Получить RDD[Row] из DataFrame:

val rows: RDD[Row] = results.rdd 

Шаг 3: Теперь это просто вопрос потянув любые поля интерес к вам из отдельных строк. Для этого вам нужно знать положение на основе 0 каждого поля и его тип, и, к счастью, вы получили все, что было на шаге 1 выше. Например, скажем, вы сделали SELECT x, y, z, w FROM ... и печать схемы дали

root 
|-- x double (nullable = ...) 
|-- y string (nullable = ...) 
|-- z integer (nullable = ...) 
|-- w binary (nullable = ...) 

И давайте говорить все, что вы хотите использовать x и z. Вы можете вытащить их в RDD[(Double, Integer)] следующим образом:

rows.map(row => { 
    // x has position 0 and type double 
    // z has position 2 and type integer 
    (row.getDouble(0), row.getInt(2)) 
}) 

Здесь вы просто использовать Core, Спарк, чтобы создать соответствующие объекты MLlib. Все может немного усложниться, если ваш SQL возвращает столбцы типа массива, и в этом случае вам придется вызвать getList(...) для этого столбца.

2

Предполагая, что вы используете JAVA (Spark, версия 1.6.2): ​​

Вот простой пример кода Java с помощью DataFrame для машинного обучения.

  • Он загружает JSON со следующей структурой,

    [{ "метка": 1, "ATT2": +5,037089672359123, "att1": 2,4100883023159456}, ...]

  • разбивает данные в обучение и тестирование,

  • поезд модель с использованием данных поездов,
  • применить модель для данных испытаний и
  • Stor результаты.

Кроме того, согласно official documentation, «API-интерфейс на основе DataFrame является основным API» для MLlib с текущей версии 2.0.0. Таким образом, вы можете найти несколько примеров, используя DataFrame.

Код:

SparkConf conf = new SparkConf().setAppName("MyApp").setMaster("local[2]"); 
SparkContext sc = new SparkContext(conf); 
String path = "F:\\SparkApp\\test.json"; 
String outputPath = "F:\\SparkApp\\justTest"; 

System.setProperty("hadoop.home.dir", "C:\\winutils\\"); 

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); 

DataFrame df = sqlContext.read().json(path); 
df.registerTempTable("tmp"); 
DataFrame newDF = df.sqlContext().sql("SELECT att1, att2, label FROM tmp"); 
DataFrame dataFixed = newDF.withColumn("label", newDF.col("label").cast("Double")); 

VectorAssembler assembler = new VectorAssembler().setInputCols(new String[]{"att1", "att2"}).setOutputCol("features"); 
StringIndexer indexer = new StringIndexer().setInputCol("label").setOutputCol("labelIndexed"); 

// Split the data into training and test 
DataFrame[] splits = dataFixed.randomSplit(new double[] {0.7, 0.3}); 
DataFrame trainingData = splits[0]; 
DataFrame testData = splits[1]; 

DecisionTreeClassifier dt = new DecisionTreeClassifier().setLabelCol("labelIndexed").setFeaturesCol("features"); 
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {assembler, indexer, dt}); 
// Train model 
PipelineModel model = pipeline.fit(trainingData); 

// Make predictions 
DataFrame predictions = model.transform(testData); 
predictions.rdd().coalesce(1,true,null).saveAsTextFile("justPlay.txt" +System.currentTimeMillis()); 
Смежные вопросы