3

Мне известно, что с Akka 2.4.16 не существует «удаленной» реализации реактивных потоков. Спецификация фокусируется на потоке, запущенном на одной JVM.Способы поддержания противодавления в потоках Akka с участием нескольких JVM

Однако, рассматривая прецедент для использования другой JVM для некоторой обработки при сохранении противодавления. Идея состоит в том, чтобы иметь основное приложение, которое предоставляет пользовательский интерфейс, запускающий поток. Например, этот поток имеет этап, выполняющий некоторые тяжелые вычисления, которые должны запускаться на другой машине. Я заинтересован в способах запуска потоков в распределенном образом - я наткнулся на несколько статей, указывая на некоторые идеи:

Какие альтернативы существуют? Имеются ли существенные минусы вышеизложенного? Любые особенности, которые следует учитывать?

Обновление: этот вопрос не ограничивается одним вариантом использования. Обычно меня интересуют все возможные способы работы с потоками в распределенной среде. Это означает, например, он может включать только один поток, который объединяет субъектов с .mapAsync или, например, на двух машинах, работающих через Akka HTTP, могут быть два отдельных потока. Единственное требование заключается в том, что противодавление должно быть соблюдено всеми компонентами.

+1

Я думаю, что вы неправильно понимаете что-то. Итак ... как у вас есть поток inter-jvm? Ну ... имея компоненты, которые на самом деле находятся в разных jvm. Теперь вам нужно понять, что те компоненты в этом конкретном случае будут Актерами. Итак ... вам просто нужно создать FlowShape/Sink/Source с помощью какого-то «удаленного актера», и артерия позаботится о передаче сообщений. –

+0

Я полностью согласен с вашим комментарием - согласно сообщению в блоге, Артерия поддерживает обратное давление, когда эти два актера общаются друг с другом. Мой вопрос скорее направлен на понимание того, например, использование '.mapAsync' для интеграции удаленных участников в потоке имеет тот же результат: наличие потока, который обрабатывает что-то на другой машине. В более общем плане: Каковы способы реализации потоков, пересекающих границы JVM? – Toaditoad

ответ

1

Ну ... Кажется, мне придется добавить пример для этого. Единственное, что вам нужно понять, это то, что BackPressure обрабатывается AsyncBoundries в GraphStages. Это действительно не имеет никакого отношения к компоненту, существующему где-то еще. Также ... Это не зависит от Артерии, которая не что иное, как новый удаленный транспорт.

Вот пример, вероятно простейшего потока кросс-Jvm,

Первое применение,

import akka.actor.{Actor, ActorLogging, ActorSystem, Props} 
import akka.actor.Actor.Receive 
import com.typesafe.config.{Config, ConfigFactory} 

class MyActor extends Actor with ActorLogging { 
    override def receive: Receive = { 
    case msg @ _ => { 
     log.info(msg.toString) 
     sender() ! msg 
    } 
    } 
} 

object MyApplication extends App { 

    val config = ConfigFactory.parseString(
    """ 
     |akka{ 
     | actor { 
     | provider = remote 
     | } 
     | remote { 
     | enabled-transports = ["akka.remote.netty.tcp"] 
     | untrusted-mode = off 
     | netty.tcp { 
     |  hostname="127.0.0.1" 
     |  port=18000 
     | } 
     | } 
     |} 
    """.stripMargin 
) 

    val actorSystem = ActorSystem("my-actor-system", config) 

    var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor") 

} 

И Второе приложение ... на самом деле «работает» поток, который использует актер в первом приложении ,

import akka.actor.{ActorPath, ActorSystem} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Flow, Keep, Sink, Source} 
import akka.pattern.ask 
import com.typesafe.config.ConfigFactory 

import scala.language.postfixOps 
import scala.concurrent.duration._ 

object YourApplication extends App { 

    val config = ConfigFactory.parseString(
    """ 
     |akka{ 
     | actor { 
     | provider = remote 
     | } 
     | remote { 
     | enabled-transports = ["akka.remote.netty.tcp"] 
     | untrusted-mode = off 
     | netty.tcp { 
     |  hostname="127.0.0.1" 
     |  port=19000 
     | } 
     | } 
     |} 
    """.stripMargin 
) 

    val actorSystem = ActorSystem("your-actor-system", config) 

    import actorSystem.dispatcher 

    val logger = actorSystem.log 

    implicit val implicitActorSystem = actorSystem 
    implicit val actorMaterializer = ActorMaterializer() 

    val myActorPath = ActorPath.fromString("akka.tcp://[email protected]:18000/user/my-actor") 

    val myActorSelection = actorSystem.actorSelection(myActorPath) 

    val source = Source(1 to 10) 

    // here this "mapAsync" wraps the given T => Future[T] function in a GraphStage 
    val myRemoteComponent = Flow[Int].mapAsync(2)(i => { 
    myActorSelection.resolveOne(1 seconds).flatMap(myActorRef => 
     (myActorRef.ask(i)(1 seconds)).map(x => x.asInstanceOf[Int]) 
    ) 
    }) 

    val sink = Sink.foreach[Int](i => logger.info(i.toString)) 

    val stream = source.via(myRemoteComponent).toMat(sink)(Keep.right) 

    val streamRun = stream.run() 

} 
+0

Благодарим вас за этот пример. Как сказал в моем вопросе, я знаком с подходом, использующим этап с '.mapAsync'. Но мне интересно, другие альтернативы этому. Например, можно было бы преобразовать ваш пример в нечто, имеющее потоки, подключенные через TCP по Akka HTTP. Я обновляю свой вопрос, чтобы сделать это более ясным ... – Toaditoad

+0

@Toaditoad Пожалуйста, не принимайте этот вопрос в противном случае - Вы новичок в Akka и Scala?Я, откровенно говоря, не понял смысла «потоки, связанные через TCP с помощью Akka HTTP», потому что это не имеет никакого смысла в моем общем понимании TCP + Akka + Http + Akka Streams + Akka HTTP. –

+0

Да, я довольно новичок в этом мире. Я изучаю возможные проекты для обработки видеокадров с актерами и потоками. Поэтому я не ищу только одно рабочее решение, но хочу попробовать некоторые альтернативы. Я еще не начинал с идеи TCP/Akka HTTP, но вы видели ответ Конрада Малавского (http://stackoverflow.com/a/30693174/4169741)? Он один из гуру Акки с Lightbend, и его ответ звучит довольно ясно для меня. – Toaditoad

Смежные вопросы