2015-09-26 2 views
0

я хочу, чтобы инициализировать матрицу, используя данные в flatMap, это мои данные:Как назначить значение в бриджную матрицу в flatMap Scala-Spark?

-4,0,1.0 ### horrible . not-work install dozen scanner umax ofcourse . tech-support everytime call . fresh install work error . crummy product crummy tech-support crummy experience . 
2,1,1.0 ### scanner run . grant product run windows . live fact driver windows lose performance . setup program alert support promptly quits . amazon . website product package requirement listing compatible windows . 
1,2,1.0 ### conversion kit spare battery total better stick versionand radio blow nimh charger battery . combination operation size nimh battery . motorola kit . rechargable battery available flashlight camera game toy . 
-4,3,1.0 ### recieive part autowinder catch keep place sudden break . hold listen music winder wind . extremely frustrated fix pull little hard snap half . flush drain . 

и это мой код:

val spark_context = new SparkContext(conf) 
val data = spark_context.textFile(Input) 
val Gama=DenseMatrix.zeros[Double](4,2) 
var gmmainit = data.flatMap(line => { 
    val tuple = line.split("###") 
    val ss = tuple(0) 
    val re = """^(-?\d+)\s*,\s*(\d+)\s*,\s*(\d+).*$""".r 
    val re(n1, n2, n3) = ss // pattern match and extract values 

    if (n1.toInt >= 0) { 
    Gama(n2.toInt, 0) += 1 
    } 
    if (n1.toInt < 0) { 
    Gama(n2.toInt, 1) += 1 
    } 
}) 

println(Gama) 

, но это не меняет матрицу Гама,

Как я могу изменить свой код, чтобы решить эту проблему?

ответ

0

Прежде всего, ваш код даже не будет скомпилирован. Если вы посмотрите на flatMap подписи:

flatMap[U](f: T => TraversableOnce[U]) 

вы увидите, что карты от T к TraversableOnce[U]. С update способ DenseMatrix возвращение Unit функция вы используете тип String => Unit и Unit не TraversableOnce.

Кроме того, как уже объяснялось Justin, каждый раздел получает свою локальную копию переменных, на которые ссылается замыкание, и только эта копия изменяется.

Один из способов вы можете решить эту проблему, что-то вроде этого:

val gmmainit = data.mapPartitions(iter => { 
    val re = """^(-?\d+)\s*,\s*(\d+)\s*,\s*(\d+).*$""".r 
    val gama = DenseMatrix.zeros[Double](4,2) 
    iter.foreach{ 
    case re(n1, n2, n3) => gama(n2.toInt, if(n1.toInt >= 0) 0 else 1) += 1 
    case _ => 
    } 
    Iterator(gama) 
}).reduce(_ + _) 
1

Вы не можете изменять переменные в распределенных функциях. Ну, вы можете, но переменная изменяется только в THAT-процессе. Помните, что искра распределена. Итак, вам нужно вернуть значение, которое может быть сплющено (я не знаю DenseMatrix достаточно хорошо, чтобы сказать точную потребность здесь). Возможно, вы сможете создать пользовательский накопитель, если это может быть ассоциативным и коммутативным.

Смежные вопросы