2014-10-03 2 views
1

У меня возникла проблема с выполнением пользовательского асинхронного действия в Oozie. Мой класс распространяется от ActionExecutor и перезаписывает методы initActionType, start, end, check, kill и isCompleted.Oozie custom асинхронное действие

В методе начала я хочу запустить задание YARN, которое реализуется через мой класс BiohadoopClient. Для того, чтобы сделать асинхронный вызов, я завернул метод client.run() в отзывном:

public void start(final Context context, final WorkflowAction action) { 
... 
    Callable<String> biohadoop = new Callable<String>() { 
    BiohadoopClient client = new BiohadoopClient(); 
    client.run(); 
    } 

    // submit callable to executor 
    executor.submit(biohadoop); 

    // set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html 
    context.setStartData(externalId, callBackUrl, callBackUrl); 
... 
} 

Это прекрасно работает, и, например, когда я использую мои пользовательские действия в вилке/нарисуйте образом, исполнение из действия выполняются параллельно.

Теперь проблема заключается в том, что Оози остается в состоянии RUNNING для этих действий. Кажется невозможным изменить это на законченное состояние. Метод check() никогда не вызывается Oozie, это же верно для метода end(). Это не помогает установить context.setExternalStatus(), context.setExecutionData() и context.setEndData() вручную в Callable (после завершения client.run()). Я также попытался вручную поставить в очередь ActionEndXCommand, но без везения.

Когда я жду в методе start() для Callable для завершения, состояние будет обновляться правильно, но выполнение в fork/join больше не является параллельным (что кажется логичным, так как выполнение ожидает вызова Callable to полный).

How external clients notify Oozie workflow with HTTP callback не помогло, так как использование обратного вызова, похоже, ничего не меняет (хорошо, я вижу, что это произошло в файлах журнала, но кроме того, ничего ...). Кроме того, в ответе упоминалось, что действие SSH выполняется асинхронно, но я не понял, как это делается. Существует некоторая упаковка внутри Callable, но в конце метод call() вызываемого вызова вызывается напрямую (без подчинения Исполнителю).

До сих пор я не нашел никакого примера, как написать асинхронное пользовательское действие. Может ли кто-нибудь помочь мне?

Благодаря

Редактировать

Вот реализации initActionType(), начало(), проверка(), конец(), то вызываемая реализация может быть найден в действии старт().

Вызываемый передается исполнителю в действии start(), после которого вызывается его метод shutdown(), поэтому исполнитель выключается после завершения Callable. В качестве следующего шага вызывается context.setStartData (externalId, callBackUrl, callBackUrl).

private final AtomicBoolean finished = new AtomicBoolean(false); 

public void initActionType() { 
    super.initActionType(); 
    log.info("initActionType() invoked"); 
} 

public void start(final Context context, final WorkflowAction action) 
     throws ActionExecutorException { 
    log.info("start() invoked"); 

    // Get parameters from Node configuration 
    final String parameter = getParameters(action.getConf()); 

    Callable<String> biohadoop = new Callable<String>() { 
     @Override 
     public String call() throws Exception { 
      log.info("Starting Biohadoop"); 

      // No difference if check() is called manually 
      // or if the next line is commented out 
      check(context, action); 

      BiohadoopClient client = new BiohadoopClient(); 
      client.run(parameter); 
      log.info("Biohadoop finished");    

      finished.set(true); 
      // No difference if check() is called manually 
      // or if the next line is commented out 
      check(context, action); 

      return null; 
     } 
    }; 

    ExecutorService executor = Executors.newCachedThreadPool(); 
    biohadoopResult = executor.submit(biohadoop); 
    executor.shutdown(); 

    String externalId = action.getId(); 
    String callBackUrl = context.getCallbackUrl("finished"); 
    context.setStartData(externalId, callBackUrl, callBackUrl); 
} 

public void check(final Context context, final WorkflowAction action) 
     throws ActionExecutorException { 
    // finished is an AtomicBoolean, that is set to true, 
    // after Biohadoop has finished (see implementation of Callable) 
    if (finished.get()) { 
     log.info("check(Context, WorkflowAction) invoked - 
      Callable has finished"); 
     context.setExternalStatus(Status.OK.toString()); 
     context.setExecutionData(Status.OK.toString(), null); 
    } else { 
     log.info("check(Context, WorkflowAction) invoked"); 
     context.setExternalStatus(Status.RUNNING.toString()); 
    } 
} 

public void end(Context context, WorkflowAction action) 
     throws ActionExecutorException { 
    log.info("end(Context, WorkflowAction) invoked"); 
    context.setEndData(Status.OK, Status.OK.toString()); 
} 
+0

Можете ли вы показать, как вы реализовали методы check() и initActionType() и как вы реализуете метод call() в Callable? –

+0

@SSaikia_JtheRocker: Я добавил реализации – gappc

ответ

0

Одна вещи - я могу видеть, вы выключая исполнитель сразу после того, как вы представившая работу - executor.shutdown();. Это может вызвать проблему. Не могли бы вы попытаться переместить это заявление на метод end()?

+0

Спасибо за ваши идеи. Я попробовал, но это не имело значения.JavaDoc довольно четко говорит об использовании shutdown: 'Инициирует упорядоченное завершение работы, в котором выполняются ранее поставленные задачи, но новые задачи не будут приняты. Вызов не имеет дополнительного эффекта, если он уже выключен. Этот метод не дождался выполнения ранее поставленных задач для завершения. Возможно, у вас есть другая идея? – gappc

+0

@gappc: Я сомневаюсь, что переменная AtomicBoolean не обновляется или что-то в этом роде. Удалите инструкции метода проверки из метода start(), а затем проверьте, отображаются ли сообщения журнала, которые вы делаете внутри check(). Также запишите значение final.get(). Было бы намного лучше, если бы вы могли протестировать его с помощью тестового примера JUnit. –

+0

AtomicBoolean настроен правильно, я вижу это из файлов журнала, если я вызываю check() вручную (другой выход журнала). Если я удалю вызовы ручной проверки(), как check(), так и end() вообще не вызываются. Что касается тестов JUnit: вы правы :) Но на нынешнем этапе, когда желаемое решение вообще не работает, дополнительные усилия не стоят - по крайней мере, на мой взгляд. Я получил несколько ответов из списка рассылки oozie, который я сейчас тестирую, я сообщу вам о прогрессе – gappc

0

В конце концов, я не нашел «реального» решения проблемы. Решение, которое работало для меня, состояло в том, чтобы реализовать действие, которое вызывает экземпляры Biohadoop параллельно с использованием инфраструктуры Java Executor. После invokation, я жду (все еще внутри действия), чтобы потоки закончили

Смежные вопросы