2016-03-23 5 views
0

Я пытаюсь сохранить данные csv в хэш-карте. Кажется, он читает csv-файл и хорошо сохраняется в RDD, но NOT map. Я пробовал hashmap, карту с помощью put или + = метод, но ничего не работает. Любая идея об этом?Scala map не хранит данные

val logFile3 = "d:/data/data.csv" 

val rawdf3 = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "true") // Use first line of all files as header 
    .option("inferSchema", "true") // Automatically infer data types 
    .load(logFile3) 

var activityName = scala.collection.mutable.Map[String, String]() 

//save key-value to RDD to check 
val activityNameRDD = rawdf3.map { row => 
    activityName += (row.getAs("key").toString -> row.getAs("value").toString) // I think It's work but not 
    println(row.getAs("key").toString + " - " + row.getAs("value").toString) // print all data well 
    (row.getAs("key").toString, row.getAs("value").toString) 
} 
activityNameRDD.saveAsTextFile("d:/outdata/activityName") // all csv data saved well 

activityName.foreach({row => println(row._1 + " = " + row._2)}) // print nothing 

println(activityName.getOrElse("KEY1", "NON")) // print "NON" 
println(activityName.getOrElse("KEY2", "NON")) // print "NON" 

ответ

1

Вы используете Spark? Переменные с суффиксом «Rdd» подразумевают это.

Если да, то читайте тщательно "Shared Variables" раздел документации Спарк по:

Обычно, когда функция перешла к операции Спарк (например, карты или уменьшения) выполняется на удаленном узле кластера, он работает на отдельные копии всех переменных, используемых в этой функции. Эти переменные копируются на каждую машину, и никакие обновления переменных на удаленной машине не распространяются обратно на программу драйвера. Поддержка общих переменных чтения и записи для разных задач будет неэффективной.

При попытке изменить общую переменную от map каждый рабочий изменяет свою собственную версию и в конце теряются обновления. Если вам действительно нужно совместное изменчивое состояние, вместо этого используйте вместо этого Accumulator.

0

Вместо использования var, mutable.Map и мутируют вещи, как побочные эффекты (три больших без номеров в Scala), почему бы не просто делать вещи непосредственно? Будет ясно, что происходит, и также следует исправить вашу проблему:

val activityName:Map[String, String] = rawdf3.map { row => 
    row.getAs("key").toString -> row.getAs("value").toString 
}.toMap