Я испытываю непредвиденное поведение при использовании сохранения Akka. Я довольно новичок в Akka, поэтому извиняюсь заранее, если я пропустил что-то очевидное.Акка persistence receiveRecover получает моментальные снимки из других экземпляров актеров
У меня есть актер, называемый PCNProcessor. Я создаю экземпляр актера для каждого идентификатора PCN, который у меня есть. Проблема, которую я испытываю, заключается в том, что когда я создаю экземпляр первого актера, все работает нормально, и я получаю ответ Обработанный ответ. Однако, когда я создаю дополнительные экземпляры PCNProcessor, использующие разные идентификаторы PCN, я получаю Уже обработанный PCN ответ.
По существу, по какой-то причине снимок, сохраненный как часть первого процессора идентификатора PCN, повторно применяется к последующим экземплярам идентификатора PCN, даже если он не относится к этому PCN, а идентификатор PCN отличается. Чтобы подтвердить это поведение, я распечатал журнал в getRecover, и каждый последующий экземпляр PCNProcessor получает снимки, которые ему не принадлежат.
Мой вопрос:
- Должен ли я хранить снимки определенным образом, чтобы они были ключом против Id PCN? И тогда я должен отфильтровать снимки, которые не связаны с PCN в контексте?
- Или если структура Akka будет заботиться об этом за кулисами, и мне не нужно беспокоиться о сохранении моментальных снимков с идентификатором PCN.
Исходный код для актера внизу. Я использую осколки.
package com.abc.pcn.core.actors
import java.util.UUID
import akka.actor._
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer}
import com.abc.common.AutoPassivation
import com.abc.pcn.core.events.{PCNNotProcessedEvt, PCNProcessedEvt}
object PCNProcessor {
import akka.contrib.pattern.ShardRegion
import com.abc.pcn.core.PCN
val shardName = "pcn"
val idExtractor: ShardRegion.IdExtractor = {
case ProcessPCN(pcn) => (pcn.id.toString, ProcessPCN(pcn))
}
val shardResolver: ShardRegion.ShardResolver = {
case ProcessPCN(pcn) => pcn.id.toString
}
// shard settings
def props = Props(classOf[PCNProcessor])
// command and response
case class ProcessPCN(pcn: PCN)
case class NotProcessed(reason: String)
case object Processed
}
class PCNProcessor
extends PersistentActor
with AtLeastOnceDelivery
with AutoPassivation
with ActorLogging {
import com.abc.pcn.core.actors.PCNProcessor._
import scala.concurrent.duration._
context.setReceiveTimeout(10.seconds)
private val pcnId = UUID.fromString(self.path.name)
private var state: String = "not started"
override def persistenceId: String = "pcn-processor-${pcnId.toString}"
override def receiveRecover: Receive = {
case SnapshotOffer(_, s: String) =>
log.info("Recovering. PCN ID: " + pcnId + ", State to restore: " + s)
state = s
}
def receiveCommand: Receive = withPassivation {
case ProcessPCN(pcn)
if state == "processed" =>
sender ! Left(NotProcessed("Already processed PCN"))
case ProcessPCN(pcn)
if pcn.name.isEmpty =>
val error: String = "Name is invalid"
persist(PCNNotProcessedEvt(pcn.id, error)) { evt =>
state = "invalid"
saveSnapshot(state)
sender ! Left(NotProcessed(error))
}
case ProcessPCN(pcn) =>
persist(PCNProcessedEvt(pcn.id)) { evt =>
state = "processed"
saveSnapshot(state)
sender ! Right(Processed)
}
}
}
Update:
После выхода из метаданных для полученного снимка, я могу видеть, проблема заключается в том, что snapshotterId не решает правильно и всегда быть установлен на PCN-процессор - $ {pcnId.toString} без разрешения курсива курсивом.
[INFO] [06/06/2015 09: 10: 00.329] [ECP-akka.actor.default-dispatcher-16] [akka.tcp: //ECP @127.0.0.1: 2551/user/sharding/pcn/16b3d4dd-9e0b-45de-8e32-de799d21e7c5] Восстановление. PCN ID: 16b3d4dd-9e0b-45de-8e32-de799d21e7c5, Метаданные снимка SnapshotMetadata (PCN-процессор - $ {pcnId.toString}, 1,1433577553585)
Спасибо, что заметили это. Действительно, вы правы. –