2015-10-21 2 views
0

Я искал некоторые файлы данных (~ 20 ГБ). Я хотел бы найти некоторые конкретные термины в этих данных и отметить смещение для матчей. Есть ли способ, чтобы Spark идентифицировал смещение для части данных, над которыми я работаю?Как определить смещение в Apache Spark?

import org.apache.spark.api.java.*; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.Function; 

import java.util.regex.*; 

public class Grep { 
     public static void main(String args[]) { 
      SparkConf  conf  = new SparkConf().setMaster("spark://ourip:7077"); 
      JavaSparkContext jsc  = new JavaSparkContext(conf); 
      JavaRDD<String> data  = jsc.textFile("hdfs://ourip/test/testdata.txt"); // load the data from HDFS 
      JavaRDD<String> filterData = data.filter(new Function<String, Boolean>() { 
        // I'd like to do something here to get the offset in the original file of the string "babe ruth" 
        public Boolean call(String s) { return s.toLowerCase().contains("babe ruth"); } // case insens matching 

      }); 

      long matches = filterData.count(); // count the hits 

      // execute the RDD filter 
      System.out.println("Lines with search terms: " + matches); 
); 
     } // end main 
} // end class Grep 

Я хотел бы сделать что-то в «фильтр» операции для вычисления смещения «Babe Ruth» в исходном файле. Я могу получить смещение «babe ruth» в текущей строке, но что это за процесс или функция, которая сообщает мне смещение строки внутри файла?

+0

Я не уверен, что вы имеете в виду смещение здесь. Можете быть более конкретными? – eliasah

+0

Я ищу смещение байта в файле. Например, если у меня был текст: 1 а
2 б
3 с
и я хочу, чтобы найти смещение байта для символа «Ъ» в файле. (В этом случае это будет 6 (пробелы + '\ n'). Если Spark не участвует в этом процессе, это просто, но когда Spark читает эти файлы, они будут разбиты на строки. Таким образом, код выше, может принимать «2 b» в качестве входных данных. Он может вычислять смещение байта относительно этой строки, но как мне получить смещение байта относительно файла? – Gary

ответ

2

In Spark common Hadoop Формат ввода может использоваться. Чтобы прочитать смещение байта из файла, вы можете использовать класс TextInputFormat от Hadoop (org.apache.hadoop.mapreduce.lib.input). Он уже поставляется вместе с Spark.

Он будет читать файл как ключа (байт смещения) и значение (текстовую строку):

An InputFormat для простых текстовых файлов. Файлы разбиваются на строки. Для сигнализации конца строки используются либо возврат линии, либо возврат каретки. Ключи - это позиция в файле, а значения - строка текста.

В Спарк его можно использовать по телефону newAPIHadoopFile()

SparkConf conf = new SparkConf().setMaster(""); 
JavaSparkContext jsc = new JavaSparkContext(conf); 

// read the content of the file using Hadoop format 
JavaPairRDD<LongWritable, Text> data = jsc.newAPIHadoopFile(
     "file_path", // input path 
     TextInputFormat.class, // used input format class 
     LongWritable.class, // class of the value 
     Text.class, // class of the value 
     new Configuration());  

JavaRDD<String> mapped = data.map(new Function<Tuple2<LongWritable, Text>, String>() { 
    @Override 
    public String call(Tuple2<LongWritable, Text> tuple) throws Exception { 
     // you will get each line from as a tuple (offset, text)  
     long pos = tuple._1().get(); // extract offset 
     String line = tuple._2().toString(); // extract text 

     return pos + " " + line; 
    } 
}); 
+0

это функция, которую я ищу. конечно работает. Спасибо! – Gary

0

Вы можете использовать метод wholeTextFiles(String path, int minPartitions) из JavaSparkContext вернуть JavaPairRDD<String,String> где ключ имя файла и значение является строка, содержащая все содержимое файла (таким образом, каждая запись в этом РДУ представляет собой файл). Отсюда просто запустите map(), который будет называть indexOf(String searchString) на каждое значение. Это должно возвращать первый индекс в каждом файле с появлением соответствующей строки.

(EDIT :)

Так что найти смещение в распределенном виде одного файла (в вашем случае использования ниже в комментариях) возможно. Ниже приведен пример, который работает в Scala.

val searchString = *search string* 
val rdd1 = sc.textFile(*input file*, *num partitions*) 

// Zip RDD lines with their indices 
val zrdd1 = rdd1.zipWithIndex() 

// Find the first RDD line that contains the string in question 
val firstFind = zrdd1.filter { case (line, index) => line.contains(searchString) }.first() 

// Grab all lines before the line containing the search string and sum up all of their lengths (and then add the inline offset) 
val filterLines = zrdd1.filter { case (line, index) => index < firstFind._2 } 
val offset = filterLines.map { case (line, index) => line.length }.reduce(_ + _) + firstFind._1.indexOf(searchString) 

Обратите внимание, что вам дополнительно потребуется добавить любые символы новой строки вручную поверх этого, так как они не учитывается (формат ввод использует новые строки как разграничения между записями). Количество новых строк - это просто количество строк перед строкой, содержащей строку поиска, поэтому это тривиально для добавления.

Я не совсем хорошо знаком с API Java и, к сожалению, это не совсем легко проверить, поэтому я не уверен, что код ниже работает, но имеет на нем (также я использовал Java 1.7, но 1.8 сжимает много этот код с лямбда-выражений):.

String searchString = *search string*; 
JavaRDD<String> data = jsc.textFile("hdfs://ourip/test/testdata.txt"); 

JavaRDD<Tuple2<String, Long>> zrdd1 = data.zipWithIndex(); 

Tuple2<String, Long> firstFind = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() { 
     public Boolean call(Tuple2<String, Long> input) { return input.productElement(0).contains(searchString); } 
    }).first(); 

JavaRDD<Tuple2<String, Long>> filterLines = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() { 
     public Boolean call(Tuple2<String, Long> input) { return input.productElement(1) < firstFind.productElement(1); } 
    }); 

Long offset = filterLines.map(new Function<Tuple2<String, Long>, Int>() { 
     public Int call(Tuple2<String, Long> input) { return input.productElement(0).length(); } 
    }).reduce(new Function2<Integer, Integer, Integer>() { 
     public Integer call(Integer a, Integer b) { return a + b; } 
    }) + firstFind.productElement(0).indexOf(searchString); 

Это может быть сделано только тогда, когда ваш вход один файл (так как в противном случае, zipWithIndex() не гарантирует смещения в файле), но этот метод работает для РДУ из любое количество разделов, поэтому не стесняйтесь разбивать свой файл на любое количество фрагментов.

+0

Итак, моя ситуация в том, что есть один файл размером ~ 20 ГБ. метод, который вы описываете, будет ли каждый экземпляр Spark получать копию этого файла в этом случае? Разве это не повлечет за собой по существу поражение Spark в этом случае? Я надеюсь, что все еще будет работать множество узлов из разных смещений в файле, чтобы получить ответ быстро. Теоретически, подсистема Spark знает о смещении + длины куска, над которым я работаю. Есть ли способ получить к нему доступ? – Gary

+0

Эй, @Gary, я обновил свой ответ с тем, что, по моему мнению, может помочь в вашем случае использования Дайте мне знать, если это поможет! –