Scala В этом коде:Передача данных от производителя Кафки в искре с использованием
import java.limport java.lang.StringBuilder
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.jnetpcap.Pcap
import org.jnetpcap.packet.{PcapPacket, PcapPacketHandler}
object kafkaproducer extends Serializable{
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
"<messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
//metadata.broker.list=localhost:9092
//zookeeper.connect=localhost:2181
val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
// Zookeeper connection properties
val props = new Properties()
props.put("metadata.broker.list", brokers.toString)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)
val producer = new Producer[String, PcapPacket](config)
// Send some messages
val snaplen = 64 * 1024 // Capture all packets, no truncation
val flags = Pcap.MODE_PROMISCUOUS // capture all packets
val timeout = 10 * 1000
val jsb = new java.lang.StringBuilder()
val errbuf = new StringBuilder(jsb);
val pcap = Pcap.openLive("eth0", snaplen, flags, timeout, errbuf)
if (pcap == null) {
println("Error : " + errbuf.toString())
}
while(true){
val jpacketHandler = new PcapPacketHandler[String]() {
def nextPacket(packet: PcapPacket, user: String) {
val data = new KeyedMessage[String,PcapPacket](topic.toString,(packet))
println(data)
producer.send(data)
}
}
pcap.loop(50, jpacketHandler, "jNetPcap works!")
}
}
}
Я получаю это исключение:
Exception in thread "main" java.lang.ClassCastException: org.jnetpcap.packet.PcapPacket cannot be cast to java.lang.String
at kafka.serializer.StringEncoder.toBytes(Unknown Source)
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(Unknown Source)
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(Unknown Source)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.producer.async.DefaultEventHandler.serialize(Unknown Source)
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafkaproducer$$anon$1.nextPacket(kafkaproducer.scala:50)
at kafkaproducer$$anon$1.nextPacket(kafkaproducer.scala:40)
at org.jnetpcap.Pcap.loop(Native Method)
at org.jnetpcap.Pcap.loop(Unknown Source)
at kafkaproducer$.main(kafkaproducer.scala:55)
at kafkaproducer.main(kafkaproducer.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
следующей ошибку при отправке данных от производителя Кафки. В изготовителе kafka пакеты загружаются с использованием библиотеки jnetpcap. Кто-нибудь может мне помочь, пожалуйста!
Не могли бы вы также опубликовать код? Вероятно, вы пытаетесь отправить объект типа PcapPacket с помощью StringSerializer – maasg
https://github.com/swe0523/producer/blob/master/producer.scala. Вот ссылка на код – user3823859