2014-12-20 2 views
2

Предположим, у меня есть приложение для чата.Сообщения процесса в определенное время надежно

Клиент отправляет сообщение в чат, что привело к некоторой команде какого-то Актера. Теперь я хочу обработать то, что он написал, и сделать его доступным для других пользователей в этом чате, поэтому я обрабатываю эту команду. В то же время я хочу сказать себе (актеру), что мне нужно сохранить это сообщение в базе данных истории чата, но не сейчас. Сохранение в базу данных должно происходить каждые 2 минуты. И если бы произошел сбой, я все равно мог бы оставаться в базе данных.

Я предполагаю, что рабочий процесс будет так:

  1. Пользователь отправить сообщение
  2. чат актер получил команду с этим сообщением
  3. Мы вещания это сообщение всем, и добавить это сообщение в какой-то очередь, чтобы сохранить его в базе данных истории чата.
  4. Некоторая команда persist запускается при прохождении 2-минутного таймаута. Он собирает все входящие сообщения чата, которые еще не были сохранены в порядке их прибытия.
  5. Запуск транзакции со всеми сообщениями, а затем удаление их из очереди.
  6. Если произошел сбой где-то после 3 и сообщения не сохранялись, я должен попытаться их снова сохранить. Если бы они были сохранены, я бы никогда не попытался их повторить.

Как построить что-то подобное в Акке? Какие функции следует использовать/какие шаблоны?

+2

Взгляните на http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2 - это на самом деле № 4-5 из вашего варианта использования – abatyuk

ответ

5

Вам могут потребоваться два актера: один (координатор) отправит клиентам уведомления о командах чата. Другой (дроссель) - будет передавать данные в базу данных каждые 2 минуты. Ваша очередь будет только внутреннее состояние throttler:

class Coordinator extends Actor { 
    def receive = { 
    case command: Record => 
      broadcast(command) 
      throttler ! command 
    } 
} 


class Throttler extends Actor { 

    import system.dispatcher 

    val queue = mutable.List[Record] //it may be a cache instead 

    def schedule = system.scheduler.scheduleOnce(2 minutes, self, Tick) // http://doc.akka.io/docs/akka/snapshot/scala/scheduler.html 


    def receive = { 
     case Start => schedule 
     case command: Record => 
      queue ++= command 
     case Tick => 
      schedule 
      try { 
      //---open transaction here--- 
      for (r <- queue) push(r) 
      //---close transaction here--- 
      queue.clear //will not be cleared in case of exception 
      } catch {...} 
    } 
} 

Вы также можете использовать FSM-based implementation как сказал @abatyuk.

Если вам нужно уменьшить нагрузку на почтовые ящики - вы можете попробовать некоторые противодавления/балансировки, такие как Akka Work Pulling.

Если вы хотите защитить сам узел (чтобы восстановить состояние очереди, если некоторые из узлов вашего сервера не работают), вы можете использовать Akka Cluster для репликации (вручную) состояния очереди. В этом случае координатор должен быть Cluster Singleton и должен отправлять тики сам по себе случайному игроку (вы можете использовать для этого автобус) и сохранять свои успехи и неудачи в качестве наблюдателя. Обратите внимание, что состояние супервизора может быть потеряно, поэтому вы также должны реплицировать его через узлы (и объединяться между ними каждые 2 минуты, поэтому лучше использовать SortedSet для очередей, потому что слияние будет чем-то вроде sets.reduce(_ ++ _)).

Склады, такие как Riak, уже предлагают простой способ решить clusterization problem, так что вы можете использовать их в качестве очередей (и оба координатора и дросселя будут «без гражданства»). В случае Riak вы можете настроить его как «Доступно + Разделение» (см. CAP-теорему), потому что слияние данных здесь не является проблемой - ваша история чата - это тип данных CRDT(conflict-free).

Другим решением является кеш с режимом WriteBehind (настроенный на запуск каждые 2 минуты) в качестве дросселя.

Событие sourcing может также защитить состояние вашего актора, но это более полезно, когда вам нужно повторить все действия после восстановления (вам это не нужно - он будет повторно отправлять все в базу данных). Вы можете использовать моментальные снимки (это почти так же, как использование кеша напрямую), но лучше сохранить их в кеш (путем реализации SnapshotStore) вместо локальной FS, если вам нужна доступность. Обратите внимание, что вам также может потребоваться удалить ранее сохраненные снимки, чтобы уменьшить размер хранилища. И вы должны синхронно сохранять каждую запись, чтобы избежать потери состояния.

P.S. Не забудьте подтвердить сообщение отправителю (на ваш javascript) или потерять последние сообщения (в почтовых ящиках) даже с кешем в очереди.

P.S/2 База данных почти всегда является плохим решением для сохранения состояния актера, потому что она медленная и может стать недоступной. Я бы также не рекомендовал сильные последовательные решения NoSQL, такие как MongoDB - возможная согласованность - лучший выбор в вашем случае.

+0

Есть ли способ использовать существующее сохранение Акки (События и т. Д.) Для защиты состояния очереди? – bobby

+0

вы можете использовать Accka Persistence (saveSnapshot), но проблемы такие же, как и с DB - это может быть довольно медленно. А также вам придется синхронно сохранять каждую запись, чтобы избежать потери состояния, и, вероятно, вам придется отправить ее в БД или кеш (локальные файлы могут быть ненадежными для вас), поэтому на самом деле нет большой разницы. Обратите внимание, что это не решит проблему потенциальной доступности (для этого вам все еще нужен кластер). – dk14

+0

Могу ли я использовать deleteMessages, чтобы он не повторил все действия? Тогда мне не понадобится saveSnapshot. – bobby

Смежные вопросы