2013-05-24 4 views
0

Начиная играть с каскадом на Amazon EMR, удалось запустить его, но падал с довольно простым препятствием, и я надеялся, что кто-то сможет пролить свет на него.Каскадирование - парсер регулярного выражения - неправильное количество полей

Мой код:

import java.util.Properties; 

import cascading.flow.Flow; 
import cascading.flow.FlowDef; 
import cascading.flow.hadoop.HadoopFlowConnector; 
import cascading.pipe.Pipe; 
import cascading.property.AppProps; 
import cascading.scheme.hadoop.TextLine; 
import cascading.tap.Tap; 
import cascading.tap.hadoop.Hfs; 
import cascading.tuple.Fields; 
import cascading.operation.regex.RegexParser; 
import cascading.pipe.Each; 
import cascading.tap.SinkMode; 

public class Main 
    { 
    public static void 
    main(String[] args) 
    { 
    String inPath = args[ 0 ]; 
    String outPath = args[ 1 ]; 

    Properties properties = new Properties(); 
    AppProps.setApplicationJarClass(properties, Main.class); 
    HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties); 

    // create the source tap 
    TextLine sourceScheme = new TextLine(new Fields("line")); 
    Tap inTap = new Hfs(sourceScheme, inPath); 

    // create the sink tap 
    TextLine sinkScheme = new TextLine(new Fields("custid", "movieids")); 
    Tap outTap = new Hfs(sinkScheme, outPath, SinkMode.REPLACE); 

    Fields filmFields = new Fields("custid", "movieids"); 

    String filmRegex = "([0-9]:*[,.]*)"; 

    RegexParser parser = new RegexParser(filmFields, filmRegex); 

    Pipe importPipe = new Each("import", new Fields("line"), parser, Fields.RESULTS); 

    // connect the taps, pipes, etc., into a flow 
    Flow parsedFlow = new HadoopFlowConnector(properties).connect(inTap, outTap, importPipe); 

    // run the flow 
    parsedFlow.start(); 
    parsedFlow.complete(); 
    } 
    } 

Мой вход (нет пустых строк):

1: 2

2: 4

5: 1

3: 9

My Выход:

Task TASKID="task_201305241444_0003_m_000000" TASK_TYPE="MAP" TASK_STATUS="FAILED" FINISH_TIME="1369408133954" ERROR="cascading\.tuple\.TupleException: operation added the wrong number of fields, expected: ['custid', 'movieids'], got result size: 1 
    at cascading\.tuple\.TupleEntryCollector\.add(TupleEntryCollector\.java:82) 
    at cascading\.operation\.regex\.RegexParser\.onFoundGroups(RegexParser\.java:168) 
    at cascading\.operation\.regex\.RegexParser\.operate(RegexParser\.java:151) 
    at cascading\.flow\.stream\.FunctionEachStage\.receive(FunctionEachStage\.java:99) 
    at cascading\.flow\.stream\.FunctionEachStage\.receive(FunctionEachStage\.java:39) 
    at cascading\.flow\.stream\.SourceStage\.map(SourceStage\.java:102) 
    at cascading\.flow\.stream\.SourceStage\.run(SourceStage\.java:58) 
    at cascading\.flow\.hadoop\.FlowMapper\.run(FlowMapper\.java:127) 
    at org\.apache\.hadoop\.mapred\.MapTask\.runOldMapper(MapTask\.java:441) 
    at org\.apache\.hadoop\.mapred\.MapTask\.run(MapTask\.java:377) 
    at org\.apache\.hadoop\.mapred\.Child$4\.run(Child\.java:255) 
    at java\.security\.AccessController\.doPrivileged(Native Method) 
    at javax\.security\.auth\.Subject\.doAs(Subject\.java:396) 
    at org\.apache\.hadoop\.security\.UserGroupInformation\.doAs(UserGroupInformation\.java:1132) 
    at org\.apache\.hadoop\.mapred\.Child\.main(Child\.java:249) 

ГСС экс проверяет штраф в http://regexpal.com/

Большое спасибо

Дункан

ответ

1

Вы получите исключение, потому что регулярное выражение дает один результат, где два исключены поля результатов (а именно «custid» и «movieids»), поскольку регулярное выражение содержит только одну группу (...).

Если вы просто хотите разделить на двоеточие, либо использовать выражение с 2 группами, например:

String filmRegex = "(\\d):(\\d)"; 

или \d+, соответственно, если ваши номера могут иметь более чем одну цифру.

Или, более легко, просто разделить входные данные в свои поля автоматически при чтении из файла, используя схему, TextDelimited ввода:

Scheme sourceScheme = new TextDelimited(new Fields("custid", "movieids"), ":"); 
Смежные вопросы