Вы можете использовать метод wholeTextFiles(String path, int minPartitions)
из JavaSparkContext
вернуть JavaPairRDD<String,String>
где ключ имя файла и значение является строка, содержащая все содержимое файла (таким образом, каждая запись в этом РДУ представляет собой файл). Отсюда просто запустите map()
, который будет называть indexOf(String searchString)
на каждое значение. Это должно возвращать первый индекс в каждом файле с появлением соответствующей строки.
(EDIT :)
Так что найти смещение в распределенном виде одного файла (в вашем случае использования ниже в комментариях) возможно. Ниже приведен пример, который работает в Scala.
val searchString = *search string*
val rdd1 = sc.textFile(*input file*, *num partitions*)
// Zip RDD lines with their indices
val zrdd1 = rdd1.zipWithIndex()
// Find the first RDD line that contains the string in question
val firstFind = zrdd1.filter { case (line, index) => line.contains(searchString) }.first()
// Grab all lines before the line containing the search string and sum up all of their lengths (and then add the inline offset)
val filterLines = zrdd1.filter { case (line, index) => index < firstFind._2 }
val offset = filterLines.map { case (line, index) => line.length }.reduce(_ + _) + firstFind._1.indexOf(searchString)
Обратите внимание, что вам дополнительно потребуется добавить любые символы новой строки вручную поверх этого, так как они не учитывается (формат ввод использует новые строки как разграничения между записями). Количество новых строк - это просто количество строк перед строкой, содержащей строку поиска, поэтому это тривиально для добавления.
Я не совсем хорошо знаком с API Java и, к сожалению, это не совсем легко проверить, поэтому я не уверен, что код ниже работает, но имеет на нем (также я использовал Java 1.7, но 1.8 сжимает много этот код с лямбда-выражений):.
String searchString = *search string*;
JavaRDD<String> data = jsc.textFile("hdfs://ourip/test/testdata.txt");
JavaRDD<Tuple2<String, Long>> zrdd1 = data.zipWithIndex();
Tuple2<String, Long> firstFind = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> input) { return input.productElement(0).contains(searchString); }
}).first();
JavaRDD<Tuple2<String, Long>> filterLines = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
public Boolean call(Tuple2<String, Long> input) { return input.productElement(1) < firstFind.productElement(1); }
});
Long offset = filterLines.map(new Function<Tuple2<String, Long>, Int>() {
public Int call(Tuple2<String, Long> input) { return input.productElement(0).length(); }
}).reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
}) + firstFind.productElement(0).indexOf(searchString);
Это может быть сделано только тогда, когда ваш вход один файл (так как в противном случае, zipWithIndex()
не гарантирует смещения в файле), но этот метод работает для РДУ из любое количество разделов, поэтому не стесняйтесь разбивать свой файл на любое количество фрагментов.
Я не уверен, что вы имеете в виду смещение здесь. Можете быть более конкретными? – eliasah
Я ищу смещение байта в файле. Например, если у меня был текст: 1 а
2 б
3 с
и я хочу, чтобы найти смещение байта для символа «Ъ» в файле. (В этом случае это будет 6 (пробелы + '\ n'). Если Spark не участвует в этом процессе, это просто, но когда Spark читает эти файлы, они будут разбиты на строки. Таким образом, код выше, может принимать «2 b» в качестве входных данных. Он может вычислять смещение байта относительно этой строки, но как мне получить смещение байта относительно файла? – Gary