2013-09-21 3 views
2

Сегодня я начал работать над пакетами rhdfs и rmr2.отладка функции mapreduce() в R

Функция mapreduce() на 1D-векторе хорошо работала, как и ожидалось. кусок кода на 1D вектор

a1 <- to.dfs(1:20) 
a2 <- mapreduce(input=a1, map=function(k,v) keyval(v, v^2)) 
a3 <- as.data.frame(from.dfs(a2()) 

Он возвращает следующие dataframe

Key Val 
1  1 1 
2 10 100 
3 11 121 
4 12 144 
5 13 169 
6 14 196 
7 15 225 
8 16 256 
9 17 289 
10 18 324 
11 19 361 
12 2 4 
13 20 400 
14 3 9 
15 4 16 
16 5 25 
17 6 36 
18 7 49 
19 8 64 
20 9 81 

До сих пор, это было прекрасно.

Но, работая над функцией mapreduce на наборе данных mtcars, я получил следующее сообщение об ошибке. Не удалось отладить его дальше. Пожалуйста, дайте подсказку двигаться вперед.

Мой кусок кода:

rs1 <- mapreduce(input=mtcars, 
        map=function(k, v) { 
         if (mtcars$hp > 150) keyval("Bigger", 1) }, 
        reduce=function(k, v) keyval(k, sum(v)) 
       ) 

Сообщение об ошибке с помощью данной части кода.

13/09/21 07:24:49 ERROR streaming.StreamJob: Missing required option: input 
Usage: $HADOOP_HOME/bin/hadoop jar \ 
      $HADOOP_HOME/hadoop-streaming.jar [options] 
Options: 
    -input <path>  DFS input file(s) for the Map step 
    -output <path>  DFS output directory for the Reduce step 
    -mapper <cmd|JavaClassName>  The streaming command to run 
    -combiner <cmd|JavaClassName> The streaming command to run 
    -reducer <cmd|JavaClassName>  The streaming command to run 
    -file  <file>  File/dir to be shipped in the Job jar file 
    -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional. 
    -outputformat TextOutputFormat(default)|JavaClassName Optional. 
    -partitioner JavaClassName Optional. 
    -numReduceTasks <num> Optional. 
    -inputreader <spec> Optional. 
    -cmdenv <n>=<v> Optional. Pass env.var to streaming commands 
    -mapdebug <path> Optional. To run this script when a map task fails 
    -reducedebug <path> Optional. To run this script when a reduce task fails 
    -io <identifier> Optional. 
    -verbose 

Generic options supported are 
-conf <configuration file>  specify an application configuration file 
-D <property=value>   use value for given property 
-fs <local|namenode:port>  specify a namenode 
-jt <local|jobtracker:port> specify a job tracker 
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster 
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath. 
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines. 

The general command line syntax is 
bin/hadoop command [genericOptions] [commandOptions] 


For more details about these options: 
Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info 

Streaming Command Failed! 
Error in mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, : 
    hadoop streaming failed with error code 1 

Быстрые и детальные ответы высоко оценены ...

ответ

2

Данные, которые вы передаете для Keyval, думает как вектор, его не единое целое. Попробуйте интерпретировать снизу код.

Попытка локально

  • загрузка данных

    data(mtcars) 
    
  • просмотреть несколько строк данных

    head(mtcars) 
    hpTest=mtcars$hp # taking required data 
    print(hpTest) 
    
  • Окончательная сумма

    sum(hpTest[which(hpTest>150)]) # 2804 
    

Бег на Hadoop MapReduce-

  • экспортирующих переменные Env

    # requied 
    Sys.setenv(HADOOP_HOME="/home/trendwise/apache/hadoop-1.0.4"); 
    Sys.setenv(HADOOP_CMD="/home/trendwise/apache/hadoop-1.0.4/bin/hadoop"); 
    
    #optional 
    Sys.setenv(HADOOP_BIN="/home/trendwise/apache/hadoop-1.0.4/bin"); 
    Sys.setenv(HADOOP_CONF_DIR="/home/trendwise/apache/hadoop-1.0.4/conf"); 
    Sys.setenv(HADOOP_STREAMING='/home/trendwise/apache/hadoop-1.0.4/contrib/streaming/hadoop-streaming-1.0.4.jar') 
    Sys.setenv(LD_LIBRARY_PATH="/lib:/lib/x86_64-linux-gnu:/lib64:/usr/lib:/usr/lib64:/usr/local/lib:/usr/local/lib64:/usr/lib/jvm/jdk1.7.0_10/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64/server"); 
    
  • Загрузка библиотеки

    library(rmr2) 
    library(rhdfs) 
    
  • инициализацией

    hdfs.init() 
    
  • положить вклад в HDFS

    hpInput = to.dfs(mtcars$hp) 
    
  • работает MapReduce

    mapReduceResult <- mapreduce(input=hpInput, 
          map=function(k, v) { keyval(rep(1,length(which(inputData > 150))) ,v[which(v>150)])} , 
          reduce=function(k2, v2){ keyval(k2, sum(v2))} 
    
  • просмотра значения выходной MR

    from.dfs(mapReduceResult) 
    
  • выход

    $key 
    [1] 1 
    
    $val 
    [1] 2804  
    
0

Вы можете использовать встроенные отладки функциональности в новейшей RStudio. Просто перепишите код в локальном режиме MR

+0

Фактически, нет необходимости переписывать код, RHadoop имеет локальный режим –

Смежные вопросы