Вам могут потребоваться два актера: один (координатор) отправит клиентам уведомления о командах чата. Другой (дроссель) - будет передавать данные в базу данных каждые 2 минуты. Ваша очередь будет только внутреннее состояние throttler:
class Coordinator extends Actor {
def receive = {
case command: Record =>
broadcast(command)
throttler ! command
}
}
class Throttler extends Actor {
import system.dispatcher
val queue = mutable.List[Record] //it may be a cache instead
def schedule = system.scheduler.scheduleOnce(2 minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html
def receive = {
case Start => schedule
case command: Record =>
queue ++= command
case Tick =>
schedule
try {
//---open transaction here---
for (r <- queue) push(r)
//---close transaction here---
queue.clear //will not be cleared in case of exception
} catch {...}
}
}
Вы также можете использовать FSM-based implementation как сказал @abatyuk.
Если вам нужно уменьшить нагрузку на почтовые ящики - вы можете попробовать некоторые противодавления/балансировки, такие как Akka Work Pulling.
Если вы хотите защитить сам узел (чтобы восстановить состояние очереди, если некоторые из узлов вашего сервера не работают), вы можете использовать Akka Cluster для репликации (вручную) состояния очереди. В этом случае координатор должен быть Cluster Singleton и должен отправлять тики сам по себе случайному игроку (вы можете использовать для этого автобус) и сохранять свои успехи и неудачи в качестве наблюдателя. Обратите внимание, что состояние супервизора может быть потеряно, поэтому вы также должны реплицировать его через узлы (и объединяться между ними каждые 2 минуты, поэтому лучше использовать SortedSet
для очередей, потому что слияние будет чем-то вроде sets.reduce(_ ++ _)
).
Склады, такие как Riak, уже предлагают простой способ решить clusterization problem, так что вы можете использовать их в качестве очередей (и оба координатора и дросселя будут «без гражданства»). В случае Riak вы можете настроить его как «Доступно + Разделение» (см. CAP-теорему), потому что слияние данных здесь не является проблемой - ваша история чата - это тип данных CRDT(conflict-free).
Другим решением является кеш с режимом WriteBehind (настроенный на запуск каждые 2 минуты) в качестве дросселя.
Событие sourcing может также защитить состояние вашего актора, но это более полезно, когда вам нужно повторить все действия после восстановления (вам это не нужно - он будет повторно отправлять все в базу данных). Вы можете использовать моментальные снимки (это почти так же, как использование кеша напрямую), но лучше сохранить их в кеш (путем реализации SnapshotStore) вместо локальной FS, если вам нужна доступность. Обратите внимание, что вам также может потребоваться удалить ранее сохраненные снимки, чтобы уменьшить размер хранилища. И вы должны синхронно сохранять каждую запись, чтобы избежать потери состояния.
P.S. Не забудьте подтвердить сообщение отправителю (на ваш javascript) или потерять последние сообщения (в почтовых ящиках) даже с кешем в очереди.
P.S/2 База данных почти всегда является плохим решением для сохранения состояния актера, потому что она медленная и может стать недоступной. Я бы также не рекомендовал сильные последовательные решения NoSQL, такие как MongoDB - возможная согласованность - лучший выбор в вашем случае.
Взгляните на http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2 - это на самом деле № 4-5 из вашего варианта использования – abatyuk