2014-10-11 3 views
1

Scala В этом коде:Передача данных от производителя Кафки в искре с использованием

import java.limport java.lang.StringBuilder 
import java.util.Properties 
import kafka.producer.{KeyedMessage, Producer, ProducerConfig} 
import org.jnetpcap.Pcap 
import org.jnetpcap.packet.{PcapPacket, PcapPacketHandler} 

object kafkaproducer extends Serializable{ 
    def main(args: Array[String]) { 
    if (args.length < 4) { 
     System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " + 
     "<messagesPerSec> <wordsPerMessage>") 
     System.exit(1) 
    } 
    //metadata.broker.list=localhost:9092 
    //zookeeper.connect=localhost:2181 
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args 
    // Zookeeper connection properties 
    val props = new Properties() 
    props.put("metadata.broker.list", brokers.toString) 
    props.put("serializer.class", "kafka.serializer.StringEncoder") 
    val config = new ProducerConfig(props) 
    val producer = new Producer[String, PcapPacket](config) 
    // Send some messages 
    val snaplen = 64 * 1024 // Capture all packets, no truncation 
    val flags = Pcap.MODE_PROMISCUOUS // capture all packets 
    val timeout = 10 * 1000 
    val jsb = new java.lang.StringBuilder() 
    val errbuf = new StringBuilder(jsb); 
    val pcap = Pcap.openLive("eth0", snaplen, flags, timeout, errbuf) 
    if (pcap == null) { 
     println("Error : " + errbuf.toString()) 
    } 

    while(true){ 

     val jpacketHandler = new PcapPacketHandler[String]() { 

     def nextPacket(packet: PcapPacket, user: String) { 
      val data = new KeyedMessage[String,PcapPacket](topic.toString,(packet)) 
      println(data) 
      producer.send(data) 


     } 
     } 
     pcap.loop(50, jpacketHandler, "jNetPcap works!") 


    } 

    } 
} 

Я получаю это исключение:

Exception in thread "main" java.lang.ClassCastException: org.jnetpcap.packet.PcapPacket cannot be cast to java.lang.String 
at kafka.serializer.StringEncoder.toBytes(Unknown Source) 
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(Unknown Source) 
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(Unknown Source) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
at kafka.producer.async.DefaultEventHandler.serialize(Unknown Source) 
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) 
at kafka.producer.Producer.send(Unknown Source) 
at kafkaproducer$$anon$1.nextPacket(kafkaproducer.scala:50) 
at kafkaproducer$$anon$1.nextPacket(kafkaproducer.scala:40) 
at org.jnetpcap.Pcap.loop(Native Method) 
at org.jnetpcap.Pcap.loop(Unknown Source) 
at kafkaproducer$.main(kafkaproducer.scala:55) 
at kafkaproducer.main(kafkaproducer.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) 

следующей ошибку при отправке данных от производителя Кафки. В изготовителе kafka пакеты загружаются с использованием библиотеки jnetpcap. Кто-нибудь может мне помочь, пожалуйста!

+0

Не могли бы вы также опубликовать код? Вероятно, вы пытаетесь отправить объект типа PcapPacket с помощью StringSerializer – maasg

+0

https://github.com/swe0523/producer/blob/master/producer.scala. Вот ссылка на код – user3823859

ответ

2

Причина этого исключения заключается в том, что производитель настроен на использование StringEncoder здесь:

props.put("serializer.class", "kafka.serializer.StringEncoder") 

Тем не менее, фактическое значение при условии, имеет тип PcapPacket. Производитель будет использовать кодировщик для сериализации объекта и boem у вас есть исключение класса.

Также обратите внимание, что после документации JNetPcap вы можете не использовать, используя захваченную PcapPacket для передачи данных. Этот объект изменчив и будет изменяться в каждом захвате с вновь захваченными данными. From the docs:

Так же, как с JBufferHandler одна копия PcapPacket повторно используется для каждого пакета из одного экземпляра цикла PCAP отправки. Пакет поступает полностью декодированным и может быть доступен немедленно, , но может не быть помещенным в очередь или другое постоянное/полупостояно . Он должен быть немедленно обработан приложением пользователя, отброшен или скопирован в более постоянную ячейку памяти.

Как я уже говорил на this question:

Если вы хотите получить доступ к специфике PcapPacket, я предлагаю yIf вы хотите получить доступ к специфике PcapPacket, я предлагаю вам извлечь что информация на производителя и помещайте его в строку или настраиваемый объект сериализуемого типа .

Это остается актуальным советом для этого случая.

+0

Спасибо. В приведенной выше ссылке вниз страница, они говорят, что пакет может быть помещен в очередь. Могу ли я поставить то же самое на кафкинскую очередь? – user3823859

+0

Я обновил этот вопрос указателем на то, как реализовать функции кодировщика/декодера - http://stackoverflow.com/questions/26258553/how-to-implement-deserialisation-in-kafka-consumer-using-scala – maasg

Смежные вопросы