2015-10-12 3 views
13

Сценарий: Мое Ввод будет представлять собой несколько небольших XML-данных и предполагается, что эти XML будут считаться RDD. Выполните соединение с другим набором данных и сформируйте RDD и отправьте вывод как XML.Обработка Xml в Spark

Можно ли читать XML с использованием искры, загружать данные как RDD? Если возможно, как будет читаться XML.

Пример XML:

<root> 
    <users> 
     <user> 
       <account>1234<\account> 
       <name>name_1<\name> 
       <number>34233<\number> 
     <\user> 
     <user> 
       <account>58789<\account> 
       <name>name_2<\name> 
       <number>54697<\number> 
     <\user>  
    <\users> 
<\root> 

Как это будет загружено в РД?

+7

BTW, ваш XML не является вообще XML.Вам нужно заменить все '\\' на '/' –

+0

Привет Pavani! Я начинаю с этого упражнения на Spark, и я хочу знать, что решения будут более продвинутыми по классу, не могли бы вы мне помочь? –

ответ

15

Да, это возможно, но детали будут отличаться в зависимости от подхода, который вы принимаете.

  • Если файлы невелики, как вы уже упоминали, самым простым решением является загрузка ваших данных с помощью SparkContext.wholeTextFiles. Он загружает данные как RDD[(String, String)], где первым элементом является путь и второе содержимое файла. Затем вы анализируете каждый файл отдельно, как в локальном режиме.
  • Для больших файлов вы можете использовать Hadoop input formats.
    • Если структура проста, вы можете разделить записи, используя textinputformat.record.delimiter. Вы можете найти простой пример here. Ввод не является XML, но вы должны дать вам и идея, как действовать
    • В противном случае Mahout обеспечивает XmlInputFormat
  • Наконец можно прочитать файл с помощью SparkContext.textFile и настроить позже для записи остовного между разделами. Концептуально это означает что-то похожее на создание скользящего окна или partitioning records into groups of fixed size:

    • использования mapPartitionsWithIndex разделов для идентификации записей сломанных между разделами, собирать сломанные записи
    • использования второго mapPartitionsWithIndex для ремонта сломанной записей

Редакция:

Существует также относительно новых spark-xml пакета, который позволяет извлекать определенные записи по тегу:

val df = sqlContext.read 
    .format("com.databricks.spark.xml") 
    .option("rowTag", "foo") 
    .load("bar.xml") 
4

Вот способ выполнить это -> Я использовал HadoopInputFormats для чтения данных XML в искре, как объяснено zero323.

Входные данные ->

<root> 
    <users> 
     <user> 
      <account>1234<\account> 
      <name>name_1<\name> 
      <number>34233<\number><\user> 
     <user> 
      <account>58789<\account> 
      <name>name_2<\name> 
      <number>54697<\number> 
     <\user> 
    <\users> 
<\root> 

код для чтения XML Input ->

Вы получите некоторые банки в этом link

//---------------spark_import 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 

//----------------xml_loader_import 
import org.apache.hadoop.io.LongWritable 
import org.apache.hadoop.io.Text 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.io.{ LongWritable, Text } 
import com.cloudera.datascience.common.XmlInputFormat 

object Tester_loader { 
    case class User(account: String, name: String, number: String) 
    def main(args: Array[String]): Unit = { 

    val sparkHome = "/usr/big_data_tools/spark-1.5.0-bin-hadoop2.6/" 
    val sparkMasterUrl = "spark://SYSTEMX:7077" 

    var jars = new Array[String](3) 

    jars(0) = "/home/hduser/Offload_Data_Warehouse_Spark.jar" 
    jars(1) = "/usr/big_data_tools/JARS/Spark_jar/avro/spark-avro_2.10-2.0.1.jar" 

    val conf = new SparkConf().setAppName("XML Reading") 
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
     .setMaster("local") 
     .set("spark.cassandra.connection.host", "127.0.0.1") 
     .setSparkHome(sparkHome) 
     .set("spark.executor.memory", "512m") 
     .set("spark.default.deployCores", "12") 
     .set("spark.cores.max", "12") 
     .setJars(jars) 

    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    //---------------------------------loading user from XML 

    val pages = readFile("src/input_data", "<user>", "<\\user>", sc) //calling function 1.1 

    val xmlUserDF = pages.map { tuple => 
     { 
     val account = extractField(tuple, "account") 
     val name = extractField(tuple, "name") 
     val number = extractField(tuple, "number") 

     User(account, name, number) 
     } 
    }.toDF() 
    println(xmlUserDF.count()) 
    xmlUserDF.show() 
    } 

    //------------------------------------Functions 

    def readFile(path: String, start_tag: String, end_tag: String, sc: SparkContext) = { 
    val conf = new Configuration() 
    conf.set(XmlInputFormat.START_TAG_KEY, start_tag) 
    conf.set(XmlInputFormat.END_TAG_KEY, end_tag) 
    val rawXmls = sc.newAPIHadoopFile(path, classOf[XmlInputFormat], classOf[LongWritable], 
     classOf[Text], conf) 

    rawXmls.map(p => p._2.toString) 
    } 

    def extractField(tuple: String, tag: String) = { 
    var value = tuple.replaceAll("\n", " ").replace("<\\", "</") 

    if (value.contains("<" + tag + ">") && value.contains("</" + tag + ">")) { 

     value = value.split("<" + tag + ">")(1).split("</" + tag + ">")(0) 

    } 
    value 
    } 
} 

выход ->

+-------+------+------+ 
|account| name|number| 
+-------+------+------+ 
| 1234|name_1| 34233| 
| 58789|name_2| 54697| 
+-------+------+------+ 

Полученный результат в dataframes вы можете конвертировать их в РДУ согласно вашему требованию, как this->

val xmlUserRDD = xmlUserDF.toJavaRDD.rdd.map { x => (x.get(0).toString(),x.get(1).toString(),x.get(2).toString()) } 

Пожалуйста, оцените его, если он может помочь вам, как некоторые.

3

Это поможет вам.

package packagename; 

import org.apache.spark.sql.Dataset; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.SQLContext; 
import org.apache.spark.sql.SparkSession; 

import com.databricks.spark.xml.XmlReader; 

public class XmlreaderSpark { 
    public static void main(String arr[]){ 
    String localxml="file path"; 
    String booksFileTag = "user"; 

    String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse"; 
    System.out.println("warehouseLocation" + warehouseLocation); 
    SparkSession spark = SparkSession 
       .builder() 
       .master("local") 
       .appName("Java Spark SQL Example") 
       .config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", warehouseLocation) 
       .enableHiveSupport().config("set spark.sql.crossJoin.enabled", "true") 
       .getOrCreate(); 
    SQLContext sqlContext = new SQLContext(spark); 

    Dataset<Row> df = (new XmlReader()).withRowTag(booksFileTag).xmlFile(sqlContext, localxml); 
    df.show(); 

    } 
} 

Вам нужно добавить эту зависимость в вашем pom.xml:

<dependency> 
    <groupId>com.databricks</groupId> 
    <artifactId>spark-xml_2.10</artifactId> 
    <version>0.4.0</version> 
</dependency> 

и ваш входной файл не в нужном формате.

Спасибо.

3

Есть два хороших вариантов для простых случаев:

  • wholeTextFiles. Используйте метод сопоставления с вашим парсером XML, который может быть обработчиком XML Scala XML (более быстрый код) или SAX Pull Parser (лучшая производительность).
  • Hadoop streaming XMLInputFormat который вы должны определить начальную и конечную метку <user></user> обработать его, тем не менее, он создает один раздел для каждого пользователя тега
  • spark-xml package это хороший вариант тоже.

Со всеми параметрами вы ограничены только обработкой простых XML-данных, которые могут интерпретироваться как набор данных с строками и столбцами.

Однако, если мы сделаем это немного сложным, эти варианты не будут полезны.

Например, если у вас есть еще один объект там:

<root> 
    <users> 
    <user>...</users> 
    <companies> 
    <company>...</companies> 
</root> 

Теперь вам нужно создать 2 РД и изменить синтаксический анализатор для распознавания <company> метки.

Это простой случай, но XML может быть намного сложнее, и вам нужно будет включать все больше изменений.

Чтобы решить эту сложность, мы построили Flexter поверх Apache Spark, чтобы снять боль из processing XML files on Spark. Я также рекомендую прочитать около converting XML on Spark to Parquet. Последнее сообщение также содержит некоторые примеры кода, которые показывают, как вывод может быть запрошен с помощью SparkSQL.

Отказ от ответственности: Я работаю для Sonra

+1

Я предлагаю вам добавить отказ от ответственности, что вы являетесь соучредителем этой компании. – Davos

Смежные вопросы