2015-03-22 2 views
0

Я использую akka BalancingPool для распределения задач над работниками. Он работает очень хорошо, пока я не добавлю/удаляю работников в пул. Я хочу, потому что некоторые из рабочих ненадежны и плохо работают. Однако балансирующий пул отправляет все сообщения только одному работнику после замены.Замена работников в BalancingPool

Вот тест для этого Scala

import scala.concurrent.duration._ 
import org.scalatest._ 
import akka.util.Timeout 
import akka.actor._ 
import akka.routing._ 
import akka.testkit._ 

class BalancingPoolSpec extends TestKit(ActorSystem("BalancingPoolSpec")) with ImplicitSender 
    with WordSpecLike with Matchers with BeforeAndAfterAll { 

    override def afterAll { 
    TestKit.shutdownActorSystem(system) 
    } 

    val numberOfTestMessages = 5 
    val numberOfWorkers = 3 
    val pool = system.actorOf(BalancingPool(numberOfWorkers).props(Props[Worker]), "pool") 

    def sendMessagesAndCollectStatistic = { 
    for (i <- 1 to numberOfTestMessages) pool ! "task" 
    (currentRoutes, collectResponces) 
    } 

    def collectResponces = receiveN(numberOfTestMessages, 10.second).groupBy(l => l).map(t => (t._1, t._2.length)) 

    def currentRoutes = { 
    pool ! GetRoutees 
    val Routees(routees) = expectMsgAnyClassOf(classOf[Routees]) 
    routees 
    } 

    def replaceWorkers(oldRoutees: Seq[Routee]) = { 
    //Adding new Routees before removing old ones to make it work :) 
    for (i <- 1 to numberOfWorkers) pool ! AddRoutee(ActorRefRoutee(system.actorOf(Props[Worker]))) 
    for (r <- oldRoutees) pool ! RemoveRoutee(r) 
    Thread.sleep(500) //Give some time to BalancingPool 
    } 

    "test" in { 
    val (routees1, responces1) = sendMessagesAndCollectStatistic 
    replaceWorkers(routees1) 
    val (routees2, responces2) = sendMessagesAndCollectStatistic 

    assert(responces2.size > 1 , s""" 
     Before replacement distribution over ${routees1.size} workers: ${responces1} 
     After replacement distribution over ${routees2.size} workers: ${responces2}""") 
    } 
} 


//For each task worker simulate some work for 1 second and sends back to sender worker's id 
object Worker { 
    var i = 0 
    def newId = synchronized { 
    i += 1 
    i 
    } 
} 

class Worker extends Actor { 
    val id = Worker.newId 
    def receive = { 
    case _ => Thread.sleep(1000); sender ! id 
    } 
} 

Failing сообщения

1 was not greater than 1 
    Before replacement distribution over 3 workers: Map(2 -> 2, 1 -> 1, 3 -> 2) 
    After replacement distribution over 3 workers: Map(4 -> 5) 

Поэтому, прежде чем задачи замен были распределены более 3 рабочих, после того, как все 5 заданий пошли к одному работнику. Предполагается ли BalancingPool обрабатывать сообщения AddRoutee/RemoveRoutee ожидаемым образом?

+0

Более общий вопрос http://stackoverflow.com/questions/29161763/replacing-bad-performing-workers-in-pool –

+0

Тест работает для '' RoundRobinPool' и RandomPool'. –

ответ

0

Из ответа по Patrik Нордвалла в akka user group:

Причина заключается в том, что BalancingPool использовать специальный диспетчер, когда он создает routees. Здесь вы создаете и добавляете маршруты без этого. Попробуйте это вместо:

def replaceWorkers(oldRoutees: Seq[Routee]): Unit = { 
    pool ! AdjustPoolSize(-numberOfWorkers) 
    pool ! AdjustPoolSize(numberOfWorkers) 
    Thread.sleep(500) //Give some time to BalancingPool 
    } 

Однако, не было бы лучше, чтобы позволить routees сгенерирует исключение и тем самым перезагрузка?

Смежные вопросы