2015-11-15 3 views
1

Я пытаюсь передать файл csv из flume в kafka. Я могу передать файл напрямую, используя следующий файл конфигурации, чтобы передать весь файл из лотка в Kafka.Преобразование csv-файла в JSON в flume

# Name the components on this agent 
    a1.sources = r1 
    a1.sinks = k1 
    a1.channels = c1 

    # Describe the source 
    a1.sources.r1.type = exec 
    a1.sources.r1.command = cat /User/Desktop/logFile.csv 


    # Describe the sink 
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink 
    a1.sinks.k1.topic = kafkaTopic 
    a1.sinks.k1.brokerList = localhost:9092 
    a1.sinks.sink1.batchSize = 20 

    # Use a channel which buffers events in memory 
    a1.channels.c1.type = memory 
    a1.channels.c1.capacity = 10000 
    a1.channels.c1.transactionCapacity = 10000 

    # Bind the source and sink to the channel 
    a1.sources.r1.channels = c1 
    a1.sinks.k1.channel = c1 

Но я хочу, чтобы он был преобразован в формат JSON, прежде чем перейти к kafka для дальнейшей обработки. Может кто-то, пожалуйста, сообщите мне, как конвертировать файл из csv в формат JSON.

Спасибо!

ответ

2

Думаю, вам нужно написать свой собственный перехватчик.

  1. начать с реализацией интерфейса перехватчика
  2. Читать CSV из водопропускного тела события.
  3. Синтаксический его и создавать JSON
  4. вставить его обратно в тело события

Пример: https://questforthought.wordpress.com/2014/01/13/using-flume-interceptor-multiplexing/

Смежные вопросы