2015-02-18 6 views
1

У меня есть файл с двумя столбцами id и timestamp. Я подсчитываю количество сеансов, каждое из которых имеет значение - определяется бездействием более 30 минут. Однако у меня возникают проблемы с потоковыми командами. Пример несколько строк выглядит следующим образом.Использование Hadoop Partitioner и Comparator Class Together

id,time 
1,2015-02-05 01:01:01 
1,2015-02-05 01:02:01 
3,2015-02-05 02:01:01 
3,2015-02-05 02:01:02 

я правильно знаю, что мой сопоставителя и редуктор работы б/с я получить правильные результаты, когда я использую только один редуктор. Моя проблема в том, когда мне нужно использовать более одного редуктора. Я пытаюсь использовать Partitioner для отправки первого значения вывода карты на один редуктор и сортировки его по второму значению на выходе карты. Любые предложения о том, как это сделать?

Это то, что я пытаюсь.

hadoop jar /opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p470.103/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.3.0-mr1-cdh5.1.2.jar \ 
-Dmapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ 
-D stream.map.output.field.separator=, \ 
-D stream.num.map.output.key.fields=2 \ 
-D mapred.text.key.partitioner.options=-k1,1 \ 
-Dmapred.text.key.comparator.options=-k2,2 \ 
-input /in/ \ 
-output /out/ \ 
-mapper mapper1.py \ 
-file ${DIR}mapper.py \ 
-reducer reducerA.py \ 
-file ${DIR}reducer.py 
+0

Да, вы можете. Но какова ваша конкретная проблема? Значения, поступающие к редукторам, не отсортированы должным образом или разделены? – yurgis

ответ

0

Изменить "-Dmapred.text.key.comparator.options = -k2,2" к "-Dmapred.text.key.comparator.options = -k1,2" так записи редуктор получает являются сначала отсортированы по id, а затем по времени. Также вашему редуктору необходимо сравнить последовательные ключи (id) записей и только подсчитывать сеансы для записей с равными идентификаторами.

Смежные вопросы