2016-08-11 4 views
1

Я пытаюсь использовать Play Framework (Scala) в качестве клиента Akka Cluster для отправки сообщений в другой Akka Cluster, который запускает мои приложения.Не удалось отправить сообщение Buffer Buffer от клиента Akka Cluster

Вот что я сделал:

  1. Я определил сообщения в другом модуле с использованием протокола буфера и совместно проекта с запущенными службами и Play приложение (с помощью Git подмодулей)

    syntax = "proto2"; 
    option java_package = "com.myproject.api.common.messages"; 
    option java_outer_classname = "IsValidClientMessage"; 
    
    message IsValidClient { 
    required int32 clientId = 1; 
    required string clientSecret = 2; 
    } 
    
  2. Started Услуги в порту 2560

    akka { 
    
        remote.netty.tcp.port=${?AKKA_REMOTE_PORT} 
        remote.netty.tcp.hostname=127.0.0.1 
    
        cluster { 
    
        seed-nodes = [ 
         "akka.tcp://[email protected]:2560" 
        ] 
    
        auto-down-unreachable-after = 10s 
        } 
    
        extensions = ["akka.cluster.client.ClusterClientReceptionist"] 
    
        loglevel = DEBUG 
    
        actor { 
    
        serializer { 
         proto = "akka.remote.serialization.ProtobufSerializer" 
        } 
    
        serialization-bindings { 
         "com.myproject.api.common.messages.IsValidClientMessage$IsValidClient" = proto 
        } 
    
        serialize-messages = on 
    
        provider = "akka.cluster.ClusterActorRefProvider" 
    
        debug { 
         receive = on 
        } 
        } 
    } 
    
  3. И побежал Play приложения, используя ниже Akka конфигурации:

    akka { 
    
        remote.netty.tcp.port=2552 
        remote.netty.tcp.hostname=127.0.0.1 
        remote.enabled-transports = ["akka.remote.netty.tcp"] 
    
        cluster { 
    
        seed-nodes = [ 
         "akka.tcp://[email protected]:2552" 
        ] 
    
        auto-down-unreachable-after = 10s 
        } 
    
        extensions = ["akka.cluster.client.ClusterClientReceptionist"] 
    
        loglevel = DEBUG 
    
        actor { 
    
        serializer { 
         proto = "akka.remote.serialization.ProtobufSerializer" 
        } 
    
        serialization-bindings { 
        "com.myproject.api.common.messages.IsValidClientMessage$IsValidClient" = proto 
        } 
    
        serialize-messages = on 
    
        provider = "akka.cluster.ClusterActorRefProvider" 
    
        debug { 
         receive = on 
        } 
        } 
    } 
    

Это код, который я пытался отправить сообщение ApiServiceSystem:

package com.myproject.api.akka.actors.socket 

import ... 

class ClientActor extends Actor with ActorLogging { 

    ClusterClientReceptionist(context.system).registerService(self) 

    val outActors: ArrayBuffer[ActorRef] = ArrayBuffer.empty 
    val apiServiceClient = context.system.actorOf(ClusterClient.props(
    ClusterClientSettings(context.system).withInitialContacts(Set(ActorPath.fromString("akka.tcp://[email protected]:2560/system/receptionist"))) 
)) 

    override def receive = { 
    case WatchOutActor(a) => 
     context.watch(a) 
     outActors += a 
    case Terminated(a) => 
     context.unwatch(a) 
     outActors.remove(outActors.indexOf(a)) 
    case other => 

     implicit val to: Timeout = 2 seconds 

     val isValidClient = IsValidClient.newBuilder() // Protocol Buffer Message 

     isValidClient.setClientId(1000) 
     isValidClient.setClientSecret("notsosecret") 

     (apiServiceClient ? ClusterClient.Send("/user/clientActor", isValidClient.build(), false)).mapTo[Future[Either[ServiceError, Boolean]]] map { f => 
     f map { 
      case Left(e) => 
      outActors foreach { a => a ! e.msg } 
      case Right(bool) => 
      outActors foreach { a => a ! bool.toString } 
     } 
     } recover { 
     case e: Exception => println(s"-=> Exception ${e.getMessage}") 
     } 
    } 
} 

object ClientActor { 

    case class WatchOutActor(actorRef: ActorRef) 
} 

Как я вижу снизу бревна, что моя УПА подключенный к кластеру работает служба:

[DEBUG] [08/11/2016 10:11:05.936] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.remote.Remoting] Associated [akka.tcp://[email protected]:2560] <- [akka.tcp://[email protected]:2552] 
[DEBUG] [08/11/2016 10:11:05.998] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$GetContacts$] 
[DEBUG] [08/11/2016 10:11:06.000] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2560/system/receptionist] Client [akka.tcp://[email protected]:2552/user/$a] gets contactPoints [akka.tcp://[email protected]:2560/system/receptionist] (all nodes) 
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2560/system/receptionist] Client [akka.tcp://[email protected]:2552/user/$a] gets contactPoints [akka.tcp://[email protected]:2560/system/receptionist] (all nodes) 
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2560/system/receptionist] Client [akka.tcp://[email protected]:2552/user/$a] gets contactPoints [akka.tcp://[email protected]:2560/system/receptionist] (all nodes) 
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2560/system/receptionist] Client [akka.tcp://[email protected]:2552/user/$a] gets contactPoints [akka.tcp://[email protected]:2560/system/receptionist] (all nodes) 
[DEBUG] [08/11/2016 10:11:06.004] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$Contacts] 
[DEBUG] [08/11/2016 10:11:06.033] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.Identify] 
[DEBUG] [08/11/2016 10:11:06.037] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.ActorIdentity] 
[DEBUG] [08/11/2016 10:11:06.126] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.remote.EndpointWriter$AckIdleCheckTimer$] 
[DEBUG] [08/11/2016 10:11:07.749] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$Heartbeat$] 

Но всякий раз, когда я пытаюсь отправить сообщение, я получаю эту ошибку:

java.lang.RuntimeException: Unable to find proto buffer class: com.myproject.api.common.messages.IsValidClientMessage$IsValidClient 
    at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:1192) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1810) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:241) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) 
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:241) 
    at akka.serialization.Serialization$$anonfun$deserialize$3.apply(Serialization.scala:142) 
    at scala.util.Try$.apply(Try.scala:192) 
    at akka.serialization.Serialization.deserialize(Serialization.scala:142) 
    at akka.actor.dungeon.Dispatch$class.sendMessage(Dispatch.scala:128) 
    at akka.actor.ActorCell.sendMessage(ActorCell.scala:374) 
    at akka.actor.Cell$class.sendMessage(ActorCell.scala:295) 
    at akka.actor.ActorCell.sendMessage(ActorCell.scala:374) 
    at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:169) 
    at akka.actor.ActorRef.tell(ActorRef.scala:128) 
    at akka.pattern.AskableActorRef$.internalAsk$extension(AskSupport.scala:295) 
    at akka.pattern.AskableActorRef$.$qmark$extension1(AskSupport.scala:281) 
    at com.myproject.api.akka.actors.socket.ClientActor$$anonfun$receive$1.applyOrElse(ClientActor.scala:43) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480) 
    at com.myproject.api.akka.actors.socket.ClientActor.aroundReceive(ClientActor.scala:16) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:495) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.ClassNotFoundException: com.myproject.api.common.messages.IsValidClientMessage$IsValidClient 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:264) 
    at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:1183) 
    ... 38 common frames omitted 

Как я могу сериализовать свое сообщение? Почему это ClassNotFoundException происходит во время выполнения? Любая помощь будет очень благодарна

+0

Там может быть 2 проблемы в вашем коде: 1. скомпилированный protobuf нет как у клиента, так и у сервера или 2. Так как совместимость с java-scala сложна, вы можете иметь классы case внутри объекта, который определяет ваш беспорядок Протокол возраста неверно разрешен protobuf. –

+0

Какую версию библиотеки protobuf-java вы используете? –

+0

@frank_neff Свой 2.6.1 – surenyonjan

ответ

2

Кажется, есть проблема с загрузкой класса с помощью классов akka-remote и protobuf 3.x (даже если вы используете определения proto2). Не удалось решить эту проблему, используя компилятор protobuf 2.5 istead 3.x: https://github.com/google/protobuf/releases/tag/v2.5.0

Вы должны скомпилировать его самостоятельно, потому что нет прекомпилированных двоичных файлов. Просто голова к загруженному каталогу и выполните следующие команды:

./configure 
make 
make install 

После компиляции v2.5 protoc двоичных, использовать его, чтобы восстановить свои классы Java и использовать их. Будет открыта проблема, чтобы получить поддержку proto 3 в акке.

Пояснения: Это не protobuf-java версии, но классы, порожденный protoc двоичной 3,0

+0

Akka 2.4.x внутренне использует затененную версию Protobuf 2.5 и не должен вмешиваться в вашу собственную зависимость Protobuf 3.x. Вы добавили 3.x зависит от вашего проекта? –

+0

У меня есть зависимость от выполнения до 3.0.0. Это не java lib, но классы, сгенерированные с помощью protoc-компилятора v2 (works) v3 (ClassNotFoundException) –

+0

У меня тоже такая же проблема – Matt

0

Вы должны явно связать классы Lite и V3 для вашего прот сериализатора:

akka { 
    loglevel = "INFO" 

    actor { 
    provider = "akka.remote.RemoteActorRefProvider" 

    serializers { 
     proto = "akka.remote.serialization.ProtobufSerializer" # or use your own.. 
    } 

    serialization-bindings { 
     "com.google.protobuf.Message" = proto 
     "com.google.protobuf.GeneratedMessageLite" = proto 
     "com.google.protobuf.GeneratedMessageV3" = proto 
    } 
    }