2014-02-20 2 views
0

У меня есть мешки с кортежами, и одно поле в каждой сумке должно быть нормализовано до нуля. Я беру MIN этого поля над сумкой и вычитаю, что min из каждого кортежа.Математика внутри мешка

Это можно сделать без уплощения?

Фактическая ситуация немного сложнее, потому что мне нужен только min над подмножеством кортежей, удовлетворяющим определенному условию.

Вот некоторые примеры кода, который не работает:

data = LOAD 'data.csv' USING PigStorage(',') 
    AS (x:int, y:int, z:int); 

data_grouped = GROUP data BY x; 

data_normal = FOREACH data_grouped { 
    good_data = FILTER data BY y == 0; 
    smallest_good_z = MIN(good_data.z); 
    GENERATE data.(x, y, z-smallest_good_z); 
} 

DESCRIBE data_normal; 

rmf data_normal 
STORE data_normal INTO 'data_normal' USING PigStorage(','); 

и образец data.csv:

0,0,1 
0,0,2 
0,0,3 
0,1,0 
0,2,-1 
1,2,3 
1,3,4 
1,4,5 
1,0,5 

Пожалуйста, скажите мне, что я не группировать, MIN, расплющить, вычитание, и перегруппироваться! Вот метод, который я использую сейчас, и я хочу уйти от него:

data = LOAD 'data.csv' USING PigStorage(',') AS 
    (x:int, y:int, z:int); 

data_grouped = GROUP data BY x; 

data_n0 = FOREACH data_grouped { 
    good_data = FILTER data BY y == 0; 
    smallest_good_z = MIN(good_data.z); 
    GENERATE FLATTEN(data.(x, y, z)), smallest_good_z AS smz:int; 
} 

data_n1 = FOREACH data_n0 GENERATE x,y,z-smz; 

data_normal = GROUP data_n1 BY x; 
+0

О есть кот каламбур скрывается где-то в названии вопроса ...: D – TC1

ответ

1

К сожалению, вы можете сделать это только с UDF. Вот пример:

import java.io.IOException; 
import org.apache.pig.EvalFunc; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.Tuple; 

public class MinusToAllInBag extends EvalFunc<Tuple> { 

    @Override 
    public Tuple exec(Tuple input) throws IOException { 
     if (input == null || input.size() != 3) { 
      System.err.println("Inputs are ({inputBag}, position, toSubtract)"); 
      return null; 
     } 
     try { 
      Object o = input.get(0); 
      if (!(o instanceof DataBag)) { 
       throw new RuntimeException("parameter 1 must be a databag"); 
      } 
      DataBag inputBag = (DataBag)o; 
      Integer pos = (Integer) input.get(1); 
      Float toSubtract = (Float) input.get(2); 
      for (Tuple row : inputBag) { 
       Float value = (Float)row.get(pos); 
       if (value != null) { 
        value -= toSubtract; 
        row.set(pos, value); 
       } 
      } 
      return input; 
     } catch (Exception e) { 
      System.err.println("Failed to process input; error - " + e.getMessage()); 
      return null; 
     } 
    } 
} 

И сценарий свиньи:

REGISTER libs.jar; 

data = LOAD 'data.csv' USING PigStorage(',') AS 
    (x:int, y:int, z:float); 

data_grouped = GROUP data BY x; 

data_n0 = FOREACH data_grouped { 
    good_data = FILTER data BY y == 0; 
    smallest_good_z = MIN(good_data.z); 
    GENERATE group, MinusToAllInBag(data, 2, (float)smallest_good_z); 
} 

dump data_n0; 
+0

Может ли этот UDF подхода распространяются на работу на сумке кортежей, где вы не знаете размер кортежа? Другими словами, всегда вычитайте значение из n-й позиции в любом кортеже, независимо от того, имеет ли он n полей или n + 1000 полей? В моей ситуации я действительно не могу сохранить UDF, который нужно редактировать каждый раз, когда я добавляю поле в свой кортеж. –

+0

Да, это второй аргумент UDF, первый - BAG, pos в кортеже BAG, начиная с 0, для вычитания. В этот момент все числа должны плавать. Например, индекс 2 в «MinusToAllInBag (data, 2, (float) smallest_good_z)» является третьим столбцом, если вы считаете, начинается с 1. – alexeipab

Смежные вопросы