Я использую akka BalancingPool для распределения задач над работниками. Он работает очень хорошо, пока я не добавлю/удаляю работников в пул. Я хочу, потому что некоторые из рабочих ненадежны и плохо работают. Однако балансирующий пул отправляет все сообщения только одному работнику после замены.Замена работников в BalancingPool
Вот тест для этого Scala
import scala.concurrent.duration._
import org.scalatest._
import akka.util.Timeout
import akka.actor._
import akka.routing._
import akka.testkit._
class BalancingPoolSpec extends TestKit(ActorSystem("BalancingPoolSpec")) with ImplicitSender
with WordSpecLike with Matchers with BeforeAndAfterAll {
override def afterAll {
TestKit.shutdownActorSystem(system)
}
val numberOfTestMessages = 5
val numberOfWorkers = 3
val pool = system.actorOf(BalancingPool(numberOfWorkers).props(Props[Worker]), "pool")
def sendMessagesAndCollectStatistic = {
for (i <- 1 to numberOfTestMessages) pool ! "task"
(currentRoutes, collectResponces)
}
def collectResponces = receiveN(numberOfTestMessages, 10.second).groupBy(l => l).map(t => (t._1, t._2.length))
def currentRoutes = {
pool ! GetRoutees
val Routees(routees) = expectMsgAnyClassOf(classOf[Routees])
routees
}
def replaceWorkers(oldRoutees: Seq[Routee]) = {
//Adding new Routees before removing old ones to make it work :)
for (i <- 1 to numberOfWorkers) pool ! AddRoutee(ActorRefRoutee(system.actorOf(Props[Worker])))
for (r <- oldRoutees) pool ! RemoveRoutee(r)
Thread.sleep(500) //Give some time to BalancingPool
}
"test" in {
val (routees1, responces1) = sendMessagesAndCollectStatistic
replaceWorkers(routees1)
val (routees2, responces2) = sendMessagesAndCollectStatistic
assert(responces2.size > 1 , s"""
Before replacement distribution over ${routees1.size} workers: ${responces1}
After replacement distribution over ${routees2.size} workers: ${responces2}""")
}
}
//For each task worker simulate some work for 1 second and sends back to sender worker's id
object Worker {
var i = 0
def newId = synchronized {
i += 1
i
}
}
class Worker extends Actor {
val id = Worker.newId
def receive = {
case _ => Thread.sleep(1000); sender ! id
}
}
Failing сообщения
1 was not greater than 1
Before replacement distribution over 3 workers: Map(2 -> 2, 1 -> 1, 3 -> 2)
After replacement distribution over 3 workers: Map(4 -> 5)
Поэтому, прежде чем задачи замен были распределены более 3 рабочих, после того, как все 5 заданий пошли к одному работнику. Предполагается ли BalancingPool обрабатывать сообщения AddRoutee/RemoveRoutee
ожидаемым образом?
Более общий вопрос http://stackoverflow.com/questions/29161763/replacing-bad-performing-workers-in-pool –
Тест работает для '' RoundRobinPool' и RandomPool'. –