2014-11-09 3 views
0

Я только начинаю с флюма и должен вставлять некоторые заголовки в раковину hdfs.Форматирование Apache Flume HDFS Serializer

У меня это работает, хотя формат неправильный, и я не могу управлять столбцами.

Используя эту конфигурацию:

a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

a1.sources.r1.type = syslogudp 
a1.sources.r1.host = 0.0.0.0 
a1.sources.r1.port = 44444 

a1.sources.r1.interceptors = i1 i2 
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder 
a1.sources.r1.interceptors.i1.preserveExisting = false 
a1.sources.r1.interceptors.i1.hostHeader = hostname 

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder 
a1.sources.r1.interceptors.i2.preserveExisting = false 

a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/user/vagrant/syslog/%y-%m-%d/ 
a1.sinks.k1.hdfs.rollInterval = 120 
a1.sinks.k1.hdfs.rollCount = 100 
a1.sinks.k1.hdfs.rollSize = 0 
a1.sinks.k1.hdfs.fileType = DataStream 
a1.sinks.k1.hdfs.writeFormat = Text 

a1.sinks.k1.serializer = header_and_text 
a1.sinks.k1.serializer.columns = timestamp hostname 
a1.sinks.k1.serializer.format = CSV 
a1.sinks.k1.serializer.appendNewline = true 

a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

Журналы, записанные HDFS являются в основном в порядке помимо сериализованными аспектов:

{timestamp=1415574695138, Severity=6, host=PolkaSpots, Facility=3, hostname=127.0.1.1} hostapd: wlan0-1: STA xx WPA: group key handshake completed (RSN) 

Как форматировать журналы, так как они выглядят следующим образом:

1415574695138 127.0.1.1 hostapd: wlan0-1: STA xx WPA: group key handshake completed (RSN) 

Временная метка сначала сопровождается именем хоста, а затем телом syslog msg.

ответ

1

Причина этого заключается в том, что два перехватчика, которые вы сконфигурировали, записывают значения в заголовки событий Flume, которые сериализуются в тело с помощью элемента HeaderAndBodyTextEventSerializer. Последний раз это делает:

public void write(Event e) throws IOException { 
    out.write((e.getHeaders() + " ").getBytes()); 
    out.write(e.getBody()); 
    if (appendNewline) { 
     out.write('\n'); 
    } 
    } 

делегируя e.getHeaders() будет только сериализации карты в строку JSON.

Чтобы устранить эту проблему, я бы предложил создать собственный сериализатор и перегрузить метод write() для форматирования вывода на значения, разделенные вкладкой. В этом случае вам просто нужно будет указать путь к классу в:

a1.sinks.k1.serializer = com.mycompany.MySerlizer 

и поместите банку в классе пути Flume в.

+0

Спасибо за ответ, я отказался от чего-то ожидать. Отвлеклись от этого в настоящее время, но попытаемся взглянуть на ваше решение в ближайшее время. Спасибо. – simonmorley

Смежные вопросы