2014-04-14 5 views
19

Взгляните на этот вопрос: Scala + Spark - Task not serializable: java.io.NotSerializableExceptionon. When calling function outside closure only on classes not objects.Spark - Задача не сериализуема: как работать со сложными закрытиями карт, вызывающими внешние классы/объекты?

Проблема:

Пусть мои картографы могут быть функциями (DEF), которые внутренне называют другие классы и создавать объекты и делать разные вещи внутри. (Или они могут даже быть классами, которые расширяют (Foo) => Bar, и выполняют обработку в методе их применения, но пока не будем игнорировать этот случай)

Spark поддерживает только Java Serialization для закрытий. Есть ли выход из этого? Можем ли мы использовать что-то вместо закрытия, чтобы делать то, что я хочу сделать? Мы можем легко делать такие вещи с Hadoop. Это единственное, что делает Искра почти непригодной для меня. Нельзя ожидать, что все сторонние библиотеки будут иметь все классы, расширяющие Serializable!

Возможные решения:

ли, кажется, что-то вроде этого, чтобы быть полезным: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

Это, конечно, кажется, как обертку ответ, но я не могу видеть, как именно.

+0

Связанный: Кроме того, во избежание прохождения SparkContext в РДД карту/фильтр/flatMap и т. д., которые могут дать аналогичную ошибку. – RAbraham

ответ

12

Я понял, как это сделать сам!

Вам просто нужно сериализовать объекты перед тем, как пройти через крышку, и затем де-сериализовать. Этот подход просто работает, даже если ваши классы не являются Serializable, потому что он использует Kryo за кулисами. Все, что вам нужно, это карри. ;)

Вот пример того, как я сделал это:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) 
       (foo: Foo) : Bar = { 
    kryoWrapper.value.apply(foo) 
} 
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _ 
rdd.flatMap(mapper).collectAsMap() 

object Blah(abc: ABC) extends (Foo => Bar) { 
    def apply(foo: Foo) : Bar = { //This is the real function } 
} 

Вы можете сделать Бла так сложно, как вы хотите, класс, объект компаньон, вложенные классы, ссылки на нескольких сторонних LIBS 3rd.

KryoSerializationWrapper referes к: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

+0

Другой вариант - реализовать интерфейс Serializable в самом классе Blah. – SKP

+0

С другой стороны, оболочка делает ее более гибкой, вы можете переключиться на разный тип сериализации, если хотите. – SKP

+1

@ SKP Это было вопросом. Как оказалось, не только Blah, но и поля экземпляров Blah нуждаются в расширении Serializable - это очевидно, поскольку все будет рекурсивно сохранено. Что делать, если ваш класс использует сторонние библиотеки и модифицирует их код, может вызвать у вас кошмар? Вот что-то вроде этого пригодится. Более того, Java Serialization работает медленно. – Nilesh

3

В случае использования Java API следует избегать анонимный класс при переходе к замыканию функции отображения. Вместо того чтобы делать карты (новая функция) вам нужен класс, который расширяет ваши функции и передать, что на карте (..) См: https://yanago.wordpress.com/2015/03/21/apache-spark/

+0

, когда вы говорите, что класс расширяет вашу функцию, я хотел расширить VoidFunction, который является интерфейсом, я немного запутался здесь. Мне нужно реализовать или расширить, если мне нужно расширить, мне нужно фактически создать интерфейс? – Shankar

+0

В этом конкретном примере вам необходимо расширить PairFunction. Нет необходимости реализовывать чередование.карта принимает (PairFunction f) или (Функция f) – vvy

+0

Не могли бы вы привести здесь пример? –

Смежные вопросы