2013-11-02 4 views
0

У меня возникли проблемы с поиском использования бинарного выхода для потоковой передачи данных hadoop в качестве входных данных для другого преобразования потоковой передачи хаоу.Каскадирование hadoop потокового mapreductions с двоичными данными

echo.py:

import sys 

while True: 
    buffer = sys.stdin.read(1024) 
    if not buffer: 
    break 
    sys.stdout.write(buffer) 
    sys.stdout.flush() 

$ XXD input.txt

0000000: 6b31 0976 310a 6b32 0976 320a 6b33 0976 k1.v1.k2.v2.k3.v 
0000010: 330a 6b34 0976 340a      3.k4.v4. 

С помощью следующих команд, я ожидал получить output.seq.txt файл, который выглядит как input.txt но этого не происходит. Зачем?

./hadoop/bin/hadoop dfs -rmr /samples/output.seq ;\ 
./hadoop/bin/hadoop jar ./hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \ 
-D 'stream.map.input=typedbytes' \ 
-D 'stream.map.output=typedbytes' \ 
-D 'stream.reduce.input=typedbytes' \ 
-D 'stream.reduce.output=typedbytes' \ 
-D 'mapred.job.name=echo.py (1/2)' \ 
-mapper 'python -m echo map 0 262144000' \ 
-reducer 'python -m echo red 0 262144000' \ 
-file echo.py \ 
-inputformat 'org.apache.hadoop.mapred.KeyValueTextInputFormat' \ 
-input /samples/input.txt \ 
-outputformat 'org.apache.hadoop.mapred.SequenceFileOutputFormat' \ 
-output /samples/output.seq ;\ 
./hadoop/bin/hadoop dfs -cat /samples/output.seq/part-0000 > output.seq 

$ XXD output.seq

0000000: 5345 5106 2f6f 7267 2e61 7061 6368 652e SEQ./org.apache. 
0000010: 6861 646f 6f70 2e74 7970 6564 6279 7465 hadoop.typedbyte 
0000020: 732e 5479 7065 6442 7974 6573 5772 6974 s.TypedBytesWrit 
0000030: 6162 6c65 2f6f 7267 2e61 7061 6368 652e able/org.apache. 
0000040: 6861 646f 6f70 2e74 7970 6564 6279 7465 hadoop.typedbyte 
0000050: 732e 5479 7065 6442 7974 6573 5772 6974 s.TypedBytesWrit 
0000060: 6162 6c65 0000 0000 0000 41e2 785b 996d able......A.x[.m 
0000070: f015 772c 9c66 10d4 13e7 0000 0012 0000 ..w,.f.......... 
0000080: 0009 0000 0005 0700 0000 0000 0000 0507 ................ 
0000090: 0000 0000 0000 0016 0000 000b 0000 0007 ................ 
00000a0: 0700 0000 026b 3100 0000 0707 0000 0002 .....k1......... 
00000b0: 7631 0000 0016 0000 000b 0000 0007 0700 v1.............. 
00000c0: 0000 026b 3200 0000 0707 0000 0002 7632 ...k2.........v2 
00000d0: 0000 0016 0000 000b 0000 0007 0700 0000 ................ 
00000e0: 026b 3300 0000 0707 0000 0002 7633 0000 .k3.........v3.. 
00000f0: 0016 0000 000b 0000 0007 0700 0000 026b ...............k 
0000100: 3400 0000 0707 0000 0002 7634   4.........v4 

Тогда

./hadoop/bin/hadoop dfs -rmr /samples/output.seq.txt ;\ 
./hadoop/bin/hadoop jar ./hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \ 
-D 'stream.map.input=typedbytes' \ 
-D 'stream.map.output=typedbytes' \ 
-D 'stream.reduce.input=typedbytes' \ 
-D 'stream.reduce.output=typedbytes' \ 
-D 'mapred.job.name=echo.py (2/2)' \ 
-mapper 'python -m echo map 0 262144000' \ 
-reducer 'python -m echo red 0 262144000' \ 
-file echo.py \ 
-inputformat 'org.apache.hadoop.mapred.SequenceFileInputFormat' \ 
-input /samples/output.seq/part-00000 \ 
-outputformat 'org.apache.hadoop.mapred.TextOutputFormat' \ 
-output /samples/output.seq.txt ;\ 
./hadoop/bin/hadoop dfs -cat /samples/output.seq.txt/part-0000 > output.seq.txt 

$ XXD output.seq.txt

0000000: 5345 5106 2f6f 7267 2e61 7061 6368 652e SEQ./org.apache. 
0000010: 6861 646f 6f70 2e74 7970 6564 6279 7465 hadoop.typedbyte 
0000020: 732e 5479 7065 6442 7974 6573 5772 6974 s.TypedBytesWrit 
0000030: 6162 6c65 2f6f 7267 2e61 7061 6368 652e able/org.apache. 
0000040: 6861 646f 6f70 2e74 7970 6564 6279 7465 hadoop.typedbyte 
0000050: 732e 5479 7065 6442 7974 6573 5772 6974 s.TypedBytesWrit 
0000060: 6162 6c65 0000 0000 0000 41ef bfbd 785b able......A...x[ 
0000070: efbf bd6d efbf bd15 772c efbf bd66 10ef ...m....w,...f.. 
0000080: bfbd 13ef bfbd 0000 0012 0000 0009 0000 ................ 
0000090: 0005 0700 0000 0000 0000 0507 0000 0000 ................ 
00000a0: 0000 0016 0000 000b 0000 0007 0700 0000 ................ 
00000b0: 026b 3100 0000 0707 0000 0002 7631 0000 .k1.........v1.. 
00000c0: 0016 0000 000b 0000 0007 0700 0000 026b ...............k 
00000d0: 3200 0000 0707 0000 0002 7632 0000 0016 2.........v2.... 
00000e0: 0000 000b 0000 0007 0700 0000 026b 3300 .............k3. 
00000f0: 0000 0707 0000 0002 7633 0000 0016 0000 ........v3...... 
0000100: 000b 0000 0007 0700 0000 026b 3400 0000 ...........k4... 
0000110: 0707 0000 0002 7634 0a     ......v4. 

(Side вопрос: вышеуказанные команды принимают ~ 30-60, чтобы закончить каждый. их так медленно !?)

ответ

0

Для вашей первой работы ваш ввод выглядит как текст, поэтому вы можете пропустить -inputformat и указать свои входы и выходы как typedbytes. Вы должны получить SequenceFile, который использует org.apache.hadoop.io.Text как для ключей, так и для значений. В настоящее время это может быть двойное кодирование значений, возвращаемых Python.

Я недавно врывался в это, так что, возможно, Binary Streaming with Hadoop (and Node.js) поможет разобраться.

Смежные вопросы