2014-11-11 2 views
3

Я пишу приложение Spark в scala и хочу обрабатывать грязный входной файл.Spark Scala scala.util.control.Exception catching and dropping Нет на карте

// CSV file 
val raw_data = sc.textFile(...) 

val clean_data = raw_data.map(_.split(delimiter)) 
    .map(r => (r(0), r(1).toDouble) 

будет вызывать исключение NumberFormatException, когда r (1) не является числом. Это происходит в небольшом количестве строк в уродливых входных данных.

я, наконец, приземлился на уродливой способ сделать то, что мне нужно:

import scala.util.control.Exception._ 

val clean_data = raw_data.map(_.split(delimiter)) 
    .map(r => (r(0), 
     catching(classOf[NumberFormatException]).opt(r(1).toDouble)) 
    .filter(r => r._2 != None) 
    .map(r => (r._1, r._2.get)) 

Это оставляет меня два вопроса.

1) Каков наилучший способ просто отбросить неверные строки на карте?

2) Как я могу обрабатывать типы опций, созданные путем ловли без необходимости сначала явно фильтровать None, а затем отображать и применять функцию .get для значений параметра None-None?

Я попытался применить шаг .flatMap (identity), чтобы избавиться от Nones, но получил ожидаемое исключение: TraversableOnce [?].

ответ

4

В Spark collect(pf:PartialFunction) является братом-близнецом коллекций scala collect, который существует именно с этой целью: сохраните те элементы коллекции, которые определены в частичной функции.

val rawData = sc.textFile(...) 

val cleanData = rawData.map(_.split(Delimiter)) 
      .collect{ case Array(x,y) if (Try(y.toDouble).isSuccess) (x,y.toDouble) } 

Другой вариант, который не оценивает .toDouble дважды будет использовать flatMap:

val cleanData = rawData.map(_.split(Delimiter)) 
         .flatMap(entry => Try(entry.toDouble).toOption) 

Примечание: Искра немного сбивает с толку, что есть без параметров collect метод, который предназначен для получения данных от RDD до водителя.

+0

Хорошо, после некоторого ворча я получил это решение для работы (например, Array (x, y) делал x и y Any, поэтому вместо этого я просто (x, y) сохранил их как строки). Но у меня есть некоторые вопросы. Во-первых, просто кажется, что немного странно звонить дважды. Дважды два раза - один раз, чтобы проверить, не является ли оно двойным, и один раз, чтобы на самом деле это сделать. Во-вторых, для этого случая .collect() работает хорошо, потому что я могу вытащить RDD в память драйвера. Но что бы я сделал в тех случаях, когда я действительно имел дело с большими данными и не мог использовать .collect() Спасибо за помощь! – Metropolis

+2

re: 'collect' Я знаю, что это сбивает с толку. В RDD есть два метода 'collect'. Безпараметрическая версия 'def collect(): Array [T]' запускает вычисление и передает все данные водителю. Версия, которая принимает частичную функцию: 'def collect [U] (f: PartialFunction [T, U]) (неявный arg0: ClassTag [U]): RDD [U]' является эквивалентом функции Scala 'collect' на коллекциях и выдает другой результат RDD, поэтому он все еще параллелен любым другим преобразованиям RDD. – maasg

+1

rd: toDouble (x2) - Я использую его, потому что я предпочитаю версию частичной функции, но в действительности она выполняет в два раза больше операции. Вот альтернатива: 'data.flatMap (entry => Try (entry.toDouble) .toOption)' – maasg

Смежные вопросы