2016-08-30 2 views
0

Для того, чтобы избежать использования изменяемую переменную внутри Actor, я сделал следующие вещи:Akka Актер - Просить сообщение дает мне пустой набор

import akka._ 
    import akka.actor._ 
    import play.api.Logger 
    import scala.language.implicitConversions 

    object Sessions_Buffer { 

     case class Sessions_Buffer_Data(
     data: scala.collection.immutable.Map[String, List[Array[Byte]]] 
    ) 

     // var sessions_buffer: Map[String, List[Array[Byte]]] = Map() 
     def props(): Props = Props(Sessions_Buffer()) 
    } 

    case class Sessions_Buffer() extends Actor { 

     import Sessions_Buffer._ 
    def receive = active(Map()) 

     def mergeMaps(m1: Map[String, List[Array[Byte]]], m2: Map[String, List[Array[Byte]]]) = 
     for ((key, values) <- m2) yield { 
      val existingData = m1.getOrElse(key, List()) 
      (key -> (existingData ++ values)) 
     } 

     def active(dataFromKafka: Map[String, List[Array[Byte]]]): Receive = { 
     case sessionData: Sessions_Buffer_Data => 
      context.become(active(mergeMaps(dataFromKafka, sessionData.data))) 
      for ((key, values) <- dataFromKafka) { 
      Logger.debug("key = " + key + " values =" + values.size) 
      } 
     case "get_session_data" => 
      sender() ! dataFromKafka 
     } 

class Test_Kakfa_To_Buffer 
    extends TestKit(ActorSystem("test-kafka-buffer")) 
    with ImplicitSender 
    with WordSpecLike 
    with Matchers 
    with BeforeAndAfterAll { 

    var kafkaProps: Properties = _ 

    override def beforeAll = { 
    Play.start(TestUtils.FAKE_APP) 
    kafkaProps = KafkaConfigService.securedDeviceKafkaProps 
    } 

    override def afterAll = { 
    TestKit.shutdownActorSystem(system) 
    Play.stop(TestUtils.FAKE_APP) 
    } 

val buffer_Dispatcher = system.actorOf(Sessions_Buffer.props()) 
... // some process here 
val futureSessionsData = buffer_Dispatcher ? "get_session_data" 
val sessionsData: Map[String, List[Array[Byte]]] = 

Await.result (futureSessionsData, timeout.duration) .asInstanceOf [Карта [String, List [массив [Byte]]]] утверждают (sessionsData.keys.size == (TestUtils.NUMBER_OF_DEVICES + 1))}

проблема заключается в том, что в одном из модульного тестирования, что я написал, когда я спрашиваю session_data, результирующая Карта пуста. Ошибка вывода из scalatest:

Set() had size 0 instead of expected size 4 

Во время процесса журналы показывают, что Карта не пуста, но когда я спрашиваю эту карту, она пуста! Что мне не хватает?

+0

Не могли бы вы показать оставшийся тестовый код? (где вы действительно отправляете сообщения). Я заметил, что ваша функция 'merge' может иметь ошибку - вы не рассматриваете ключи, которые находятся на' m1', но не на 'm2'. – virsox

+0

Какие сообщения? Потому что в коде, который я отправляю, я запрашиваю данные buffer_actor. И вы правы в функции слияния. – alifirat

+0

Проблема в том, что то, что вы печатаете в своем журнале, отличается от того, что вы возвращаете в сообщении '' get_session_data ". Поскольку ваш метод 'merge' является ошибкой, в результате вы получаете« Set »с размером 0, но вы печатаете данные сообщения, а не результат вызова' merge'. – virsox

ответ

1

В общем, ваш код структурирован правильно, но есть две проблемы, которые вызывают это «странное» поведение:

  1. Ваша merge функция есть ошибка - это не учитывает ключи, которые находятся в m1 но не на m2.

  2. Вы регистрируетесь содержание sessionData сообщения, но то, что вы возвращаетесь в результате get_session_data является merge из sessionData содержания с предыдущим содержанием.

Я считаю, что если вы исправите эти две проблемы, у вас будет то, что вы ищете!

+0

Да, исправление моих 'mergeMaps' решило мою проблему! благодаря :) – alifirat

Смежные вопросы