Я включил сериализацию Kryo для моей работы Spark, включил параметр, требующий регистрации, и обеспечил регистрацию всех моих типов.Почему Spark хуже работает при использовании сериализации Kryo?
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)
производительность стандартной даты времени задания ухудшалась примерно 20%, а количество байт перемешиваются увеличилось почти на 400%.
Это кажется действительно удивительным для меня, учитывая предложение Spark documentation о том, что Крио должно быть лучше.
Kryo значительно быстрее и компактнее, чем Java сериализации (часто так же, как в 10 раз)
я вручную вызывается метод serialize
о случаях возгорания искры org.apache.spark.serializer.KryoSerializer
и org.apache.spark.serializer.JavaSerializer
с примером моих данных. Результаты были согласуются с предложениями в документации Spark: Kryo выпустил 98 байт; Java произвела 993 байта. Это действительно 10-кратное улучшение.
Возможно, смешающим фактором является то, что объекты, которые сериализуются и перетасовываются, реализуют интерфейс Avro GenericRecord
. Я попытался зарегистрировать схемы Avro в SparkConf
, но это не улучшилось.
Я пробовал делать новые классы, чтобы перетасовать данные, которые были простыми Scala case class
es, не включая любые машины Avro. Это не улучшило производительность в случайном порядке или количество обмениваемых байтов.
Код Спарк заканчивается кипячения к следующему:
case class A(
f1: Long,
f2: Option[Long],
f3: Int,
f4: Int,
f5: Option[String],
f6: Option[Int],
f7: Option[String],
f8: Option[Int],
f9: Option[Int],
f10: Option[Int],
f11: Option[Int],
f12: String,
f13: Option[Double],
f14: Option[Int],
f15: Option[Double],
f16: Option[Double],
f17: List[String],
f18: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(f: Int) : AnyRef = ???
def put(f: Int, value: Any) : Unit = ???
def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
val SCHEMA$: org.apache.avro.Schema = ???
}
case class B(
f1: Long
f2: Long
f3: String
f4: String) extends org.apache.avro.specific.SpecificRecordBase {
def get(field$ : Int) : AnyRef = ???
def getSchema() : org.apache.avro.Schema = B.SCHEMA$
def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
val SCHEMA$ : org.apache.avro.Schema = ???
}
def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = {
val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
joined.map { case (_, asAndBs) => asAndBs }
}
У вас есть какие-либо идеи, что может быть происходит и как я мог бы получить более высокую производительность, которая должна быть доступна из Kryo?
Не могли бы вы опубликовать пример случай класс и работу? Было бы намного легче ответить на вопрос тогда –
Хорошая точка, @ T.Gawęd. Обновлен с помощью упрощенного кода. –
Как вы оценили свой код? –