2017-01-09 2 views
14

Я включил сериализацию Kryo для моей работы Spark, включил параметр, требующий регистрации, и обеспечил регистрацию всех моих типов.Почему Spark хуже работает при использовании сериализации Kryo?

val conf = new SparkConf() 
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrationRequired", "true") 
conf.registerKryoClasses(classes) 
conf.registerAvroSchemas(avroSchemas: _*) 

производительность стандартной даты времени задания ухудшалась примерно 20%, а количество байт перемешиваются увеличилось почти на 400%.

Это кажется действительно удивительным для меня, учитывая предложение Spark documentation о том, что Крио должно быть лучше.

Kryo значительно быстрее и компактнее, чем Java сериализации (часто так же, как в 10 раз)

я вручную вызывается метод serialize о случаях возгорания искры org.apache.spark.serializer.KryoSerializer и org.apache.spark.serializer.JavaSerializer с примером моих данных. Результаты были согласуются с предложениями в документации Spark: Kryo выпустил 98 байт; Java произвела 993 байта. Это действительно 10-кратное улучшение.

Возможно, смешающим фактором является то, что объекты, которые сериализуются и перетасовываются, реализуют интерфейс Avro GenericRecord. Я попытался зарегистрировать схемы Avro в SparkConf, но это не улучшилось.

Я пробовал делать новые классы, чтобы перетасовать данные, которые были простыми Scala case class es, не включая любые машины Avro. Это не улучшило производительность в случайном порядке или количество обмениваемых байтов.

Код Спарк заканчивается кипячения к следующему:

case class A(
    f1: Long, 
    f2: Option[Long], 
    f3: Int, 
    f4: Int, 
    f5: Option[String], 
    f6: Option[Int], 
    f7: Option[String], 
    f8: Option[Int], 
    f9: Option[Int], 
    f10: Option[Int], 
    f11: Option[Int], 
    f12: String, 
    f13: Option[Double], 
    f14: Option[Int], 
    f15: Option[Double], 
    f16: Option[Double], 
    f17: List[String], 
    f18: String) extends org.apache.avro.specific.SpecificRecordBase { 
    def get(f: Int) : AnyRef = ??? 
    def put(f: Int, value: Any) : Unit = ??? 
    def getSchema(): org.apache.avro.Schema = A.SCHEMA$ 
} 
object A extends AnyRef with Serializable { 
    val SCHEMA$: org.apache.avro.Schema = ??? 
} 

case class B(
    f1: Long 
    f2: Long 
    f3: String 
    f4: String) extends org.apache.avro.specific.SpecificRecordBase { 
    def get(field$ : Int) : AnyRef = ??? 
    def getSchema() : org.apache.avro.Schema = B.SCHEMA$ 
    def put(field$ : Int, value : Any) : Unit = ??? 
} 
object B extends AnyRef with Serializable { 
    val SCHEMA$ : org.apache.avro.Schema = ??? 
} 

def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = { 
    val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b) 
    joined.map { case (_, asAndBs) => asAndBs } 
} 

У вас есть какие-либо идеи, что может быть происходит и как я мог бы получить более высокую производительность, которая должна быть доступна из Kryo?

+0

Не могли бы вы опубликовать пример случай класс и работу? Было бы намного легче ответить на вопрос тогда –

+0

Хорошая точка, @ T.Gawęd. Обновлен с помощью упрощенного кода. –

+0

Как вы оценили свой код? –

ответ

0

Поскольку у вас очень большие РСД, широковещательное/широковещательное хеш-соединение, похоже, будет недоступно.

С наилучшими пожеланиями до coalesce() ваших RDD перед присоединением. Вы видите высокий перекос во время перетасовки? Если это так, вы можете захотеть объединиться с shuffle = true.

И, наконец, если у вас есть RDD вложенных структур (например, JSON), которые иногда позволят вам обходить тасования. Посмотрите слайды и/или видео here для более подробного объяснения.

1

Если ваш размер одной записи слишком мал, и огромное количество записей может замедлить работу. Постарайтесь увеличить размер буфера и посмотреть, не улучшилось ли оно.

Попробуйте ниже один, если не сделано уже ..

val conf = new SparkConf() 
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    // Now it's 24 Mb of buffer by default instead of 0.064 Mb 
    .set("spark.kryoserializer.buffer.mb","24") 

Ref: https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/

Смежные вопросы