2015-09-09 3 views
2

Я пишу искрение, пытаясь прочитать текстовый файл с помощью scala, на моей локальной машине отлично работает.Spark: прочитайте csv-файл с s3 с помощью scala

val myFile = "myLocalPath/myFile.csv" 
    for (line <- Source.fromFile(myFile).getLines()) { 
    val data = line.split(",") 
    myHashMap.put(data(0), data(1).toDouble) 
    } 

Тогда я попытался заставить его работать на AWS, я сделал следующее, но не похоже, чтобы прочитать весь файл правильно. Каким должен быть правильный способ чтения такого текстового файла на s3? Большое спасибо!

val credentials = new BasicAWSCredentials("myKey", "mySecretKey"); 
val s3Client = new AmazonS3Client(credentials); 
val s3Object = s3Client.getObject(new GetObjectRequest("myBucket", "myFile.csv")); 

val reader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent())); 

var line = "" 
while ((line = reader.readLine()) != null) { 
     val data = line.split(",") 
     myHashMap.put(data(0), data(1).toDouble) 
     println(line); 
} 

ответ

0

Я думаю, что я получил это работает, как показано ниже:

val s3Object= s3Client.getObject(new GetObjectRequest("myBucket", "myPath/myFile.csv")); 

    val myData = Source.fromInputStream(s3Object.getObjectContent()).getLines() 
    for (line <- myData) { 
     val data = line.split(",") 
     myMap.put(data(0), data(1).toDouble) 
    } 

    println(" my map : " + myMap.toString()) 
1

Читайте в csv-файле с помощью sc.textFile("s3://myBucket/myFile.csv"). Это даст вам RDD [String]. Получите, что в карту

val myHashMap = data.collect 
        .map(line => { 
         val substrings = line.split(" ") 
         (substrings(0), substrings(1).toDouble)}) 
        .toMap 

Вы можете использование sc.broadcast транслировать свою карту, так что он легко доступен на всех рабочих узлов.

(Обратите внимание, что вы можете, конечно, также использовать Databricks «искрового CSV» пакет для чтения в CSV-файл, если вы предпочитаете.)

+0

Функция моей полезности требует myHashMap. поэтому мой код выглядит следующим образом: output = input.map {t => myUtiltyFunction (myHashMap, t)} можно ли каждый раз передавать myHashMap в myUtiltiyFunction? Есть ли способ использовать трансляцию myHashMap и позволить myUtitlityFunction знать это напрямую? Большое спасибо! – Edamame

+0

Кроме того, я не хотел использовать sc.textFile («s3: //myBucket/myFile.csv»), потому что я хочу сделать общий код даже в случае отсутствия контекста искры. Благодарю. – Edamame

+0

Вы понимаете, что если вы позволите своей функции полезности прочитать карту напрямую, и вы используете функцию утилиты, как вы описываете 'output = input.map {t => myUtiltyFunction (...)}', карта будет считана и созданный для каждой строки вашего входного rdd. Я действительно не думаю, что ты этого хочешь. Если вы передаете переменную (используя «sc.broadcast'), с другой стороны, вы читаете и создаете карту только один раз на своем драйвере, а затем все ваши работники имеют прямой доступ к ней. Почему вы не хотите передавать карту в функцию полезности? Мне это кажется странным. –

Смежные вопросы