2014-08-28 2 views
0

Я смотрел на все остальные потоки по этой теме и до сих пор не нашли ответа ...Доступ распределенного кэша в Pig StoreFunc

Проще говоря, я хочу, чтобы получить доступ к Hadoop распределенной кэш из Свиньи StoreFunc, и НЕ изнутри UDF напрямую.

Соответствующие PIG строки кода:

DEFINE CustomStorage KeyValStorage('param1','param2','param3'); 
... 
STORE BLAH INTO /path/ using CustomStorage(); 

Соответствующий Java-код:

public class KeyValStorage<M extends Message> extends BaseStoreFunc /* ElephantBird Storage which inherits from StoreFunc */ { 

... 
public KeyValStorage(String param1, String param2, String param3) { 
    ... 
     try { 
      InputStream is = new FileInputStream(configName); 
      try { 
       prop.load(is); 
      } catch (IOException e) { 
       System.out.println("PROPERTY LOADING FAILED"); 
       e.printStackTrace(); 
      } 
     } catch (FileNotFoundException e) { 
      System.out.println("FILE NOT FOUND"); 
      e.printStackTrace(); 
     } 
    } 
... 
} 

configName это имя локального файла, который я должен иметь возможность читать из распределенного кэша, однако, я получение исключения FileNotFoundException. Когда я использую ТОЧНЫЙ тот же код из PIG UDF напрямую, файл найден, поэтому я знаю, что файл отправляется через распределенный кеш. Я установил соответствующий параметр, чтобы убедиться, что это произошло:

<property><name>mapred.cache.files</name><value>/path/to/file/file.properties#configName</value></property> 

Любые идеи, как я могу обойти это?

Спасибо!

ответ

0

конструктор StroreFunc, называется как на фронтэнда и бэкэндом. Когда он вызывается из внешнего интерфейса (перед запуском задания), вы получите FileNotFoundException, потому что в этот момент файлы из распределенного кеша еще не скопированы на локальный диск узлов.
Вы можете проверить, являются ли вы на внутреннем интерфейсе (когда работа выполняется) и загрузите файл только в этом случае например:

DEFINE CustomStorage KeyValStorage('param1','param2','param3'); 
set mapreduce.job.cache.files hdfs://host/user/cache/file.txt#config 
... 
STORE BLAH INTO /path/ using CustomStorage(); 

public KeyValStorage(String param1, String param2, String param3) { 
    ... 
    try { 
    if (!UDFContext.getUDFContext().isFrontend()) { 
     InputStream is = new FileInputStream("./config"); 
     BufferedReader br = new BufferedReader(new InputStreamReader(is)); 
     ... 
    ... 
} 
Смежные вопросы