2016-12-29 4 views
2

Я оцениваю искру с помощью базы данных marklogic. Я прочитал csv-файл, теперь у меня есть объект JavaRDD, который я должен сбрасывать в базу данных marklogic.Как написать JavaRDD в базу данных marklogic

SparkConf conf = new SparkConf().setAppName("org.sparkexample.Dataload").setMaster("local"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaRDD<String> data = sc.textFile("/root/ml/workArea/data.csv"); 
    SQLContext sqlContext = new SQLContext(sc); 
    JavaRDD<Record> rdd_records = data.map(
     new Function<String, Record>() { 
      public Record call(String line) throws Exception { 
      String[] fields = line.split(","); 
      Record sd = new Record(fields[0], fields[1], fields[2], fields[3],fields[4]); 
      return sd; 
      } 
    }); 

Этот объект JavaRDD, который я хочу записать в базу данных marklogic.

Есть ли какие-либо искрообразования, доступные для быстрой записи в базу данных marklogic?

Скажем, если бы мы не могли написать JavaRDD непосредственно в marklogic, то каков правильный подход для достижения этого?

Вот код, который я использую для записи данных JavaRDD в базу данных marklogic, сообщите мне, если это неправильный способ сделать это.

final DatabaseClient client = DatabaseClientFactory.newClient("localhost",8070, "MLTest"); 
    final XMLDocumentManager docMgr = client.newXMLDocumentManager(); 
    rdd_records.foreachPartition(new VoidFunction<Iterator<Record>>() { 
     public void call(Iterator<Record> partitionOfRecords) { 
      while (partitionOfRecords.hasNext()) { 
       Record record = partitionOfRecords.next(); 
       System.out.println("partitionOfRecords - "+record.toString()); 
       String docId = "/example/"+record.getID()+".xml"; 
       JAXBContext context = JAXBContext.newInstance(Record.class); 
       JAXBHandle<Record> handle = new JAXBHandle<Record>(context); 
       handle.set(record); 
       docMgr.writeAs(docId, handle); 
      } 
     } 
    }); 
    client.release(); 

Я использовал Java клиент API для записи данных, но я получаю ниже исключения, даже если класс POJO Record реализует Serializable интерфейс. Пожалуйста, дайте мне знать, что может быть причиной & как решить это.

org.apache.spark.sparkexception task not Serializable.

+0

Я не знаю много о Спарк, но вот некоторые ресурсы, которые могут помочь: [Как использовать MarkLogic в приложениях Apache Spark] (http://developer.marklogic.com/blog/marklogic-spark-example); [Установка Spark для работы с MarkLogic] (http://www.marklogic.com/resources/putting-spark-work-marklogic/). –

+0

Спасибо Дэйв, я видел этот пример. В этом примере хранятся выходные данные в hdfs не в базе данных marklogic. – RCS

ответ

0

Модифицированный пример от spark streaming guide. Здесь вам нужно будет реализовать логику соединения и записи, специфичную для базы данных.

public void send(JavaRDD<String> rdd) { 
    rdd.foreachPartition(new VoidFunction<Iterator<String>>() { 
     @Override 
     public void call(Iterator<String> partitionOfRecords) { 
     // ConnectionPool is a static, lazily initialized pool of 
     Connection connection = ConnectionPool.getConnection(); 
     while (partitionOfRecords.hasNext()) { 
      connection.send(partitionOfRecords.next()); 
     } 
     ConnectionPool.returnConnection(connection); // return to the pool 
     // for future reuse 
     } 
    }); 
    } 
+0

В вашем ответе четко не говорится, как я могу записывать данные в базу данных marklogic. Не могли бы вы рассказать о своем ответе? – RCS

+0

Как я могу создать connectionPool для базы данных marklogic? – RCS

+0

См. Https://docs.marklogic.com/guide/java/intro. Он имеет пример при записи в MarkLogic. Надеюсь, это поможет. –

1

Самый простой способ получить данные в MarkLogic осуществляется через HTTP и клиента REST API - в частности/v1/документы конечных точек - http://docs.marklogic.com/REST/client/management.

Существует множество способов оптимизации этого, например, с помощью набора записей, но на основе вашего вопроса, я думаю, первое, что нужно решить, - какой документ вы хотите написать для каждой записи? Ваш пример показывает 5 столбцов в CSV - как правило, вы будете писать либо JSON, либо XML-документ с 5 полями/элементами, каждый из которых будет основан на индексе столбца. Поэтому вам нужно написать небольшой код для создания этого JSON/XML, а затем использовать любой HTTP-клиент, который вы предпочитаете (и один из них - API клиента MarkLogic Java), чтобы записать этот документ в MarkLogic.

Это касается вашего вопроса о том, как написать JavaRDD MarkLogic, но если ваша цель - как можно быстрее получить данные из CSV в MarkLogic, пропустите Spark и используйте mlcp - https://docs.marklogic.com/guide/mlcp/import#id_70366, что связано с нулевым кодированием.

+0

Я обновил свой вопрос, добавил код для java client api для записи в базу данных marklogic. Я пишу запись как XML-данные. Но получение исключения: задача sparkexception не Serializable. Не удалось выяснить причину исключения. Если у вас есть какие-либо подсказки, пожалуйста, дайте мне знать. Кроме того, как я пишу в базу данных, если не правильно, то предложите мне ОДИН. – RCS

+0

В API клиента Java существует несколько методов для написания документа или набора документов, и для этого примера «writeAs» отлично работает. – rjrudin

+0

Чтобы изолировать проблему с DatabaseClient, не являющейся Serializable, я рекомендую не использовать JAXBContext и просто сделать что-то очень простое, например, создать строку XML, а затем записать ее через StringHandle. Если это сработает, то вы знаете, что проблема связана с JAXB, а некоторый класс не является Serializable. Если это не сработает, тогда проблема связана с DatabaseClient, и я думаю, что предложение Сэма Меффорда ниже со статической ссылкой на DatabaseClient - хорошая вещь, чтобы попробовать. – rjrudin

0

Мне интересно, нужно ли вам просто убедиться, что все, что вы получили в своей VoidFunction, которое было создано вне его, является сериализуемым (см. this page). DatabaseClient и XMLDocumentManager, конечно, не могут быть сериализованы, поскольку они связаны с ресурсами. Однако вы правы, чтобы не создавать экземпляр DatabaseClient внутри вашей VoidFunction, поскольку это будет менее эффективно (хотя оно будет работать). Я не знаю, будет ли следующая идея работать с искрами. Но я предполагаю, что вы могли бы создать класс, который сохраняет владение одноплодной DatabaseClient например:

public static class MLClient { 
    private static DatabaseClient singleton; 
    private MLClient() {} 

    public static DatabaseClient get(DatabaseClientFactory.Bean connectionInfo) { 
    if (connectionInfo == null) { 
     throw new IllegalArgumentException("connectionInfo cannot be null"); 
    } 
    if (singleton == null) { 
     singleton = connectionInfo.newClient(); 
    } 
    return singleton; 
    } 
} 

тогда вы просто создать сериализуемую DatabaseClientFactory.Bean за пределами VoidFunction так что ваша информация аутентификации по-прежнему централизованы

DatabaseClientFactory.Bean connectionInfo = 
    new DatabaseClientFactory.Bean(); 
connectionInfo.setHost("localhost"); 
connectionInfo.setPort(8000); 
connectionInfo.setUser("admin"); 
connectionInfo.setPassword("admin"); 
connectionInfo.setAuthenticationValue("digest"); 

Тогда внутри VoidFunction вы могли бы получить, что синглтон DatabaseClient и новый XMLDocumentManager как так:

DatabaseClient client = MLClient.get(connectionInfo); 
XMLDocumentManager docMgr = client.newXMLDocumentManager(); 
+0

Спасибо Сэму за ваш ответ. Я знаю, что сериализация клиента базы данных не рекомендуется, но это VoidFunction от искры, в котором все ресурсы должны быть сериализованы. На самом деле я искал коннектор для marklogic с искрой, но не нашел, поэтому попробовал java client api сам с искровым. – RCS

Смежные вопросы