1

Я написал следующий код для логистической регрессии, я хочу использовать API конвейера, предоставляемый spark.ml. Однако после того, как я попытаюсь распечатать коэффициенты и перехваты, это дало мне ошибку. Также у меня возникают проблемы с вычислением матрицы замешательства и других показателей, таких как точность, отзыв.Логистическая регрессия с искрой мл (данные)

#Logistic Regression: 
from pyspark.mllib.linalg import Vectors 
from pyspark.ml.classification import LogisticRegression 
from pyspark.sql import SQLContext 
from pyspark import SparkContext 
from pyspark.sql.types import * 
from pyspark.sql.functions import * 
from pyspark.ml.feature import StringIndexer,VectorAssembler 
from pyspark.ml import Pipeline 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 


sc = SparkContext("local", "predictive") 
sqlContext=SQLContext(sc) 

df = sqlContext.read.load('/user/bna_ads_final.csv', 
         format='com.databricks.spark.csv', 
         header='true', 
         inferSchema='true') 

df.show(5) 
df.count() 
df.dtypes 
df=df.withColumn("load_date",df.load_date.cast("timestamp")) 
df_withday= df.withColumn("day",dayofmonth(df.load_date)) 
df_new=df_withday.withColumn("Month",month(df.load_date)) 
df_new=df_new.withColumn("classname",df_new.classname.cast("string")) 
ignore = ["load_date","wo_flag","serialnumber", "classname"] 

def modify_values(r): 
if r == "A" or r =="B": 
    return "dispatch" 
else: 
    return "non-dispatch" 

def show_metrics(metrics): 
# Overall statistics 
precision = metrics.precision() 
recall = metrics.recall() 
f1Score = metrics.fMeasure() 
print("Summary Stats") 
print("Precision = %s" % precision) 
print("Recall = %s" % recall) 
print("F1 Score = %s" % f1Score) 
print (metrics.confusionMatrix()) 

ol_val = udf(modify_values, StringType()) 
df_final = df_new.withColumn("wo_flag",ol_val(df_new.wo_flag)) 
indexer= StringIndexer(inputCol="classname", outputCol="classnamecat") 
indexed = indexer.fit(df_final).transform(df_final) 
indexed=indexed.withColumn("classnamecat",indexed.classnamecat.cast("int")) 
indexed.show(5) 
(trainingData, testData) = indexed.randomSplit([0.7, 0.3]) 
assembler = VectorAssembler(inputCols=[x for x in indexed.columns if x not in ignore],outputCol='features') 
stringindexer=StringIndexer(inputCol="wo_flag", outputCol="labellr") 
Classifier= LogisticRegression(labelCol="labellr", featuresCol="features") 
pipeline=Pipeline(stages=[stringindexer,assembler,Classifier]) 
model = pipeline.fit(trainingData) 
predictions = model.transform(testData) 

selected = predictions.select("features", "labellr", "probability", "prediction") 
for row in selected.collect(): 
print row 


evaluator = MulticlassClassificationEvaluator(
labelCol="labellr", predictionCol="prediction", metricName="precision") 
accuracy = evaluator.evaluate(predictions) 
print("Test Error = %g" % (1.0 - accuracy)) 
print("Accuracy= %g" % (accuracy)) 

print("Coefficients: " + str(model.coefficients)) 
print("Intercept: " + str(model.intercept)) 

ошибка, что я получаю:

print("Coefficients: " + str(model.coefficients)) 
AttributeError: 'PipelineModel' object has no attribute 'coefficients' 

У меня есть Спарк 1,5 установлен на кластере Hadoop, я не буду в состоянии модернизировать в ближайшее время. Есть ли проблема в решении этой проблемы.

load_date   | r   | classname| mstatus34_timdiff| day|Month| classnamecat| serialnumber 
+-----------+------------------+----------+--------------------+------------+--- +-----------+---- 
2013-12-29 10:55:...|non-dispatch|  6634|    19| 1| 7|   0.0| 231234  
2014-10-05 23:43:...|non-dispatch|  6634|    4| 5| 10|   0.0| 342345 
2014-10-09 09:39:...| dispatch|  5886|    36| 9| 10|   1.0| 563472 
2014-09-16 09:47:...| dispatch|  6634|    53| 16| 9|   0.0| 134657 

ответ

3

Вы можете получить доступ к отдельным этапам, используя stages атрибут PipelineModel

from pyspark.ml import Pipeline 
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel 
from pyspark.ml.feature import VectorAssembler 

df = sc.parallelize([ 
    (0.0, 1.0, 2.0, 4.0), 
    (1.0, 3.0, 4.0, 5.0) 
]).toDF(["label", "x1", "x2", "x3"]) 

assembler = (VectorAssembler() 
    .setInputCols(df.columns[1:]) 
    .setOutputCol("features")) 

lr = LogisticRegression(maxIter=10, regParam=0.01) 

pipeline = Pipeline(stages=[assembler, lr]) 
model = pipeline.fit(data) 

[stage.coefficients for stage in model.stages if hasattr(stage, "coefficients")] 
## [DenseVector([2.1178, 1.6843, -1.8338])] 

## or 

[stage.coefficients for stage in model.stages 
    if isinstance(stage, LogisticRegressionModel)] 
## [DenseVector([2.1178, 1.6843, -1.8338])] 
+0

Спасибо за ответ, однако это не печатают коэффициенты, он просто дает мне пустые скобки: [] –

+0

Должно работать нормально, пока в конвейере есть правильная модель. См. [Mcve]. – zero323

+0

Здравствуйте @ zero323 Я прикрепил, как выглядят преобразованные данные, модель отлично работает при использовании RDD и использует лямбда-функцию с помеченными точками для создания функций и меток. Но он не работает на dataframes, Моя функция вычисления показателей, а также коэффициент печати не работают –

3

Попробуйте

pipeline=Pipeline(stages=[assembler, lr]) 
model = pipeline.fit(trainingData) 
lrm = model.stages[-1] 

lrm.coefficients 
Смежные вопросы