2015-10-05 5 views
0

Я знаю, что на этот вопрос был дан ответ много раз, но я пробовал все, и я не пришел к решению. У меня есть следующий код, который поднимает NotSerializableExceptionNotSerializableException: org.apache.hadoop.io.LongWritable

val ids : Seq[Long] = ... 
ids.foreach{ id => 
sc.sequenceFile("file", classOf[LongWritable], classOf[MyWritable]).lookup(new LongWritable(id)) 
} 

за исключением следующего

Caused by: java.io.NotSerializableException: org.apache.hadoop.io.LongWritable 
Serialization stack: 
... 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 

При создании SparkContext, я

val sparkConfig = new SparkConf().setAppName("...").setMaster("...") 
sparkConfig.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
sparkConfig.registerKryoClasses(Array(classOf[BitString[_]], classOf[MinimalBitString], classOf[org.apache.hadoop.io.LongWritable])) 
sparkConfig.set("spark.kryoserializer.classesToRegister", "org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text,org.apache.hadoop.io.LongWritable") 

и глядя на вкладке окружающей среды, я могу см. эти записи. Тем не менее, я не понимаю, почему

  1. сериализатор Kryo, кажется, не будет использоваться (стек не упоминает Kryo)
  2. LongWritable не сериализации.

Я использую Apache Spark, ст. 1.5.1

+1

Я не совсем уверен, чего вы пытаетесь достичь. – eliasah

+0

Я пытаюсь найти определенные ключи в файле последовательности. Ключи передаются методу; для примера я просто создал их случайным образом. – navige

+1

Почему бы вам не преобразовать его в обычный rdd, а затем искать? – eliasah

ответ

3
  1. Загрузка повторно одни и те же данные внутри цикла крайне неэффективно. Если вы выполняете действия против одних и тех же данных, загрузить его один раз и кэш:

    val rdd = sc 
        .sequenceFile("file", classOf[LongWritable], classOf[MyWritable]) 
    
    rdd.cache() 
    
  2. Спарк не считает Hadoop Writables быть сериализации. Для этого есть открытая JIRA (SPARK-2421). Для обработки LongWritables просто get должно быть достаточно:

    rdd.map{case (k, v) => k.get()} 
    

    Что касается вашего пользовательского класса это ваша ответственность, чтобы справиться с этой проблемой.

  3. Для эффективного поиска необходим частичный RDD. В противном случае он должен искать каждый раздел в вашем RDD.

    import org.apache.spark.HashPartitioner 
    
    val numPartitions: Int = ??? 
    val partitioned = rdd.partitionBy(new HashPartitioner(numPartitions)) 
    
  4. Вообще говоря, RDD не предназначены для произвольного доступа. Даже с определенным разделителем lookup приходится линейно искать кандидатский раздел. С 5000 равномерно распределенных ключей и 10M объектов в RDD это, скорее всего, означает повторный поиск по всему RDD. У вас есть несколько вариантов, чтобы избежать этого:

    • фильтр

      val idsSet = sc.broadcast(ids.toSet) 
      rdd.filter{case (k, v) => idsSet.value.contains(k)} 
      
    • присоединиться

      val idsRdd = sc.parallelize(ids).map((_, null)) 
      idsRdd.join(rdd).map{case (k, (_, v)) => (k, v)} 
      
    • IndexedRDD - это не нравится особенно активный проект, хотя

  5. С записями 10M вы, вероятно, будете лучше искать локально в памяти, чем использовать Spark. Для более крупных данных вы должны рассмотреть возможность использования надлежащего хранилища ключей.

+0

Спасибо за подсказки! Я интегрировал LevelDB в свое приложение, используя теперь как LevelDB, так и Spark, и я мог бы значительно увеличить время запроса! Я также экспериментировал с IndexedRDD, который работает очень хорошо, если все данные должны храниться в памяти, чего я хотел избежать. – navige

0

Я новичок в Apache Spark, но пытался решить вашу проблему, пожалуйста, оценить его, если он может помочь вам с проблемой сериализации, это происходит потому, что для искры - Hadoop LongWritable и другие writables не являются сериализованная.

val temp_rdd = sc.parallelize(ids.map(id => 
sc.sequenceFile("file", classOf[LongWritable], classOf[LongWritable]).toArray.toSeq 
)).flatMap(identity) 

ids.foreach(id =>temp_rdd.lookup(new LongWritable(id))) 
Смежные вопросы