2010-11-13 2 views
11

У меня есть два файла, в следующих форматах:Как бы вы предложили выполнить «Присоединиться» к потоку Hadoop?

field1, field2, field3 
field4, field1, field5 

другое число поле указывает другое значение.

Я хочу, чтобы соединить два файла с помощью Hadoop Streaming на основе взаимного поля (field1 в приведенном выше примере), так что выход будет field1, field2, field3, field4, field5 (другие упорядоченности в порядке, как вперед, поскольку у них есть все поля).

ответ

6

Hadoop имеет библиотеку под названием KeyFieldBasedPartitioner http://hadoop.apache.org/mapreduce/docs/r0.21.0/api/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.html

Используя это как вариант в вашем запуске задания как секционирования для вашей потоковой работы позволяет разбить вывод сопоставителя на пары/значения ключей и имеют ключи получить хэшируются вверх вместе идти к тому же редуктор и сортировки, включая значения http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#More+Usage+Examples

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ 
-D stream.map.output.field.separator=. \ 
-D stream.num.map.output.key.fields=4 \ 
-D mapreduce.map.output.key.field.separator=. \ 
-D mapreduce.partition.keypartitioner.options=-k1,2 \ 
-D mapreduce.job.reduces=12 \ 
-input myInputDirs \ 
-output myOutputDir \ 
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \ 
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \ 
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 

Здесь -Д stream.map.output.field.separator =. и -D stream.num.map.output.key.fields = 4 объясняются здесь http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Customizing+How+Lines+are+Split+into+Key%2FValue+Pairs. В основном это то, как вы вывели свои поля картографа, чтобы определить пары ключ/значение.

Ключи вывода карты вышеуказанного задания MapReduce обычно имеют четыре поля, разделенные символом «.». Однако структура MapReduce будет разбивать выходы карты на первые два поля ключей, используя параметр -D mapreduce.partition.keypartitioner.options = -k1,2. Здесь, -D mapreduce.map.output.key.field.separator =. определяет разделитель для раздела. Это гарантирует, что все пары ключ/значение с теми же первыми двумя полями в ключах будут разделены на один и тот же редуктор.

Это фактически эквивалентно определению первых двух полей в качестве первичного ключа и следующих двух полей в качестве вторичного. Первичный ключ используется для разбиения на разделы, и для сортировки используется комбинация первичного и вторичного ключей.

Для того, чтобы сделать соединение, вы можете просто вывести поля из вашего картографа и задать параметры запуска вашей конфигурации для полей, которые являются ключами, а редуктор будет иметь все ваши значения, соединенные соответствующим ключом , Если вы хотите брать данные из нескольких источников, просто добавляйте больше -input в командной строке ... если они разные длины ввода, то в вашем картографе вы можете распознать это и создать стандартный формат вывода из mapper.

0

Cascading предоставляет полезные абстракции для фильтрации, объединения и группировки по полю. https://stackoverflow.com/questions/4626356 ссылки на полезный пример использования потоковой передачи Hadoop в каскадировании.

Смежные вопросы