2013-02-12 2 views
9

Используйте Amazon SWF для обмена сообщениями между серверами?Использование Amazon SWF Для связи между серверами

  1. На сервере AI хочет запустить скрипт
  2. Когда это закончится, я хочу, чтобы отправить сообщение на сервер B, чтобы запустить скрипт B
  3. Если завершается успешно, я хочу, чтобы очистить работу из очереди рабочего процесса

У меня очень трудная работа над тем, как я могу использовать Boto и SWF в комбинации для этого. Я не следую за каким-то полным кодом, но то, что я за ним, это если кто-нибудь может объяснить немного больше о том, что происходит.

  • Как я могу сказать серверу B, чтобы проверить выполнение сценария A?
  • Как я могу убедиться, что сервер не получит завершение сценария A и попробуйте запустить сценарий B (так как сервер B должен запустить это)?
  • Как я могу уведомить SWF скрипта? Является ли ваш флаг или сообщение , или что?

Как вы можете видеть, я действительно очень смущен обо всем этом, если кто-то может пролить свет на это, я был бы очень признателен.

ответ

17

Я думаю, вы задаете очень хорошие вопросы, которые подчеркивают, насколько полезным SWF может быть услуга. Короче говоря, вы не говорите своим серверам координировать работу между собой. Ваш ресивер организует все это для вас, с помощью службы SWF.

Реализация рабочий процесс будет идти следующим образом:

  1. Регистрация рабочего процесса и его деятельности со службой (одноразовым).
  2. Внедрение решения и рабочих.
  3. Пусть ваши рабочие и дебиторы бегут.
  4. Запустите новый рабочий процесс.

Существует несколько способов передачи учетных данных в код boto.swf. Для целей данного упражнения, я рекомендую экспортировать их в окружающую среду, прежде чем запустить код ниже:

export AWS_ACCESS_KEY_ID=<your access key> 
export AWS_SECRET_ACCESS_KEY=<your secret key> 

1) Для того, чтобы зарегистрировать домен, рабочий процесс и деятельность выполнить следующие действия:

# ab_setup.py 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

swf.Domain(name=DOMAIN).register() 
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register() 
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register() 
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register() 

2) Реализовать и запускать решателей и рабочих.

# ab_decider.py 
import time 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

class ABDecider(swf.Decider): 

    domain = DOMAIN 
    task_list = 'default_tasks' 
    version = VERSION 

    def run(self): 
     history = self.poll() 
     # Print history to familiarize yourself with its format. 
     print history 
     if 'events' in history: 
      # Get a list of non-decision events to see what event came in last. 
      workflow_events = [e for e in history['events'] 
           if not e['eventType'].startswith('Decision')] 
      decisions = swf.Layer1Decisions() 
      # Record latest non-decision event. 
      last_event = workflow_events[-1] 
      last_event_type = last_event['eventType'] 
      if last_event_type == 'WorkflowExecutionStarted': 
       # At the start, get the worker to fetch the first assignment. 
       decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()), 
        ACTIVITY1, VERSION, task_list='a_tasks') 
      elif last_event_type == 'ActivityTaskCompleted': 
       # Take decision based on the name of activity that has just completed. 
       # 1) Get activity's event id. 
       last_event_attrs = last_event['activityTaskCompletedEventAttributes'] 
       completed_activity_id = last_event_attrs['scheduledEventId'] - 1 
       # 2) Extract its name. 
       activity_data = history['events'][completed_activity_id] 
       activity_attrs = activity_data['activityTaskScheduledEventAttributes'] 
       activity_name = activity_attrs['activityType']['name'] 
       # 3) Optionally, get the result from the activity. 
       result = last_event['activityTaskCompletedEventAttributes'].get('result') 

       # Take the decision. 
       if activity_name == ACTIVITY1: 
        # Completed ACTIVITY1 just came in. Kick off ACTIVITY2. 
        decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()), 
         ACTIVITY2, VERSION, task_list='b_tasks', input=result) 
       elif activity_name == ACTIVITY2: 
        # Server B completed activity. We're done. 
        decisions.complete_workflow_execution() 

      self.complete(decisions=decisions) 
      return True 

Рабочие намного проще, вам не нужно использовать наследование, если вы этого не хотите.

# ab_worker.py 
import os 
import time 
import boto.swf.layer2 as swf 

DOMAIN = 'stackoverflow' 
ACTIVITY1 = 'ServerAActivity' 
ACTIVITY2 = 'ServerBActivity' 
VERSION = '1.0' 

class MyBaseWorker(swf.ActivityWorker): 

    domain = DOMAIN 
    version = VERSION 
    task_list = None 

    def run(self): 
     activity_task = self.poll() 
     print activity_task 
     if 'activityId' in activity_task: 
      # Get input. 
      # Get the method for the requested activity. 
      try: 
       self.activity(activity_task.get('input')) 
      except Exception, error: 
       self.fail(reason=str(error)) 
       raise error 

      return True 

    def activity(self, activity_input): 
     raise NotImplementedError 

class WorkerA(MyBaseWorker): 
    task_list = 'a_tasks' 

    def activity(self, activity_input): 
     result = str(time.time()) 
     print 'worker a reporting time: %s' % result 
     self.complete(result=result) 

class WorkerB(MyBaseWorker): 
    task_list = 'b_tasks' 

    def activity(self, activity_input): 
     result = str(os.getpid()) 
     print 'worker b returning pid: %s' % result 
     self.complete(result=result) 

3) Запустите своих решений и работников. Ваш ресивер и работники могут работать с отдельных хостов или с одного и того же компьютера. Открытые четыре терминала и запустить ваши актеры:

Первый ваш решающая

$ python -i ab_decider.py 
>>> while ABDecider().run(): pass 
... 

Тогда работник А, вы могли бы сделать это с сервера A:

$ python -i ab_workers.py 
>>> while WorkerA().run(): pass 

Тогда работник B, возможно, от сервера B, но если вы запустите их все с ноутбука, он будет работать так же хорошо:

$ python -i ab_workers.py 
>>> while WorkerB().run(): pass 
... 

4) Наконец, начинайте рабочий процесс.

$ python 
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
[GCC 4.4.3] on linux2 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import boto.swf.layer2 as swf 
>>> workflows = swf.Domain(name='stackoverflow').workflows() 
>>> workflows 
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>] 
>>> execution = workflows[0].start(task_list='default_tasks') 
>>> 

Вернитесь, чтобы узнать, что происходит с вашими актерами. Они могут отключиться от службы после одной минуты бездействия. Если это произойдет, нажмите стрелку вверх + введите, чтобы повторно ввести петлю опроса.

Теперь вы можете перейти на панель SWF консоли управления AWS, посмотреть, как выполняются казни и просматривать их историю. Кроме того, вы можете запросить его через командную строку.

>>> execution.history() 
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
{'startToCloseTimeout': '300', 'taskList': {'name': ... 

Это просто пример рабочего процесса с последовательным выполнением работ, но это также возможно, решающий к schedule and coordinate parallel execution of activities.

Надеюсь, это поможет вам хотя бы начать. Для немного более сложного примера последовательного документооборота я рекомендую looking at this.

+0

Thank вы так много для этого, это действительно всеобъемлющий ответ. – Jimmy

+0

Большое спасибо! – Vor

+0

@ oozie- большой ответ, удивительный класс. Нужно быть в Boto docs- – Yarin

1

Вы можете использовать SNS, Когда сценарий А завершается он должен вызвать SNS, и это будет вызывать уведомление сервера B

+0

SNS не имеет функции, которые мне нужны несчастливо – Jimmy

5

у меня нет ни одного примера кода для обмена, но вы определенно можете использовать SWF для координации выполнения сценариев на двух серверах. Основная идея заключается в создании трех фрагментов кода, которые говорят с SWF:

  • Компонент, который знает, какой сценарий выполнить первым и что делать после выполнения первого скрипта. Это называется «решателем» в условиях SWF.
  • Два компонента, каждый из которых понимает, как выполнить конкретный скрипт, который вы хотите запустить на каждой машине. Они называются «работниками деятельности» в условиях SWF.

Первый компонент, определитель, вызывает два SWF-интерфейса: PollForDecisionTask и RespondDecisionTaskCompleted. Запрос опроса даст компоненту-определителю текущую историю исполняемого рабочего процесса, в основном информацию о состоянии «где я» для вашего бегуна сценария. Вы пишете код, который просматривает эти события и выясняет, какой сценарий должен выполнить. Эти «команды» для выполнения сценария будут представлены в виде планирования задачи активности, которая возвращается как часть вызова RespondDecisionTaskCompleted.

Вторые компоненты, которые вы пишете, рабочие, каждый из которых вызывают два SWF-API: PollForActivityTask и RespondActivityTaskCompleted. Запрос опроса даст работнику активности указание, что он должен выполнить скрипт, о котором он знает, что SWF вызывает задачу действия. Информация, возвращаемая из запроса опроса в SWF, может включать в себя отдельные данные, относящиеся к выполнению, которые были отправлены в SWF как часть планирования задачи активности. Каждый из ваших серверов будет независимо опросить SWF для задач активности, чтобы указать на выполнение локального скрипта на этом хосте. После того, как работник закончил выполнение скрипта, он обращается к SWF через API-интерфейс RespondActivityTaskCompleted.

Обратный вызов вашего работника активности в SWF приводит к тому, что новая история передается компоненту-определителю, о котором я уже упоминал. Он будет смотреть на историю, видеть, что первый скрипт сделан, и запланировать вторую для выполнения. Как только он увидит, что второй выполнен, он может «закрыть» рабочий процесс, используя другой тип решения.

Вы запускаете весь процесс выполнения сценариев на каждом хосте, вызывая API StartWorkflowExecution. Это создает запись общего процесса в SWF и выдает первую историю процессу принятия решения, чтобы запланировать выполнение первого скрипта на первом хосте.

Надеюсь, это даст немного больше информации о том, как выполнить этот тип рабочего процесса с использованием SWF. Если вы еще этого не сделали, я бы заглянул в руководство разработчика на странице SWF для получения дополнительной информации.

1

хороший пример,

Кроме того, если вы не хотите, чтобы экспортировать свои учетные данные в среде, вы можете позвонить в классах:

swf.set_default_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) 
Смежные вопросы