0

Я пытаюсь сохранить некоторые анализы в toDF TempTable, но получить следующую ошибку ": 215: error: value toDF не является членом Double". Я читаю данные таблицы Кассандры, и я делаю некоторые вычисления. Я хочу сохранить эти результаты в таблице temp. Я новичок в scala, кто-то, может мне помочь? моего кодScala: сохранить результат в таблице temp toDf

case class Consumo(consumo:Double, consumo_mensal: Double, mes: org.joda.time.DateTime,ano: org.joda.time.DateTime, soma_pf: Double,empo_gasto: Double); 

object Analysegridata{ 

val conf = new SparkConf(true) 

.set("spark.cassandra.connection.host","127.0.0.1").setAppName("LiniarRegression") 
.set("spark.cassandra.connection.port", "9042") 
.set("spark.driver.allowMultipleContexts", "true") 
.set("spark.streaming.receiver.writeAheadLog.enable", "true"); 
val sc = new SparkContext(conf); 

val ssc = new StreamingContext(sc, Seconds(1)) 

val sqlContext = new org.apache.spark.sql.SQLContext(sc); 
val checkpointDirectory = "/var/lib/cassandra/data" 
ssc.checkpoint(checkpointDirectory) // set checkpoint directory 

// val context = StreamingContext.getOrCreate(checkpointDirectory)  
import sqlContext.implicits._ 
JavaSparkContext.fromSparkContext(sc); 

def rddconsumo(rddData: Double): Double = { 

val rddData: Double = { 
    implicit val data = conf 
    val grid = sc.cassandraTable("smartgrids", "analyzer").as((r:Double) => (r)).collect 

def goto(cs: Array[Double]): Double = { 

    var consumo = 0.0; 
    var totaldias = 0; 
    var soma_pf = 0.0; 
    var somamc = 0.0; 
    var tempo_gasto = 0.0; 
    var consumo_mensal = 0.0; 
    var i=0 
for (i <- 0 until grid.length) {  
    val minutos = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MINUTE"); 
    val horas = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol","HOUR_OF_DAY"); 
    val dia = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "DAY_OF_MONTH"); 
    val ano = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "YEAR"); 
    val mes = sc.cassandraTable("smartgrids","analyzer_temp").select("timecol", "MONTH"); 
    val potencia = sc.cassandraTable("smartgrids","analyzer_temp").select("n_pf1", "n_pf2", "n_pf3") 

def convert_minutos (minuto : Int) : Double ={ 
    minuto/60 
} 
    dia.foreach (i => { 

    def adSum(potencia: Array[Double]) = { 

    var i=0; 
     while (i < potencia.length) { 
     soma_pf += potencia(i); 
     i += 1; 
     soma_pf; 
     println("Potemcia =" + soma_pf) 
    } 
} 
    def tempo(minutos: Array[Int]) = { 
     var i=0; 
     while (i < minutos.length) { 
     somamc += convert_minutos(minutos(i)) 
     i += 1; 
     somamc 
    } 
    } 

    def tempogasto(horas: Array[Int]) = { 
     var i=0; 
     while (i < horas.length) { 
     tempo_gasto = horas(i) + somamc; 
     i += 1; 
     tempo_gasto; 
     println("Temo que o aparelho esteve ligado =" + tempo_gasto) 
    } 
} 

def consumof(dia: Array[Int]) = { 
    var i=0; 
    while (i < dia.length) { 
     consumo = soma_pf * tempo_gasto; 
     i += 1; 
     consumo; 
     println("Consumo diario =" + consumo)  
    } 
    } 
}) 

mes.foreach (i => { 

def totaltempo(dia: Array[Int]) = { 
    var i = 0; 
    while(i < dia.length){ 
     totaldias += dia(i); 
     i += 1; 
     totaldias; 
     println("Numero total de dias =" + totaldias) 
    } 
} 
def consumomensal(mes: Array[Int]) = { 
    var i = 0; 
    while(i < mes.length){ 
     consumo_mensal = consumo * totaldias; 
     i += 1; 
    consumo_mensal; 
    println("Consumo Mensal =" + consumo_mensal); 
    } 
} 
}) 

} 
    consumo; 
    totaldias; 
    consumo_mensal; 
    soma_pf; 
    tempo_gasto; 
    somamc 

} 

rddData 

    } 
    rddData.toDF().registerTempTable("rddData") 
} 
     ssc.start() 
     ssc.awaitTermination() 




error: value toDF is not a member of Double" 
+0

Пожалуйста, разместите полный стек ошибок! Это полезно –

+0

: 260: error: значение toDF не является членом Double rddData.toDF(). RegisterTempTable ("rddData") –

ответ

0

Это довольно непонятно, что вы пытаетесь сделать точно (слишком много коды, попробуйте обеспечивая минимальный примера), но есть несколько очевидных вопросов:

  1. rddData имеет тип Double: Кажется, он должен быть RDD[Double] (который является распределенной коллекцией двойных значений). Попытка сохранить одно значение Double в качестве таблицы не имеет смысла, и действительно - не работает (toDF можно вызывать на RDD, а не на любом типе, а не на Double, как предупреждает компилятор).
  2. вы collect() данные: если вы хотите загрузить РД, преобразовать его с помощью некоторых манипуляций, а затем сохранить его в виде таблицы - collect() не должен, вероятно, можно назвать по этим RDD. collect() отправляет все данные (распределенные по кластеру) на один «драйвер» (тот, который работает с этим кодом), после чего вы не пользуетесь кластером и снова не используете структуру данных RDD, чтобы вы могли " t преобразует данные в DataFrame с использованием toDF.
Смежные вопросы