2017-02-09 2 views
2

Я использую искры, чтобы читать CSV-файл, как это:Как уменьшить перетасовку и время, затраченное Spark при создании карты предметов?

x, y, z 
x, y 
x 
x, y, c, f 
x, z 

Я хочу, чтобы сделать карту пунктов против их подсчета. Это код, который я написал:

private def genItemMap[Item: ClassTag](data: RDD[Array[Item]],  partitioner: HashPartitioner): mutable.Map[Item, Long] = { 
    val immutableFreqItemsMap = data.flatMap(t => t) 
     .map(v => (v, 1L)) 
     .reduceByKey(partitioner, _ + _) 
     .collectAsMap() 

    val freqItemsMap = mutable.Map(immutableFreqItemsMap.toSeq: _*) 
    freqItemsMap 
    } 

Когда я запускаю его, он занимает много времени и перемешивает пространство. Есть ли способ сократить время?

У меня есть кластер из двух узлов с 2 ядрами и 8 разделов. Количество строк в файле csv составляет 170000.

+0

Проблема в том, что 'collectAsMap'. Все операции 'collect' приводят к сборке в памяти всех элементов одного исполнителя, который снова передает все эти данные. Вы должны полностью удалить все операции 'collect' при работе с реальными данными –

ответ

0

Если вы просто хотите сделать уникальную вещь для подсчета предметов, то, полагаю, вы можете принять следующий подход.

val data: RDD[Array[Item]] = ??? 

val itemFrequency = data 
    .flatMap(arr => 
    arr.map(item => (item, 1)) 
) 
    .reduceByKey(_ + _) 

Не указывайте какой-либо разделитель при уменьшении, в противном случае это вызовет повторное перетасовку. Просто держите его с разделом, который у него уже есть.

Также ... не следует использовать collect распределенные данные в локальный объект в памяти, например Map.

Смежные вопросы