2015-06-04 5 views
0

Я испытываю непредвиденное поведение при использовании сохранения Akka. Я довольно новичок в Akka, поэтому извиняюсь заранее, если я пропустил что-то очевидное.Акка persistence receiveRecover получает моментальные снимки из других экземпляров актеров

У меня есть актер, называемый PCNProcessor. Я создаю экземпляр актера для каждого идентификатора PCN, который у меня есть. Проблема, которую я испытываю, заключается в том, что когда я создаю экземпляр первого актера, все работает нормально, и я получаю ответ Обработанный ответ. Однако, когда я создаю дополнительные экземпляры PCNProcessor, использующие разные идентификаторы PCN, я получаю Уже обработанный PCN ответ.

По существу, по какой-то причине снимок, сохраненный как часть первого процессора идентификатора PCN, повторно применяется к последующим экземплярам идентификатора PCN, даже если он не относится к этому PCN, а идентификатор PCN ​​отличается. Чтобы подтвердить это поведение, я распечатал журнал в getRecover, и каждый последующий экземпляр PCNProcessor получает снимки, которые ему не принадлежат.

Мой вопрос:

  1. Должен ли я хранить снимки определенным образом, чтобы они были ключом против Id PCN? И тогда я должен отфильтровать снимки, которые не связаны с PCN в контексте?
  2. Или если структура Akka будет заботиться об этом за кулисами, и мне не нужно беспокоиться о сохранении моментальных снимков с идентификатором PCN.

Исходный код для актера внизу. Я использую осколки.


package com.abc.pcn.core.actors 

import java.util.UUID 

import akka.actor._ 
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SnapshotOffer} 
import com.abc.common.AutoPassivation 
import com.abc.pcn.core.events.{PCNNotProcessedEvt, PCNProcessedEvt} 

object PCNProcessor { 

    import akka.contrib.pattern.ShardRegion 
    import com.abc.pcn.core.PCN 

    val shardName = "pcn" 
    val idExtractor: ShardRegion.IdExtractor = { 
    case ProcessPCN(pcn) => (pcn.id.toString, ProcessPCN(pcn)) 
    } 
    val shardResolver: ShardRegion.ShardResolver = { 
    case ProcessPCN(pcn) => pcn.id.toString 
    } 

    // shard settings 
    def props = Props(classOf[PCNProcessor]) 

    // command and response 
    case class ProcessPCN(pcn: PCN) 

    case class NotProcessed(reason: String) 

    case object Processed 

} 

class PCNProcessor 
    extends PersistentActor 
    with AtLeastOnceDelivery 
    with AutoPassivation 
    with ActorLogging { 

    import com.abc.pcn.core.actors.PCNProcessor._ 

    import scala.concurrent.duration._ 

    context.setReceiveTimeout(10.seconds) 

    private val pcnId = UUID.fromString(self.path.name) 
    private var state: String = "not started" 

    override def persistenceId: String = "pcn-processor-${pcnId.toString}" 

    override def receiveRecover: Receive = { 
    case SnapshotOffer(_, s: String) => 
     log.info("Recovering. PCN ID: " + pcnId + ", State to restore: " + s) 
     state = s 
    } 

    def receiveCommand: Receive = withPassivation { 

    case ProcessPCN(pcn) 
     if state == "processed" => 
     sender ! Left(NotProcessed("Already processed PCN")) 

    case ProcessPCN(pcn) 
     if pcn.name.isEmpty => 
     val error: String = "Name is invalid" 
     persist(PCNNotProcessedEvt(pcn.id, error)) { evt => 
     state = "invalid" 
     saveSnapshot(state) 
     sender ! Left(NotProcessed(error)) 
     } 

    case ProcessPCN(pcn) => 
     persist(PCNProcessedEvt(pcn.id)) { evt => 
     state = "processed" 
     saveSnapshot(state) 
     sender ! Right(Processed) 
     } 
    } 
} 

Update:

После выхода из метаданных для полученного снимка, я могу видеть, проблема заключается в том, что snapshotterId не решает правильно и всегда быть установлен на PCN-процессор - $ {pcnId.toString} без разрешения курсива курсивом.

[INFO] [06/06/2015 09: 10: 00.329] [ECP-akka.actor.default-dispatcher-16] [akka.tcp: //ECP @127.0.0.1: 2551/user/sharding/pcn/16b3d4dd-9e0b-45de-8e32-de799d21e7c5] Восстановление. PCN ID: 16b3d4dd-9e0b-45de-8e32-de799d21e7c5, Метаданные снимка SnapshotMetadata (PCN-процессор - $ {pcnId.toString}, 1,1433577553585)

ответ

1

Я думаю, что вы неправильно используете функцию интерполяции строк Scala.
Try следующим образом:

override def persistenceId: String = s"pcn-processor-${pcnId.toString}" 

Пожалуйста, обратите внимание на использование s перед строкового литерала.

+0

Спасибо, что заметили это. Действительно, вы правы. –

0

Хорошо фиксируется, изменив идентификатор персистентности к следующей строке:

override def persistenceId: String = "pcn-processor-" + pcnId.toString 

оригинал в струнной версии:

override def persistenceId: String = "pcn-processor-${pcnId.toString}" 

работает только для сохраняющихся в журнале, но не для снимков.

Смежные вопросы