2015-05-06 2 views
1

Я пытаюсь использовать API MaxMind GeoIP для scala-spark, который найден https://github.com/snowplow/scala-maxmind-iplookups. Я загрузит файл, используя стандарт:using maxmind geoip in spark serialized

val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000) 

У меня есть базовый файл CSV, который я загружаю в том, что содержит время и IP-адресов:

val sweek1 = week1.map{line=> IP(parse(line))}.collect{ 
    case Some(ip) => { 
    val ipadress = ipdetect(ip.ip) 
    (ip.time, ipadress) 
    } 
} 

Функция ipdetect в основном определяется по формуле:

def ipdetect(a:String)={ 
    ipLookups.performLookups(a)._1 match{ 
    case Some(value) => value.toString 
    case _ => "Unknown" 
    } 
} 

Когда я запускаю эту программу, он предлагает «Задача не сериализуема». Поэтому я прочитал несколько сообщений, и, кажется, есть несколько способов обойти это.

1, a wrapper 2, используя SparkContext.addFile (который распространять файл через кластер)

, но я не могу работать, как ни один из них работает, я попробовал обертку, но я не знаю, как и где назови это. Я пробовал addFile, но он возвращает Unit вместо String, который, как я полагаю, вам нужно каким-то образом передать двоичный файл. Поэтому я не уверен, что делать сейчас. Любая помощь очень ценится

Так что я смог несколько сериализовать ее, используя mapPartitions и перебирать каждый локальный раздел, но мне интересно, есть ли более эффективный способ сделать это, поскольку у меня есть набор данных в диапазоне миллионов

ответ

4

Предположим, что ваш файл csv содержит IP-адрес в строке и, например, вы хотите сопоставить каждый IP-адрес в городе.

import com.snowplowanalytics.maxmind.iplookups.IpLookups 

val geoippath = "path/to/geoip.dat" 
val sc = new SparkContext(new SparkConf().setAppName("IP Converter")) 
sc.addFile(geoippath) 

def parseIP(ip:String, ipLookups: IpLookups): String = { 
    val lookupResult = ipLookups.performLookups(ip) 
    val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("") 
} 

val logs = sc.textFile("path/to/your.csv") 
      .mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP) 

Для других преобразований ip, пожалуйста, обратитесь к Scala MaxMind IP Lookups. Кроме того, mapWith, как представляется, устарел. Вместо этого используйте mapPartitionsWithIndex.

+1

Отличное решение, я не знал, как распределить данные по узлам, поэтому мне пришлось разрешить, запустив цикл в главном, который был очень неэффективным. Кстати, как вы пишете полный путь к файлу .dat? Я попробовал «файл: ///home/hadoop/geoip.dat», это не сработало (но просто работал «geoip.dat») – GameOfThrows

+0

Я также застрял в формате пути 'addFile', но я обнаружил, что 'addFile', похоже, принимает тот же формат пути, что и' sc.textFile'. Поэтому, как указано в [данном руководстве] (http://spark.apache.org/docs/latest/programming-guide.html), вы можете попробовать «/my/directory/geoip.dat» без указания «file: // /». –

Смежные вопросы