2016-09-12 4 views
5

TL; DR - У меня есть то, что похоже на DStream of Strings в приложении PySpark. Я хочу отправить его как DStream[String] в библиотеку Scala. Однако строки не преобразуются Py4j.Преобразование PySpark RDD с помощью Scala

Я работаю над приложением PySpark, которое извлекает данные из Kafka, используя Spark Streaming. Мои сообщения - это строки, и я хотел бы вызвать метод в коде Scala, передав ему экземпляр DStream[String]. Тем не менее, я не могу получить правильные строки JVM в коде Scala. Мне кажется, что строки Python не преобразуются в строки Java, а вместо этого сериализуются.

Мой вопрос будет: как получить строки Java из объекта DStream?


Вот простейший код Python я придумал:

from pyspark.streaming import StreamingContext 
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1)) 

from pyspark.streaming.kafka import KafkaUtils 
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"}) 
values = stream.map(lambda tuple: tuple[1]) 

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream) 

ssc.start() 

Я выполняющей этот код в PySpark, передавая ей путь к моей JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar 

On Сторона Scala, у меня есть:

package com.seigneurin 

import org.apache.spark.streaming.api.java.JavaDStream 

object MyPythonHelper { 
    def doSomething(jdstream: JavaDStream[String]) = { 
    val dstream = jdstream.dstream 
    dstream.foreachRDD(rdd => { 
     rdd.foreach(println) 
    }) 
    } 
} 

Нет ж, скажем, я посылаю некоторые данные в Кафки:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN 

println заявление в Scala код печатает что-то, что выглядит как:

[[email protected] 

Я ожидал получить foo bar вместо этого.

Теперь, если я заменить простой println заявление в коде Scala со следующим:

rdd.foreach(v => println(v.getClass.getCanonicalName)) 

я получаю:

java.lang.ClassCastException: [B cannot be cast to java.lang.String 

Это говорит о том, что строки фактически передаются как массивы байтов ,

Если я просто пытаюсь преобразовать этот массив байтов в строку (я знаю, что я даже не указав кодировку):

 def doSomething(jdstream: JavaDStream[Array[Byte]]) = { 
     val dstream = jdstream.dstream 
     dstream.foreachRDD(rdd => { 
      rdd.foreach(bytes => println(new String(bytes))) 
     }) 
     } 

я получаю то, что выглядит как (специальные символы могут быть снято):

�]qXfoo barqa. 

Это предполагает, что строка Python была сериализована (маринован?). Как я мог получить правильную строку Java вместо этого?

ответ

6

Короче говоря, нет поддерживаемого способа сделать что-то подобное. Не пробуйте это в производстве. Вас предупредили.

В общем, Spark не использует Py4j для чего-либо другого, кроме некоторых основных вызовов RPC в драйвере и не запускает шлюз Py4j на любом другом компьютере.Когда это требуется (в основном MLlib и некоторые части SQL) Spark использует Pyrolite для сериализации объектов, передаваемых между JVM и Python.

Эта часть API является частной (Scala) или внутренней (Python) и как таковая не предназначена для общего использования. Хотя теоретически вы получите доступ к нему в любом случае либо на партию:

package dummy 

import org.apache.spark.api.java.JavaRDD 
import org.apache.spark.streaming.api.java.JavaDStream 
import org.apache.spark.sql.DataFrame 

object PythonRDDHelper { 
    def go(rdd: JavaRDD[Any]) = { 
    rdd.rdd.collect { 
     case s: String => s 
    }.take(5).foreach(println) 
    } 
} 

полного поток:

object PythonDStreamHelper { 
    def go(stream: JavaDStream[Any]) = { 
    stream.dstream.transform(_.collect { 
     case s: String => s 
    }).print 
    } 
} 

или подвергая отдельную партию как DataFrames (вероятно, наименее злой варианта):

object PythonDataFrameHelper { 
    def go(df: DataFrame) = { 
    df.show 
    } 
} 

и использование эти обертки следующие:

from pyspark.streaming import StreamingContext 
from pyspark.mllib.common import _to_java_object_rdd 
from pyspark.rdd import RDD 

ssc = StreamingContext(spark.sparkContext, 10) 
spark.catalog.listTables() 

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output) 
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd) 
)) 

# Reserialize and convert to JavaDStream<Object> 
# This is the only option which allows further transformations 
# on DStream 
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD( # Reserialize but keep as Python RDD 
     _to_java_object_rdd(rdd), ssc.sparkContext 
    ))._jdstream 
) 

# Convert to DataFrame and pass to Scala sink. 
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
     rdd.map(lambda x: (x,)).toDF()._jdf 
    ) 
) 

ssc.start() 
ssc.awaitTerminationOrTimeout(30) 
ssc.stop() 

Это не поддерживается, не проверено и, как таковое, бесполезно ни для чего другого, кроме экспериментов с Spark API.

+1

Совершенно ясно и очень полезно. Благодаря! –

+0

Я рад, что смогу помочь. Я, наверное, немного преувеличиваю здесь. Если ваша цель состоит в том, чтобы создавать независимые от языка расширения, тогда вы не можете действительно избегать возиться с внутренними структурами, тем не менее разработчики приняли здесь сознательное решение и возиться с этим не для слабонервных. – zero323

+0

Привет @ zero323 Я делаю тот же процесс здесь, но имея большие проблемы во время процесса, я создаю объект для передачи моего приложения python с kerberized kafka. Но когда я создаю объект, jvm искры не может найти мою функцию в объекте. Если я создам класс, он найдет класс. Но не может отправить rdd-объект из-за ошибки: 'pyKafka ([org.apache.spark.api.java.JavaRDD, класс java.lang.String]) не существует' Я выполняю шаги. Что может быть? –

Смежные вопросы