2016-08-23 2 views
0

У меня есть этот dataframe, и я хотел бы объединить все массивы, в столбце данных, в один большой массив, отдельно от DataFrame.Объединяя весь столбец массивов в один массив

Scala и DataFrame API все еще довольно новое для меня, но я дал ему выстрелили:

case class Tile(data: Array[Int]) 

val ta = Tile(Array(1,2)) 
val tb = Tile(Array(3,4)) 
val tc = Tile(Array(5,6)) 

df = ListBuffer(ta,tb,tc).toDF() 

// Combine contents of DF into one array 
val result = new Array[Int](6) 
var offset = 0 
val combine = (t: WrappedArray[Int]) => { 
    Array.copy(t, 0, result, offset, t.length) 
    offset += t.length 
} 

df.foreach(r => combine(r(0).asInstanceOf[WrappedArray[Int]])) 

df.show() 
+------+ 
| data| 
+------+ 
|[1, 1]| 
|[2, 2]| 
|[3, 3]| 
+------+ 

Когда я запускаю это, я получаю следующее сообщение об ошибке:

16/08/23 11:21:32 ERROR executor.Executor: Exception in task 0.0 in stage 17.0 (TID 17) 
scala.MatchError: WrappedArray(1, 1) (of class scala.collection.mutable.WrappedArray$ofRef) 
at scala.runtime.ScalaRunTime$.array_apply(ScalaRunTime.scala:71) 
at scala.Array$.slowcopy(Array.scala:81) 
at scala.Array$.copy(Array.scala:107) 
at $line150.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32) 
at $line150.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31) 
at $line190.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:46) 
at $line190.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:46) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:74 

Может кто-нибудь точку меня в правильном направлении? Благодаря!

+1

Есть так много вещей, которые не так с этим кодом. ваше использование 'offset' внутри функции делает ее так, что она не будет' Serializable'. Также ваш 'комбинат' определен для принятия 2 параметров, но вы предоставляете его. –

+0

Вы правы, этого второго аргумента не должно быть. Я только что исправил это. Язык Scala и использование DataFrames для меня новы, поэтому я все еще разбираюсь в способе работы Spark/Scala. @SarveshKumarSingh – gofly

ответ

1

При работе с Spark вы не можете накапливать вещи, используя foreach, как обычно. Поскольку искра распределяет работу среди всех исполнителей, ваш function должен быть Serializable.

Если вы все еще хотите сделать что-то похожее на то, что вы обычно делаете, используйте Accumulator, который поддерживает распределенную модель искры.

val myRdd: RDD[List[Int]] = sc.parallelize(List(List(1,2), List(3,4), List(5,6)) 

val acc = sc.collectionAccumulator[Int]("MyAccumulator") 

myRdd.foreach(l => l.foreach(i => acc.add(i))) 

Или в вашем случае

case class Tile(data: Array[Int]) 

val myRdd: RDD[Tile] = sc.parallelize(List(
    Tile(Array(1,2)), 
    Tile(Array(3,4)), 
    Tile(Array(5,6)) 
)) 

val acc = sc.collectionAccumulator[Int]("MyAccumulator") 

myRdd.foreach(t => t.data.foreach(i => acc.add(i))) 
+0

Я очень ценю ваши отзывы! Вы сделали хороший вывод о функции, подлежащей сериализации. Я должен был знать лучше. Я забыл упомянуть, что я запускаю Spark 1.6. Я не мог найти ничего, что касается collectionAccumulator для этой версии. Тем не менее, я закончил преобразование столбца DataFrame в RDD, сбор и агрегирование массивов. Мне было интересно, есть ли более прямой маршрут, а не конвертация в RDD. Между тем, я буду продолжать играть с ним. – gofly

Смежные вопросы