2016-03-25 3 views
1

я имел следующий набор данных в качестве входных данных
Сформировать несколько выходных файлов с помощью MultiSinkTap

id,name,gender 
asinha161,Aniruddha,Male 
vic,Victor,Male 
day1,Daisy,Female 
jazz030,Jasmine,Female 
Mic002,Michael,Male 

I, направленные на сегрегацию самцов и самок в два отдельных выходных файлов следующим образом
Dataset для мужчин

id,name,gender 
asinha161,Aniruddha,Male 
vic,Victor,Male 
Mic002,Michael,Male 

Набор данных для самок

id,name,gender 
day1,Daisy,Female 
jazz030,Jasmine,Female 

Теперь, я попытался написать каскадный Framework код, который должен делать выше задачи, код выглядит следующим образом

public class Main { 

     public static void main(String[] args) { 
      Tap sourceTap = new FileTap(new TextDelimited(true, ","),  "inputFile.txt"); 
      Tap sink_one = new FileTap(new TextDelimited(true, ","), "maleFile.txt"); 
      Tap sink_two = new FileTap(new TextDelimited(true, ","), "FemaleFile.txt"); 

      Pipe assembly = new Pipe("inputPipe"); 


      // ...split into two pipes 
      Pipe malePipe = new Pipe("for_male", assembly); 
      malePipe=new Each(malePipe,new CustomFilterByGender("male")); 
      Pipe femalePipe = new Pipe("for_female", assembly); 
      femalePipe=new Each(femalePipe, new CustomFilterByGender("female")); 
      // create the flow 
      List<Pipe> pipes = new ArrayList<Pipe>(2) 
     {{pipes.add(countOne); 
      pipes.add(countTwo);}}; 

      Tap outputTap=new MultiSinkTap<>(sink_one,sink_two); 

      FlowConnector flowConnector = new LocalFlowConnector(); 
      Flow flow = flowConnector.connect(sourceTap, outputTap, pipes); 
      flow.complete(); 
     } 

где CustomFilterByGender (String пол); это настраиваемая функция, которая возвращает кортежи в соответствии с гендерным значением, переданным в качестве аргумента.

Обратите внимание, что я не использовал Custom Buffer ради эффективности.
Использование MultiSinkTap, я не могу получить желаемый результат, так как метод connect() объекта LocalFlowConnector не принимает объект MultiSinkTap, что приводит к ошибке времени компиляции.
Будет необходимо, если вы предложите возможные изменения в приведенном выше коде, чтобы заставить его работать или использовать MultiSinkTap.
Спасибо за терпеливое рассмотрение вопроса :)

ответ

4

Я думаю, что вы хотите написать вывод различных труб в разные выходные файлы, я внесла некоторые изменения в ваш код, который должен определенно решить вашу задачу.

public class Main { 
    public static void main(String[] args) { 
     Tap sourceTap = new FileTap(new TextDelimited(true, ","), "inputFile.txt"); 
     Tap sink_one = new FileTap(new TextDelimited(true, ","), "maleFile.txt"); 
     Tap sink_two = new FileTap(new TextDelimited(true, ","), "FemaleFile.txt"); 

     Pipe assembly = new Pipe("inputPipe"); 

     Pipe malePipe = new Pipe("for_male", assembly); 
     malePipe=new Each(malePipe,new CustomFilterByGender("male")); 
     Pipe femalePipe = new Pipe("for_female", assembly); 
     femalePipe=new Each(femalePipe, new CustomFilterByGender("female")); 

     List<Pipe> pipes = new ArrayList<Pipe>(2); 
     pipes.add(malePipe); 
     pipes.add(femalePipe); 

     Map<String, Tap> sinks = new HashMap<String, Tap>(); 
     sinks.put("for_male", sink_one); 
     sinks.put("for_female", sink_two); 

     FlowConnector flowConnector = new LocalFlowConnector(); 
     Flow flow = flowConnector.connect(sourceTap, sinks, pipes); 
     flow.complete(); 
    } 

Вместо использования MultiSinkTap вы можете напрямую дать Map <> моек те, которые вы хотите подключить к выходным трубам в этом случае malePipe и femalePipe.

+0

Да, это сработало !!! –

+0

Может ли кто-нибудь сказать мне, как использовать multiSinkTap –

Смежные вопросы