2016-08-31 3 views
1

Как показано ниже,создания Искры карты занимает очень много времени

Шаг 1: Группа звонков с использованием GroupBy

//Now group the calls by the s_msisdn for call type 1 
//grouped: org.apache.spark.rdd.RDD[(String, Iterable[(String, (Array[String], String))])] 
val groupedCallsToProcess = callsToProcess.groupBy(_._1) 

Шаг 2: сгруппированные вызовы отображаются.

//create a Map of the second element in the RDD, which is the callObject 
//grouped: org.apache.spark.rdd.RDD[(String, Iterable[(String,(Array[String], String))])] 

val mapOfCalls = groupedCallsToProcess.map(f => f._2.toList) 

Шаг 3: Карта объекта Row, где карта будет иметь ключ-значение пары [CallsObject, MSISDN]

val listOfMappedCalls = mapOfCalls.map(f => f.map(_._2).map(c => 
    Row(
     c._1(CallCols.call_date_hour), 
     c._1(CallCols.sw_id), 
     c._1(CallCols.s_imsi), 
     f.map(_._1).take(1).mkString 
    ) 
)) 

3-й стадии, как показано выше, кажется, берет очень долго когда размер данных очень велик. Мне интересно, есть ли способ сделать шаг 3 эффективным. Полюбите любую помощь в этом.

+0

Можете ли вы показать свой ввод и ожидаемый выход? Трудно следить за тем, что вы группируете и сопоставляете над –

+1

Зачем вам нужно делать «Массив» на карте «Список»? Операция 'toList' не только очень дорогостоящая для больших коллекций, но и приводит к большому объему GC-overhead. –

+0

Итак, нужно сделать шаг 2: val mapOfCalls = groupedCallsToProcess.map (f => f._2)? – sparkDabbler

ответ

1

В вашем коде есть очень много вещей, которые вам действительно не нужны.

  1. На первом этапе вам не нужен groupBy. groupBy очень дороги в Spark.
  2. Весь второй шаг бесполезен. toList - это очень дорогостоящее решение, связанное с сбором GC.
  3. Удалите 1 дополнительную карту на третьем этапе. Каждый map является линейной операцией порядка функции отображения.
  4. Никогда ничего не делайте f.map(_._1).take(1). Вы преобразовываете весь список, но используете только 1 (или 5) элемент. Вместо этого сделайте f.take(5).map(_._1). И если вам нужно только 1 - f.head._1.

Прежде чем обсуждать, как вы можете написать этот код без groupBy по-другому, исправьте этот код.

// you had this in start 
val callsToProcess: RDD[(String, (Array[String], String))] = .... 

// RDD[(String, Iterable[(String, (Array[String], String))])] 
val groupedCallsToProcess = callsToProcess 
    .groupBy(_._1) 

// skip the second step 

val listOfMappedCalls = groupedCallsToProcess 
    .map({ case (key, iter) => { 
    // this is what you did 
    // val iterHeadString = iter.head._1 
    // but the 1st element in each tuple of iter is actually same as key 
    // so 
    val iterHeadString = key 
    // or we can totally remove this iterHeadString and use key 
    iter.map({ case (str1, (arr, str2)) => Row(
     arr(CallCols.call_date_hour), 
     arr(CallCols.sw_id), 
     arr(CallCols.s_imsi), 
     iterHeadString 
    ) }) 
    } })  

... Но, как я сказал groupBy являются очень дорогостоящими в Спарк. И у вас уже есть RDD[(key, value)] в вашем callsToProcess. Поэтому мы можем напрямую использовать aggregateByKey. Также вы можете заметить, что ваш groupBy не полезен ни для чего другого, кроме помещения всех этих строк в список, а не непосредственно внутри и RDD.

// you had this in start 
val callsToProcess: RDD[(String, (Array[String], String))] = .... 

// new lets map it to look like what you needed because we can 
// totally do this without any grouping 
// I somehow believe that you needed this RDD[Row] and not RDD[List[Row]] 
// RDD[Row] 
val mapped = callsToProcess 
    .map({ case (key, (arr, str)) => Row(
     arr(CallCols.call_date_hour), 
     arr(CallCols.sw_id), 
     arr(CallCols.s_imsi), 
     key 
) }) 


// Though I can not think of any reason of wanting this 
// But if you really needed that RDD[List[Row]] thing... 
// then keep the keys with your rows 
// RDD[(String, Row)] 
val mappedWithKey = callsToProcess 
    .map({ case (key, (arr, str)) => (key, Row(
     arr(CallCols.call_date_hour), 
     arr(CallCols.sw_id), 
     arr(CallCols.s_imsi), 
     key 
)) }) 

// now aggregate by the key to create your lists 
// RDD[List[Row]] 
val yourStrangeRDD = mappedWithKey 
    .aggregateByKey(List[Row]())(
    (list, row) => row +: list, // prepend, do not append 
    (list1, list2) => list1 ++ list2 
) 
+0

вы можете работать по небольшому контракту – sparkDabbler

+0

значение ++ не является членом List [org.apache.spark.sql.Row] любой альтернативы – sparkDabbler

+0

Что вы имеете в виду? –

Смежные вопросы