2015-05-20 2 views
0

Я пытаюсь выполнить некоторую обработку текста, используя Spark RDD.Spark - Как правильно обрабатывать ошибку в RDD.map()?

Формат входного файла:

2015-05-20T18:30 <some_url>/?<key1>=<value1>&<key2>=<value2>&...&<keyn>=<valuen> 

Я хочу, чтобы извлечь некоторые поля из текста и преобразовывать их в формат CSV, как:

<value1>,<value5>,<valuek>,<valuen> 

Следующий код, как я делаю это:

val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz") 
val records = lines.map { line => 
    val mp = line.split("&") 
       .map(_.split("=")) 
       .filter(_.length >= 2) 
       .map(t => (t(0), t(1))).toMap 

    (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn")) 
} 

Я хотел бы знать, что если какая-либо строка входного текста неправильного формата или i nvalid, то функция map() не может вернуть допустимое значение. Это должно очень распространяться в обработке текста, какова наилучшая практика для решения этой проблемы?

ответ

9

для того, чтобы справиться с этой ошибки вы можете использовать класс в Scala, попробуйте в работе flatMap, в коде:

val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz") 
    val records = lines.flatMap (line => 
     Try{ 
      val mp = line.split("&") 
       .map(_.split("=")) 
       .filter(_.length >= 2) 
       .map(t => (t(0), t(1))).toMap 

      (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn")) 
     } match { 
     case Success(map) => Seq(map) 
     case _ => Seq() 
    }) 

При этом у вас есть только «хорошие», но если вы хотите оба (ошибки и хорошие) я бы рекомендовал использовать функцию карты, которая возвращает Scala Либо, а затем использовать Спарк фильтр, в коде:

val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz") 
    val goodBadRecords = lines.map (line => 
     Try{ 
      val mp = line.split("&") 
       .map(_.split("=")) 
       .filter(_.length >= 2) 
       .map(t => (t(0), t(1))).toMap 

      (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn")) 
     } match { 
     case Success(map) => Right(map) 
     case Failure(e) => Left(e) 
    }) 
    val records = goodBadRecords.filter(_.isRight) 
    val errors = goodBadRecords.filter(_.isLeft) 

Я надеюсь, что это будет полезно

+0

к сожалению, почему вы переназначение a Попробуйте в Ли? Семантика в этом случае точно такая же. –

Смежные вопросы