2013-12-15 5 views
4

У меня есть цепочка акк Акка K -> L -> M, где K отправляет сообщения L, L отправляет сообщение в M, и каждый может ответить своему предшественнику, используя sender полученного сообщения.Как очистить/безопасно удалить ссылку из асинхронной цепочки связи

Мой вопрос заключается в следующем: как мы можем безопасно отсоединить L от цепи, чтобы после операции K посылал сообщения непосредственно M, а M видит K как свой sender?

Если каждый актер был стабильным и долгоживущим, я мог бы разобраться, как сообщить K и M поговорить друг с другом. L мог зависнуть достаточно долго, чтобы пересылать сообщения, уже находящиеся в ее почтовом ящике, до получения сигнала о том, что каждый из K и M больше не разговаривает с L.

Однако K и M также могут подумать о том, чтобы развязать себя, и вот где все становится волосатым.

Есть ли известный протокол для безопасной работы? До сих пор я не нашел правильных условий поиска.

Я знаю, что одним из вариантов было бы заморозить систему и скопировать всю вещь, минус L, в новую цепочку. Тем не менее, я хотел бы реализовать более прерывистое решение в режиме реального времени.

Я думал о каком-то обмене замками, в котором K и M обещают не отсоединиться, пока L не закончит отмену и пересылку всех сообщений. Но любое решение со словом «блокировка» кажется неудобным в асинхронном решении, хотя актеры не будут полностью заблокированы (просто откладывают свои собственные операции разблокировки до удобного времени).

+1

Что вы пытаетесь достичь? Какая большая картина здесь вы ищете, удалив ссылку из асинхронной цепи обмена? –

+1

Будет ли это работать, если вы замените L прокси-сервером с состоянием, который сначала делегирует ваш текущий L для обработки, но в какой-то момент забывает об этом делетете и просто передает сообщения непосредственно между K и M. У вас все еще есть 3 участника, но вы эффективно превращая один из них в no-op в какой-то момент. – erickson

+1

В случае, если вы хотите поменять L с помощью M, ваш актер K может отправить сообщение от Z до L, после чего он может прекратить отправку сообщений и дождаться подтверждения сообщения X.Ваш актер L в случае сообщения Z (если вы используете почтовый ящик на основе приоритета, не забудьте установить ниже обычного приоритета для Z), вы можете отправить подтверждающее сообщение X в K (и, при необходимости, остановить его). После получения X K можно безопасно отправлять сообщения непосредственно на M. Вы можете использовать [FSM] (http://doc.akka.io/docs/akka/snapshot/scala/fsm.html) для реализации государственной логики. –

ответ

1

Общей идеей для решения было бы поддерживать состояние в каждом узле для предыдущего и следующего, а затем поддерживать сообщения для развязывания узла из цепочки, а также сообщать узлу, что его следующий узел изменился. Когда следующий узел изменится, отправьте ядовитую таблетку на узел, который сообщил вам об этом (при условии, что он несвязан), чтобы изящно остановить его. . Когда отсоединили и до остановлено, действуют как чистая передача для любых более сообщений, которые могут возникнуть в Собирает все вместе, код будет выглядеть примерно так:

object ChainNode { 
    case object Unlink 
    case class Link(prev:Option[ActorRef], next:Option[ActorRef]) 
    case class ChangeNextNode(node:Option[ActorRef]) 
} 

trait ChainNode extends Actor{ 
    import ChainNode._ 
    import context._ 

    override def postStop{ 
    println(s"${self.path} has been stopped") 
    } 

    def receive = chainReceive() 

    def chainReceive(prevNode:Option[ActorRef] = None, nextNode:Option[ActorRef] = None):Receive = { 
    case Unlink => 
     prevNode foreach{ node => 
     println(s"unlinking node ${self.path} from sender ${node.path}") 
     node ! ChangeNextNode(nextNode) 
     } 
     become(unlinked(nextNode)) 

    case Link(prev, next) => 
     println(s"${self.path} is linking to $prev and $next") 
     become(chainReceive(prev, next)) 

    case ChangeNextNode(newNext) => 
     println(s"${self.path} is changing next node to $newNext") 
     become(chainReceive(prevNode, newNext)) 
     sender ! PoisonPill 

    case other =>  
     println(s"${self.path} received message $other") 
     val msg = processOther(other) 
     nextNode foreach{ node => 
     println(s"${self.path} forwarding on to ${node.path}") 
     node ! msg 
     } 
    } 

    def unlinked(nextNode:Option[ActorRef]):Receive = { 
    case any => 
     println(s"${self.path} has been unlinked, just forwarding w/o processing...") 
     nextNode foreach (_ ! any)  
    } 

    def processOther(msg:Any):Any 
} 

class NodeA extends ChainNode{ 
    def processOther(msg:Any) = "foo" 
} 

class NodeB extends ChainNode{ 
    def processOther(msg:Any) = "bar" 
} 

class NodeC extends ChainNode{ 
    def processOther(msg:Any) = "baz" 
} 

Затем простой сценарий теста, где связывание изменен в середине пути:

object ChainTest{ 
    import ChainNode._ 

    def main(args: Array[String]) { 
    val system = ActorSystem("chain") 
    val a = system.actorOf(Props[NodeA]) 
    val b = system.actorOf(Props[NodeA]) 
    val c = system.actorOf(Props[NodeA]) 

    a ! Link(None, Some(b)) 
    b ! Link(Some(a), Some(c)) 
    c ! Link(Some(b), None) 

    import system.dispatcher 
    Future{ 
     for(i <- 1 until 10){ 
     a ! "hello" 
     Thread.sleep(200) 
     } 
    } 

    Future{ 
     Thread.sleep(300) 
     b ! Unlink 
    } 
    } 
} 

Это не идеальный вариант, но это может послужить хорошей отправной точкой для вас. Одна из недостатков заключается в том, что сообщения, такие как Unlink и ChangeNextNode, будут обрабатываться в том порядке, в котором они были получены. Если в почтовых ящиках есть куча сообщений, они должны быть обработаны до того, как изменение (например, отмена) вступит в силу. Это может привести к нежелательной задержке в внесении изменений. Если это проблема, вы можете захотеть просмотреть почтовый ящик с приоритетом, где сообщения типа Unlink и ChangeNextNode имеют более высокий приоритет, чем остальные обрабатываемые сообщения.

+0

Вы описали почти тот алгоритм, который я получил, когда задал вопрос. Проблема, с которой я сталкиваюсь, заключается в том, что при тестировании под нагрузкой от сотен до тысяч сообщений неактивные/мертвые узлы иногда получают сообщение или два после того, как они получили PoisonPill. (Я заставил их стать неактивными, а не мертвыми, поэтому я могу отказаться от утверждения, если они когда-нибудь получат сообщение.) Я подозреваю, что проблема возникает, когда два последовательных узла одновременно пытаются отключить себя и отправить конфликтующие сообщения в их восходящий поток и нисходящие узлы. Возможно, я переработал его. –

+0

Спасибо, кстати, для написания кода. Я узнал кое-что, просто прочитав его. Например, я не обращал внимания на postStop, и ваша техника стать одним и тем же состоянием, но с разными параметрами была хорошей демонстрацией того, что я только что узнал в другом месте. Благодаря! –

Смежные вопросы