2014-11-20 2 views
3

Я загружаю кучу файлов JSON с использованием SparkSQL, но некоторые из них имеют проблемы.SparkSQL: Игнорирование недопустимых json-файлов

Я хотел бы продолжить обработку других файлов, игнорируя плохие файлы, как я могу это сделать?

Я пробовал использовать try-catch, но он все еще терпит неудачу. Пример:

try { 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext._ 

    val jsonFiles=sqlContext.jsonFile("/requests.loading") 
} catch { 
    case _: Throwable => // Catching all exceptions and not doing anything with them 
} 

Я неудачу на:

14/11/20 01:20:44 INFO scheduler.TaskSetManager: Starting task 3065.0 in stage 1.0 (TID 6150, HDdata2, NODE_LOCAL, 1246 bytes)<BR> 
14/11/20 01:20:44 WARN scheduler.TaskSetManager: Lost task 3027.1 in stage 1.0 (TID 6130, HDdata2): com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value 
at [Source: [email protected]; line: 1, column: 1805] 
+0

Вы _sure_ это на самом деле не удается? Эти записи журнала представляют собой строки INFO и WARN соответственно. Журнал искры очень многословный, и эти две строки не убеждают меня в том, что исключение не поймано. Если бы я увидел линию ERROR, это было бы иначе, но лично я бы поставил 'println' в тело catch, чтобы быть уверенным. –

+0

Да, я уверен, что это неудачно. Я сначала позаботился об этом, отфильтровывая плохие jsons, но потом мне сказали, что версия 1.2 теперь может игнорировать неверные строки json, поэтому я переключился на 1.2. –

ответ

0

Если вы используете Спарк 1.2, Спарк SQL будет обрабатывать эти сломанные JSON записи для вас. Вот пример ...

// requests.loading has some broken records 
val jsonFiles=sqlContext.jsonFile("/requests.loading") 
// Look at the schema of jsonFiles, you will see a new column called "_corrupt_record", which holds all broken JSON records 
// jsonFiles.printSchema 
// Register jsonFiles as a table 
jsonFiles.registerTempTable("jsonTable") 
// To query all normal records 
sqlContext.sql("SELECT * FROM jsonTable WHERE _corrupt_record IS NULL") 
// To query all broken JSON records 
sqlContext.sql("SELECT _corrupt_record FROM jsonTable WHERE _corrupt_record IS NOT NULL") 
Смежные вопросы