2015-08-01 5 views
3

Я пишу простую программу для Twitter, где я читаю твиты с помощью Kafka и хочу использовать Avro для сериализации. До сих пор я только что настроил конфигурацию Twitter в Scala и теперь хочу читать твиты, используя эту конфигурацию.Импорт схемы avro в Scala

Как импортировать следующую схему avro, как определено в файле tweets.avsc в моей программе?

{ 
    "namespace": "tweetavro", 
    "type": "record", 
    "name": "Tweet", 
    "fields": [ 
     {"name": "name", "type": "string"}, 
     {"name": "text", "type": "string"} 
    ] 
} 

Я последовал за несколько примеров по холсту, который показывает что-то вроде import tweetavro.Tweet импортировать схему в Scala, так что мы можем использовать его как

def main (args: Array[String]) { 
    val twitterStream = TwitterStream.getStream 
    twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s)))) 
    twitterStream.filter(filterUsOnly) 
    } 

    private def toTweet(s: Status): Tweet = { 
    new Tweet(s.getUser.getName, s.getText) 
    } 

    private def sendToKafka(t:Tweet) { 
    println(toJson(t.getSchema).apply(t)) 
    val tweetEnc = toBinary[Tweet].apply(t) 
    val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc) 
    kafkaProducer.send(msg) 
    } 

Я следующий же и с использованием ниже следующих плагинов в pom.xml

<!-- AVRO MAVEN PLUGIN --> 
<plugin> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro-maven-plugin</artifactId> 
    <version>1.7.7</version> 
    <executions> 
    <execution> 
     <phase>generate-sources</phase> 
     <goals> 
     <goal>schema</goal> 
     </goals> 
     <configuration> 
     <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> 
     <outputDirectory>${project.basedir}/src/main/scala/</outputDirectory> 
     </configuration> 
    </execution> 
    </executions> 
</plugin> 


<!-- MAVEN COMPILER PLUGIN --> 
<plugin> 
    <groupId>org.apache.maven.plugins</groupId> 
    <artifactId>maven-compiler-plugin</artifactId> 
    <configuration> 
    <source>1.7</source> 
    <target>1.7</target> 
    </configuration> 
</plugin> 

После всего этого, до сих пор я не могу сделать import tweetavro.Tweet

Может, anayone, пожалуйста, помогите?

Спасибо!

ответ

1

Сначала вы должны скомпилировать эту схему в класс. Я не уверен, есть библиотека для Avro в Scala, которая является производство готов, но вы можете создать класс для Java и использовать его в Scala:

java -jar /path/to/avro-tools-1.7.7.jar compile schema tweet.avsc .

Измените эту строку для ваших потребностей, и вы должны получить класс tweetavro.Tweet, сгенерированный этим инструментом. Затем вы можете поместить его в свой проект и использовать так, как вы только что описали.

Подробнее here

UPD: FYI, кажется, есть library in Scala, но я никогда не использовал его раньше

2

Я рекомендую использовать Avrohugger. Это новый парень на блоке с точки зрения классов Scala для Avro, но поддерживает все, что мне нужно, и мне очень нравится, что он не основан на макросах, поэтому я действительно могу видеть, что генерируется.

Сопровождающий был потрясающим, чтобы работать и очень принимать вклады и отзывы. Это не так и, вероятно, никогда не будет такой функциональной, как официальный Java-код, но он подойдет большинству потребностей людей.

В настоящее время отсутствует поддержка союзов (кроме факультативных типов) и рекурсивных типов.

плагин SBT работает очень хорошо, и есть новый веб-интерфейс, если вы хотите быстро увидеть, что она делает с вашими Avro схемами:

https://avro2caseclass.herokuapp.com/

Подробнее здесь:

https://github.com/julianpeeters/avrohugger

+0

Спасибо за ваш ответ. Avrohugger выглядит интересно, я, вероятно, воспользуюсь этим в своем следующем проекте. Мне удалось решить проблему с использованием градиента, поскольку проблема была в моем случае с помощью avro-maven-plugin. – PRP

3

Вы также можете использовать avro4s. Определите свой класс case (или сгенерируйте его) на основе схемы. Назовем этот класс Tweet. Затем вы создаете AvroOutputStream, который также выводит схему из класса case и используется для сериализации экземпляров. Затем мы можем записать в массив байтов и отправить это кафке.Например:

val tweet: Tweet= ... // the instance you want to serialize 

val out = new ByteArrayOutputStream // we collect the serialized output in this 
val avro = AvroOutputStream[Tweet](out) // you specify the type here as well 
avro.write(tweet) 
avro.close() 

val bytes = out.toByteArray 
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, bytes) 
kafkaProducer.send(msg) 
Смежные вопросы