2017-02-22 11 views
0

Я пытаюсь создать трубопровод, который берет мою DataFrame информацию о задержке полета и запускает на ней случайный лес. Я довольно новичок в MLLib и не могу понять, где я ошибаюсь в своем коде ниже.PySpark Training Случайный лесной трубопровод

Мой DataFrame считывается из паркетной файла с этим форматом:

Table before Encoding 
+-----+-----+---+---+----+--------+-------+------+----+-----+-------+ 
|Delay|Month|Day|Dow|Hour|Distance|Carrier|Origin|Dest|HDays|Delayed| 
+-----+-----+---+---+----+--------+-------+------+----+-----+-------+ 
| -8| 8| 4| 2| 11|  224|  OO| GEG| SEA| 31|  0| 
| -12| 8| 5| 3| 11|  224|  OO| GEG| SEA| 32|  0| 
| -9| 8| 6| 4| 11|  224|  OO| GEG| SEA| 32|  0| 
+-----+-----+---+---+----+--------+-------+------+----+-----+-------+ 
only showing top 3 rows 

Я тогда перейти к OneHotEncode категориальные колонны, и сочетают в себе все функции в Features колонке (Delayed это то, что я пытаюсь предсказывать). Вот код, который:

import os 
from pyspark.sql import SparkSession 
from pyspark.ml import Pipeline 
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler 
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier 

spark = SparkSession.builder \ 
    .master('local[3]') \ 
    .appName('Flight Delay') \ 
    .getOrCreate() 

# read in the pre-processed DataFrame from the parquet file 
base_dir = '/home/william/Projects/flight-delay/data/parquet' 
flights_df = spark.read.parquet(os.path.join(base_dir, 'flights.parquet')) 

print('Table before Encoding') 
flights_df.show(3) 

# categorical columns that will be OneHotEncoded 
cat_cols = ['Month', 'Day', 'Dow', 'Hour', 'Carrier', 'Dest'] 

# numeric columns that will be a part of features used for prediction 
non_cat_cols = ['Delay', 'Distance', 'HDays'] 

# NOTE: StringIndexer does not have multiple col support yet (PR #9183) 
# Create StringIndexer for each categorical feature 
cat_indexers = [ StringIndexer(inputCol=col, outputCol=col+'_Index') 
       for col in cat_cols ] 

# OneHotEncode each categorical feature after being StringIndexed 
encoders = [ OneHotEncoder(dropLast=False, inputCol=indexer.getOutputCol(), 
          outputCol=indexer.getOutputCol()+'_Encoded') 
      for indexer in cat_indexers ] 

# Assemble all feature columns (numeric + categorical) into `features` col 
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() 
             for encoder in encoders] + non_cat_cols, 
          outputCol='Features') 

# Train a random forest model 
rf = RandomForestClassifier(labelCol='Delayed',featuresCol='Features', numTrees=10) 

# Chain indexers, encoders, and forest into one pipeline 
pipeline = Pipeline(stages=[ *cat_indexers, *encoders, assembler, rf ]) 

# split the data into training and testing splits (70/30 rn) 
(trainingData, testData) = flights_df.randomSplit([0.7, 0.3]) 

# Train the model -- which also runs indexers and coders 
model = pipeline.fit(trainingData) 

# use model to make predictions 
precitions = model.trainsform(testData) 

predictions.show(10) 

Когда я запускаю это я получаю Py4JJavaError: An error occurred while calling o46.fit. : java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double

Я очень признателен за любую помощь!

ответ

1

Как пояснил in the comments, этикетка должна быть double, так что вы должны бросить:

flights_df = spark.read.parquet(os.path.join(base_dir, 'flights.parquet')) \ 
    .withColumn("Delayed", col("Delayed").cast("double")) 
+0

Действительно (хотя и не может найти комментарии, которые Вы имеете в виду). Есть еще несколько аналогичных досадных и недокументированных функций Spark ML/MLlib - см. Здесь: https://www.nodalpoint.com/spark-classification/ – desertnaut