2016-03-22 1 views
10

У меня есть 3 каскадные трубы (один, чтобы присоединиться к двум другим), описанных в следующем,Создание пользовательских присоединиться логики в каскадного обеспечения MAP_SIDE только

  • LHSPipe - (больше размер)

enter image description here

  • RHSPipes - (меньше размера, которые могли бы, возможно, подходит к памятке Ry)

enter image description here

Psuedocode следующим образом, Этот пример включает в себя два присоединяется

ЕСЛИ F1DecidingFactor = ДА затем Регистрация LHSPipe с РИТ подстановок # 1 ПО (LHSPipe.F1Input = РИТ Lookup # 1.Join # F1) и установите результат поиска (SET LHSPipe.F1Output = Результат # F1) В противном случае SET LHSPipe.F1Output = N/A

Такая же логика применяется для вычисления F2.

Ожидаемый выход,

enter image description here

Этот сценарий заставил меня пойти с Настраиваемой Регистрацией операции IF-ELSE принимает решение о регистрации или нет.

Учитывая описанный выше сценарий, я хотел бы перейти на соединение MAP-SIDE (сохраняя RHSPipe в памяти узла задачи MAP), я думал о ниже возможных решениях, каждый из которых имеет свои плюсы и минусы. Вам нужны ваши предложения.

Вариант № 1:

CoGroup - Мы можем строить пользовательские присоединиться к логике с помощью CoGroup с BufferJoiner следуют обычаю присоединиться (операции), но Wouldnt обеспечить MAP-SIDE присоединиться.

Вариант № 2:

HashJoin - Это обеспечивает MAP-SIDE присоединиться, но, насколько я вижу пользовательские присоединиться не могут быть построены с использованием этого.

Исправьте свое понимание и предложите свои мнения для работы над этим требованием.

Заранее спасибо.

+0

Можете ли вы предоставить свой пример кода, а также то, что вы хотите сделать в пользовательском соединении? – Ambrish

+0

Также будут полезны примеры входных данных и ожидаемого выхода. – Ambrish

+0

Рассматривали ли вы разбивку своих данных на подмножества? – kpie

ответ

1

Лучший способ решить эту проблему (что я могу придумать) - изменить ваш меньший набор данных. Вы можете добавить новое поле (F1DecidingFactor) к меньшему набору данных.Значение F1Result может должно понравиться:

Судо код

if F1DecidingFactor == "Yes" then 
    F1Result = ACTUAL_VALUE 
else 
    F1Result = "N/A" 

Таблица результатов

|F1#Join|F1#Result|F1#DecidingFactor| 
| Yes|  0|    True| 
| Yes|  1|   False| 
|  No|  0|    N/A| 
|  No|  1|    N/A| 

Вы можете сделать выше, с помощью каскадирования, а также.

После этого вы можете присоединиться к своей стороне карты.

Если модификация меньшего набора данных невозможна, у меня есть 2 варианта решения проблемы.

Вариант 1

Добавить новые поля в ваших труб малого диаметра, что эквивалентно вас решающим фактором (т.е. F1DecidingFactor_RHS = Yes). Затем включите его в свои критерии присоединения. Как только ваше соединение будет выполнено, вы будете иметь значения только для тех строк, где это условие соответствует. В противном случае это будет null/blank. Пример кода:

Главный класс

import cascading.operation.Insert; 
import cascading.pipe.Each; 
import cascading.pipe.HashJoin; 
import cascading.pipe.Pipe; 
import cascading.pipe.assembly.Discard; 
import cascading.pipe.joiner.LeftJoin; 
import cascading.tuple.Fields; 

public class StackHashJoinTestOption2 { 
    public StackHashJoinTestOption2() { 
     Fields f1Input = new Fields("F1Input"); 
     Fields f2Input = new Fields("F2Input"); 
     Fields f1Join = new Fields("F1Join"); 
     Fields f2Join = new Fields("F2Join"); 

     Fields f1DecidingFactor = new Fields("F1DecidingFactor"); 
     Fields f2DecidingFactor = new Fields("F2DecidingFactor"); 
     Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS"); 
     Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS"); 

     Fields lhsJoinerOne = f1DecidingFactor.append(f1Input); 
     Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input); 

     Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join); 
     Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join); 

     Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"); 

     // Large Pipe fields : 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input 
     Pipe largePipe = new Pipe("large-pipe"); 

     // Small Pipe 1 Fields : 
     // F1Join F1Result 
     Pipe rhsOne = new Pipe("small-pipe-1"); 

     // New field to small pipe. Expected Fields: 
     // F1Join F1Result F1DecidingFactor_RHS 
     rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL); 

     // Small Pipe 2 Fields : 
     // F2Join F2Result 
     Pipe rhsTwo = new Pipe("small-pipe-2"); 

     // New field to small pipe. Expected Fields: 
     // F2Join F2Result F2DecidingFactor_RHS 
     rhsTwo = new Each(rhsTwo, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL); 

     // Joining first small pipe. Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS 
     Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin()); 

     // Joining second small pipe. Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS 
     Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin()); 

     Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE); 

     result = new Discard(result, f1DecidingFactorRhs); 
     result = new Discard(result, f2DecidingFactorRhs); 

     // result Pipe should have expected result 
    } 
} 

Вариант 2

Если вы хотите иметь значение по умолчанию, а не нулевой/пустой, то я хотел бы предложить вам сделать HashJoin первый с невыполнением Столяры, за которыми следует функция для обновления кортежей с соответствующими значениями. Что-то вроде:

главный класс

import cascading.pipe.Each; 
import cascading.pipe.HashJoin; 
import cascading.pipe.Pipe; 
import cascading.pipe.joiner.LeftJoin; 
import cascading.tuple.Fields; 

public class StackHashJoinTest { 
    public StackHashJoinTest() { 
     Fields f1Input = new Fields("F1Input"); 
     Fields f2Input = new Fields("F2Input"); 
     Fields f1Join = new Fields("F1Join"); 
     Fields f2Join = new Fields("F2Join"); 

     Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"); 

     // Large Pipe fields : 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input 
     Pipe largePipe = new Pipe("large-pipe"); 

     // Small Pipe 1 Fields : 
     // F1Join F1Result 
     Pipe rhsOne = new Pipe("small-pipe-1"); 

     // Small Pipe 2 Fields : 
     // F2Join F2Result 
     Pipe rhsTwo = new Pipe("small-pipe-2"); 

     // Joining first small pipe. 
     // Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result 
     Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin()); 

     // Joining second small pipe. 
     // Expected fields after join: 
     // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result 
     Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin()); 

     Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE); 

     // result Pipe should have expected result 
    } 
} 

Update Функция

import cascading.flow.FlowProcess; 
import cascading.operation.BaseOperation; 
import cascading.operation.Function; 
import cascading.operation.FunctionCall; 
import cascading.tuple.Fields; 
import cascading.tuple.TupleEntry; 

public class TestFunction extends BaseOperation<Void> implements Function<Void> { 

    private static final long serialVersionUID = 1L; 

    private static final String DECIDING_FACTOR = "No"; 
    private static final String DEFAULT_VALUE = "N/A"; 

    // Expected Fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output" 
    public TestFunction() { 
     super(Fields.ARGS); 
    } 

    @Override 
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) { 
     TupleEntry arguments = call.getArguments(); 

     TupleEntry result = new TupleEntry(arguments); 

     if (result.getString("F1DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) { 
      result.setString("F1Output", DEFAULT_VALUE); 
     } 

     if (result.getString("F2DecidingFactor").equalsIgnoreCase(DECIDING_FACTOR)) { 
      result.setString("F2Output", DEFAULT_VALUE); 
     } 

     call.getOutputCollector().add(result); 
    } 

} 

Ссылки

Это должно решить вашу проблему. Позвольте мне знать, если это помогает.

Смежные вопросы