2015-06-26 2 views
1

Я пытаюсь импортировать файл модели PMML, сгенерированный в R в Spark Context и использовать его для прогнозирования результатов. Это код, используемый в Spark.Spark JPMML import Issue

JavaRDD<String> scoreData = data.map(new Function<String, String>() { 

    @Override 
    public String call(String line) throws Exception { 
     String[] row = line.split(","); 
     PMML pmml; 
     Evaluator evaluator; 
     FileSystem fs = FileSystem.get(new Configuration()); 
     FSDataInputStream inStr = fs.open(new Path("PATH_TO_PMML_FILE")); 
     Source transformedSource = ImportFilter.apply(new InputSource(inStr)); 
     pmml = JAXBUtil.unmarshalPMML(transformedSource); 
     System.out.println(pmml.getModels().get(0).getModelName()); 
     ModelEvaluatorFactory modelEvaluatorFactory = ModelEvaluatorFactory.newInstance(); 
     ModelEvaluator<?> modelEvaluator = modelEvaluatorFactory.newModelManager(pmml); 
     System.out.println(modelEvaluator.getSummary()); 
     evaluator = (Evaluator) modelEvaluator; 

     List<FieldName> activeFields = evaluator.getActiveFields(); 
     double[] features = new double[row.length - 2]; // row - {contact_id,label} 
     StringBuilder strBld = new StringBuilder(); 
     Map<FieldName, FieldValue> arguments = new LinkedHashMap<FieldName, FieldValue>(); 
     strBld.append(row[0]); 
     for (int i = 3; i <= row.length - 1; i++) { 
      //from f1 - f16 
      FieldValue activeValue = evaluator.prepare(activeFields.get(i - 3), Double.parseDouble(row[i])); 
      arguments.put(activeFields.get(i - 3), activeValue); 
     } 
    } 

код работал нормально при запуске в основной среде Java (без контекста Спарк), но при работе над кодом, который я получаю следующее исключение

java.lang.NoSuchMethodError: com.google.common.collect.Range.closed(Ljava/lang/Comparable;Ljava/lang/Comparable;)Lcom/google/common/collect/Range; 
at org.jpmml.evaluator.Classification$Type.<clinit>(Classification.java:278) 
at org.jpmml.evaluator.ProbabilityDistribution.<init>(ProbabilityDistribution.java:26) 
at org.jpmml.evaluator.GeneralRegressionModelEvaluator.evaluateClassification(GeneralRegressionModelEvaluator.java:333) 
at org.jpmml.evaluator.GeneralRegressionModelEvaluator.evaluate(GeneralRegressionModelEvaluator.java:107) 
at org.jpmml.evaluator.ModelEvaluator.evaluate(ModelEvaluator.java:266) 
at org.zcoe.spark.pmml.PMMLSpark_2$1.call(PMMLSpark_2.java:146) 
at org.zcoe.spark.pmml.PMMLSpark_2$1.call(PMMLSpark_2.java:1) 
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) 
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) 
at org.apache.spark.scheduler.Task.run(Task.scala:64) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

Проблема, кажется, с совместимостью Файл Guvava Jar, который необходим для запуска кода. Я удалил все банки, содержащие класс com.google.common.collect.Range, из класса классов Spark, но по-прежнему сохраняется та же проблема.

Подробности Спарк Работа ниже,

искровым представить --jars ./lib/pmml-evaluator-1.2.0.jar,./lib/pmml-model-1.2.2.jar ,. /lib/pmml-manager-1.1.20.jar./lib/pmml-schema-1.2.2.jar./lib/guava-15.0.jar --class

[Этап 0:> (0 + 2)/2] 15/06/26 14:39:15 ERROR YarnScheduler: потерянный исполнитель 1 на hslave2: удаленный клиент Akka отключен 15.06.26 14:39:15 ОШИБКА YarnScheduler: потерянный исполнитель 2 на hslave1: удаленный Клиент Akka disasociated [Этап 0:> (0 + 2)/2] 15/06/26 14:39:33 ОШИБКА Ярвы: утерянный исполнитель 4 на hslave1: удаленный Ак ka disasociated 15.06.26 14:39:33 ОШИБКА TaskSetManager: Задача 0 на этапе 0.0 не удалась 4 раза; отмена работы

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, hslave1): ExecutorLostFailure (executor 4 lost) 
Driver stacktrace: 
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) 
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) 
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) 
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

Сообщите мне, если будут ошибки, которые я совершил.

+0

Я пытаюсь сделать то же самое, но хочу создать модель один раз и передать ее внутри 'map {}', вы пробовали этот подход для лучшей производительности? https://www.mail-archive.com/[email protected]/msg42171.html – zengr

ответ

2

Вы должны позволить как Spark, так и JPMML иметь собственную версию библиотек Guava. Не рекомендуется модифицировать базовую установку Spark, когда вы можете достичь своей цели, просто переработав упаковку своего приложения Spark.

Если вы переместите приложение Spark в Apache Maven, тогда можно использовать relocation feature of the Maven Shade Plugin для перемещения JPMML-версии библиотеки Guava в другой пакет, такой как org.jpmml.com.google. example application of the JPMML-Cascading project делает этот трюк.

Кроме того, с точки зрения перехода к Apache Maven, ваше приложение Spark будет доступно как файл uber-JAR, что значительно упростит его развертывание. Например, на данный момент вы указываете pmml-manager-1.1.20.jar в своей командной строке, которая не нужна.

+0

Спасибо за ответ, я очень новичок в Maven, не могли бы вы рассказать о том, как это сделать. –

+1

Чтобы начать работу, возьмите ссылочный JPMML-каскадный POM-файл и замените каскадные зависимости зависимостями Spark. Затем поставьте свой источник приложения в каталог 'src/main/java' и создайте с помощью' mvn clean install'. – user1808924

+0

Его работа с предложением, которое вы дали @ user1808924. Большое спасибо за помощь –