2017-02-21 3 views
0

Я пытаюсь реализовать Message Broker, настроенный с помощью Lagom 1.2.2, и столкнулся с стеной. Документация имеет следующий пример для дескриптора сервиса:Полный пример Message Broker в Лагоме

default Descriptor descriptor() { 
return named("helloservice").withCalls(...) 
    // here we declare the topic(s) this service will publish to 
    .publishing(
    topic("greetings", this::greetingsTopic) 
) 
    ....; 
} 

И этот пример реализации:

public Topic<GreetingMessage> greetingsTopic() { 
return TopicProducer.singleStreamWithOffset(offset -> { 
    return persistentEntityRegistry 
     .eventStream(HelloEventTag.INSTANCE, offset) 
     .map(this::convertEvent); 
    }); 
} 

Однако, нет ни одного примера того, что аргумент типа или типа возвращаемого значения функции convertEvent() являются , и вот где я рисую пробел. С другой стороны, абонент на MessageBroker, кажется, что это отнимает много GreetingMessage объектов, но когда я создаю функцию convertEvent вернуть GreetingMessage объектов, я получаю ошибку компиляции:

Error:(61, 21) java: method map in class akka.stream.javadsl.Source<Out,Mat> cannot be applied to given types; 
    required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T> 
    found: this::convertEvent 
    reason: cannot infer type-variable(s) T 
    (argument mismatch; invalid method reference 
    incompatible types: akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset> cannot be converted to com.example.GreetingMessage) 

Есть ли еще более тщательно примеры того, как это использовать? Я уже проверял пример приложения Chirper, и похоже, что у него нет примера.

Спасибо!

ответ

3

Сообщение об ошибке вставили говорит вам именно то, что ожидает map:

required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T> 

Итак, вам нужно передать функцию, которая принимает Pair<GreetingEvent, Offset>. Что должна вернуть функция? Ну, обновите его, чтобы принять это, и затем вы получите следующую ошибку, которая еще раз скажет вам, что она ожидала от вас, и в этом случае вы найдете ее Pair<GreetingMessage, Offset>.

Чтобы объяснить, что эти типы - Lagom должен отслеживать, какие события были опубликованы в Kafka, чтобы при перезапуске службы он не начинался с начала вашего журнала событий и не публиковал все события из начало времени снова. Он делает это, используя смещения. Таким образом, журнал событий создает пары событий и смещений, а затем вам нужно преобразовать эти события в сообщения, которые будут опубликованы в Kafka, и когда вы вернете преобразованное сообщение в Lagom, оно должно быть в паре со смещением которые вы получили из журнала событий, так что после публикации в Kafka Lagom может сохранить смещение и использовать его в качестве отправной точки в следующий раз, когда служба будет перезапущена.

Полный пример можно увидеть здесь: https://github.com/lagom/online-auction-java/blob/a32e696/bidding-impl/src/main/java/com/example/auction/bidding/impl/BiddingServiceImpl.java#L91

+0

Спасибо; Я рассматривал приложение примера аукциона, которое вы предоставили, и оно было полезно (хотя оно использует «taggedStreamWithOffset» вместо «singleStreamWithOffset» (что, если честно, может быть тем, что я хочу в любом случае). Часть I отсутствовал, был аргумент 'convertEvent', по причинам, я предполагал, что я испортил тип возврата, даже когда у меня был правильный тип возврата, и это сбило меня с толку. Опять же, спасибо за помощь! –

Смежные вопросы