2015-08-25 2 views
0

У меня есть следующий код для обнаружения наиболее используемого домена верхнего уровня из событий. Я использую его для получения даты через Spark SQL.Apache Spark: «SparkException: задача не сериализуема» в искровой оболочке для RDD, созданной вручную

Функции сами испытаны и работают нормально. Я использую Amazon EMR и искровую оболочку. Когда искра посылает задачи узлам почти сразу, я получаю длинную трассировку стека и «SparkException: Task not serializable» в конце без каких-либо конкретных. Какая сделка здесь?

import scala.io.Source 
val suffixesStr = 
    Source.fromURL("https://publicsuffix.org/list/public_suffix_list.dat").mkString 
val suffList = 
    suffixesStr.lines.filter(line => !line.startsWith("//") && line.trim() != "") 
val suffListRDD = sc.parallelize(suffList.toList).collect() 

val cleanDomain = (domain: String) => { 
    var secLevelSuffix = 
    suffListRDD.find(suffix => domain.endsWith("."+suffix) && suffix.contains(".")) 
    var regex = """[^.]+\.[^.]+$""".r 
    if (!secLevelSuffix.isEmpty){ 
    regex = """[^.]+\.[^.]+\.[^.]+$""".r 
    } 
    var cleanDomain = regex.findFirstMatchIn(domain).map(_ group 0) 
    cleanDomain.getOrElse("") 
} 

val getDomain = (url: String) => { 
    val domain = """(?i)^(?:(?:https?):\/\/)?(?:(?:www|www1|www2|www3)\.)?([^:?#/\s]+)""".r.findFirstMatchIn(url).map(_ group 1) 
    var res = domain.getOrElse("") 
    res = res.toLowerCase() 
    if (res.contains("google.com")){ 
    res = res.replace("google.com.br", "google.com") 
    }else{ 
    res = cleanDomain(res) 
    } 
    res 
} 

sqlContext.udf.register("getDomain", getDomain) 
val domains = sqlContext.sql("SELECT count(*) c, domain from (SELECT getDomain(page_url) as domain FROM events) t group by domain order by c desc") 
domains.take(20).foreach(println) 

ответ

0

При определении RDD как программно в этом случае, не забудьте отметить то, что не будут повторены для рабочих узлов, как @transient.

В вас так:

@transient val suffixesStr = ... 
@transient val suffList = ... 
Смежные вопросы