TL; DR - У меня есть то, что похоже на DStream of Strings в приложении PySpark. Я хочу отправить его как DStream[String]
в библиотеку Scala. Однако строки не преобразуются Py4j.Преобразование PySpark RDD с помощью Scala
Я работаю над приложением PySpark, которое извлекает данные из Kafka, используя Spark Streaming. Мои сообщения - это строки, и я хотел бы вызвать метод в коде Scala, передав ему экземпляр DStream[String]
. Тем не менее, я не могу получить правильные строки JVM в коде Scala. Мне кажется, что строки Python не преобразуются в строки Java, а вместо этого сериализуются.
Мой вопрос будет: как получить строки Java из объекта DStream
?
Вот простейший код Python я придумал:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
Я выполняющей этот код в PySpark, передавая ей путь к моей JAR:
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
On Сторона Scala, у меня есть:
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
Нет ж, скажем, я посылаю некоторые данные в Кафки:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
println
заявление в Scala код печатает что-то, что выглядит как:
[[email protected]
Я ожидал получить foo bar
вместо этого.
Теперь, если я заменить простой println
заявление в коде Scala со следующим:
rdd.foreach(v => println(v.getClass.getCanonicalName))
я получаю:
java.lang.ClassCastException: [B cannot be cast to java.lang.String
Это говорит о том, что строки фактически передаются как массивы байтов ,
Если я просто пытаюсь преобразовать этот массив байтов в строку (я знаю, что я даже не указав кодировку):
def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(bytes => println(new String(bytes)))
})
}
я получаю то, что выглядит как (специальные символы могут быть снято):
�]qXfoo barqa.
Это предполагает, что строка Python была сериализована (маринован?). Как я мог получить правильную строку Java вместо этого?
Совершенно ясно и очень полезно. Благодаря! –
Я рад, что смогу помочь. Я, наверное, немного преувеличиваю здесь. Если ваша цель состоит в том, чтобы создавать независимые от языка расширения, тогда вы не можете действительно избегать возиться с внутренними структурами, тем не менее разработчики приняли здесь сознательное решение и возиться с этим не для слабонервных. – zero323
Привет @ zero323 Я делаю тот же процесс здесь, но имея большие проблемы во время процесса, я создаю объект для передачи моего приложения python с kerberized kafka. Но когда я создаю объект, jvm искры не может найти мою функцию в объекте. Если я создам класс, он найдет класс. Но не может отправить rdd-объект из-за ошибки: 'pyKafka ([org.apache.spark.api.java.JavaRDD, класс java.lang.String]) не существует' Я выполняю шаги. Что может быть? –