2015-05-05 2 views
0

Попытка создать Bi-карту Spark, состоящую из двух карт. Поскольку отображения уникальны с любого направления, все, что должно быть сериализовано, представляет собой единую карту, на самом деле нужно сериализовать только a Seq [(K, V)]. Таким образом, лежат только основные элементы карты. В десериализации мы можем воссоздать обратную карту и индексы.Серийная трансляция серийного программирования BiMap класса

Вот предлагаемая конструкция:

class BiMap[K, V] (
    private val m: Map[K, V], 
    // if this is serialized we allow i to be discarded and recalculated when deserialized 
    @transient private var i: Option[BiMap[V, K]] = None 
) extends Serializable { 

    // NOTE: make inverse's inverse point back to current BiMap 
    // if this is serialized we allow inverse to be discarded and recalculated 
    // when first invoked from "val size_" in the constructor 
    @transient lazy val inverse: BiMap[V, K] = { 
    if(i == null.asInstanceOf[Option[BiMap[V, K]]]) 
     i = None 
    i.getOrElse { 
     val rev = m.map(_.swap) 
     require((rev.size == m.size), "Failed to create reversed map. Cannot have duplicated values.") 
     new BiMap(rev, Some(this)) 
    } 
    } 

    // forces inverse to be calculated in the constructor when deserialized 
    // not when first used 
    @transient val size_ = inverse.size 
    ... 
} 

Хотя это похоже на работу, я не могу понять, почему я должен проверить i на нуль, но нуль после десериализации. Первоначально это был val, который имел инициализацию по умолчанию = None.

только м следует сериализовать поэтому обратное @transient lazy и есть другой @transient val size_ = inverse.size, который предназначен, чтобы вызвать обратный быть оценено при десериализации (вместо того, чтобы, когда задача вызывает inverse). Этот последний бит состоит в том, чтобы убедиться, что обратная связь является общей и не воссоздана каждой задачей.

Хотя это, кажется, работает это немного некрасиво, и я до сих пор не уверен в несколько вещей:

  1. все для хранения экземпляра, выделенного в переменном вещании, а не в задачах кучи пространства?
  2. Почему i должен быть var и проверен на null, когда он никогда не должен быть пустым?
  3. Самое главное, что это означает, что инверсия будет отброшена во время вещания и воссоздана в десериализовании?

Я понимаю, что мне нужно зарегистрировать это с помощью Kryo и в конечном итоге реализовать KryoSerializable, чтобы точно контролировать сериализацию.

ответ

0

Я могу ответить только на несколько ваших вопросов.

Прежде всего, установка Kryo в качестве сериализатора по умолчанию недостаточна. Ваши данные сериализуются в соответствии со схемой сериализации Java, которая намного менее эффективна, чем Kryo. Если вы хотите использовать Kryo, вам необходимо также установить свойство "spark.kryo.registrator" на полное имя класса вашего регистратора, который должен реализовать KryoRegistrator. Это означает наличие способа, который можно легко реализовать, как это (это Java-код, но я верю, что вы не будете иметь проблем при переносе его на Scala):

public void registerClasses(Kryo k) { 
    k.register(BiMap.class); 

} 

Если вы хотите, с Kryo вы можете управлять сериализации, делая ваш класс, реализующий KryoSerializable. Для получения дополнительной информации, пожалуйста, прочитайте documentation.

Если вы хотите, чтобы проверить, что посылается по сети, вы можете записать результат ваших сериализации в файл ...

+0

Хорошо, у меня есть так регистратор может легко зарегистрировать класс. Это означает, что он будет использовать интерфейс Serializable, если я не реализую KryoSerializable. Я отредактирую вопрос, чтобы задать ответ. – pferrel

+0

Нет, это означает, что вы будете использовать Serailizable, если вы не зарегистрируете свои классы в KryoRegistrator. Если вам не нравится поведение по умолчанию, вы можете реализовать интерфейс KryoSerializable, чтобы иметь собственную сериализацию ... – mgaido

Смежные вопросы