2015-10-29 2 views
0

Я относительно новичок в концепции перехватчиков флюма и сталкиваюсь с проблемой, когда перед применением перехватчика файл погружен в обычный текстовый файл, и после применения перехватчика все становится очень плохо.Apache flume custom interceptor - файл HDFS в двоичном и странном

Мой перехватчик код ниже -

package com.flume; 

import org.apache.flume.*; 
import org.apache.flume.interceptor.*; 

import java.util.List; 
import java.util.Map; 
import java.util.ArrayList; 
import java.io.UnsupportedEncodingException; 
import java.net.InetAddress; 
import java.net.UnknownHostException; 

public class CustomHostInterceptor implements Interceptor { 

    private String hostValue; 
    private String hostHeader; 

    public CustomHostInterceptor(String hostHeader){ 
     this.hostHeader = hostHeader; 
    } 

    @Override 
    public void initialize() { 
     // At interceptor start up 
     try { 
      hostValue = 
        InetAddress.getLocalHost().getHostName(); 
     } catch (UnknownHostException e) { 
      throw new FlumeException("Cannot get Hostname", e); 
     } 
    } 

    @Override 
    public Event intercept(Event event) { 

     // This is the event's body 
     String body = new String(event.getBody()); 
     if(body.toLowerCase().contains("text")){ 
      try { 
       event.setBody("hadoop".getBytes("UTF-8")); 
      } catch (UnsupportedEncodingException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 
     // These are the event's headers 
     Map<String, String> headers = event.getHeaders(); 

     // Enrich header with hostname 
     headers.put(hostHeader, hostValue); 

     // Let the enriched event go 
     return event; 
    } 

    @Override 
    public List<Event> intercept(List<Event> events) { 

     List<Event> interceptedEvents = 
       new ArrayList<Event>(events.size()); 
     for (Event event : events) { 
      // Intercept any event 
      Event interceptedEvent = intercept(event); 
      interceptedEvents.add(interceptedEvent); 
     } 

     return interceptedEvents; 
    } 

    @Override 
    public void close() { 
     // At interceptor shutdown 
    } 

    public static class Builder 
      implements Interceptor.Builder { 

     private String hostHeader; 

     @Override 
     public void configure(Context context) { 
      // Retrieve property from flume conf 
      hostHeader = context.getString("hostHeader"); 
     } 

     @Override 
     public Interceptor build() { 
      return new CustomHostInterceptor(hostHeader); 
     } 
    } 
} 

Flume конф есть -

agent.sources=exec-source 
agent.sinks=hdfs-sink 
agent.channels=ch1 

agent.sources.exec-source.type=exec 
agent.sources.exec-source.command=tail -F /home/cloudera/Desktop/app.log 
agent.sources.exec-source.interceptors = i1 
agent.sources.exec-source.interceptors.i1.type = com.flume.CustomHostInterceptor$Builder 
agent.sources.exec-source.interceptors.i1.hostHeader = hostname 

agent.sinks.hdfs-sink.type=hdfs 
agent.sinks.hdfs-sink.hdfs.path= hdfs://localhost:8020/bosch/flume/applogs 
agent.sinks.hdfs-sink.hdfs.filePrefix=logs 
agent.sinks.hdfs-sink.hdfs.rollInterval=60 
agent.sinks.hdfs-sink.hdfs.rollSize=0 

agent.channels.ch1.type=memory 
agent.channels.ch1.capacity=1000 

agent.sources.exec-source.channels=ch1 
agent.sinks.hdfs-sink.channel=ch1 

делать кошку на файл, созданный в HDFS -

SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable���*q�CJv�/ESmP�ź 
                          some textP�żc 
                              some more textP���K 
                                  textP��ߌangels and deamonsP��%� 
      text bla blaP��1�angels and deamonsP��1� 
                testP��1�hmmmP��1�anything 

Любые предложения?

Thanks

ответ

0

Похоже, что ничего не случилось с перехватчиком.

В вашей конфигурации Flume Agent.

Вы не уточняя это свойство (hdfs.fileType), поэтому он принимает это как SequenceFile по умолчанию

Попробуйте добавить эту строку в HDFS мойку и дайте мне знать, если это работает.

agent.sinks.hdfs-sink.hdfs.fileType=DataStream 
+0

нормально одно, конечно, я собираюсь отметить это как правильный ответ, это сработало. Однако, так как это первый раз, когда я использую перехватчик, я хочу понять, что именно он делает. Я думал, что он обработает мои данные в реальном времени и фактически проверит тело, содержащее «текст», и заменит его «hadoop». Этого не происходит, никаких предложений? – AJ84

+0

Возможно, вы можете преобразовывать и обогащать данные, как вам нравится, с помощью перехватчика. используйте этот метод для этого общедоступного перехвата событий (событие события) {} Дайте заявления распечатки и здесь, чтобы отлаживать и конвертировать сообщения по своему усмотрению. – RAJESH

+0

Вы можете опубликовать отдельные вопросы, если у вас их больше. Этот вопрос правильно ответил. Вы можете отметить это как ответ, так как вы говорите, что он сработал. – RAJESH

Смежные вопросы