У меня возникла проблема с выполнением пользовательского асинхронного действия в Oozie. Мой класс распространяется от ActionExecutor и перезаписывает методы initActionType, start, end, check, kill и isCompleted.Oozie custom асинхронное действие
В методе начала я хочу запустить задание YARN, которое реализуется через мой класс BiohadoopClient. Для того, чтобы сделать асинхронный вызов, я завернул метод client.run() в отзывном:
public void start(final Context context, final WorkflowAction action) {
...
Callable<String> biohadoop = new Callable<String>() {
BiohadoopClient client = new BiohadoopClient();
client.run();
}
// submit callable to executor
executor.submit(biohadoop);
// set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html
context.setStartData(externalId, callBackUrl, callBackUrl);
...
}
Это прекрасно работает, и, например, когда я использую мои пользовательские действия в вилке/нарисуйте образом, исполнение из действия выполняются параллельно.
Теперь проблема заключается в том, что Оози остается в состоянии RUNNING для этих действий. Кажется невозможным изменить это на законченное состояние. Метод check() никогда не вызывается Oozie, это же верно для метода end(). Это не помогает установить context.setExternalStatus(), context.setExecutionData() и context.setEndData() вручную в Callable (после завершения client.run()). Я также попытался вручную поставить в очередь ActionEndXCommand, но без везения.
Когда я жду в методе start() для Callable для завершения, состояние будет обновляться правильно, но выполнение в fork/join больше не является параллельным (что кажется логичным, так как выполнение ожидает вызова Callable to полный).
How external clients notify Oozie workflow with HTTP callback не помогло, так как использование обратного вызова, похоже, ничего не меняет (хорошо, я вижу, что это произошло в файлах журнала, но кроме того, ничего ...). Кроме того, в ответе упоминалось, что действие SSH выполняется асинхронно, но я не понял, как это делается. Существует некоторая упаковка внутри Callable, но в конце метод call() вызываемого вызова вызывается напрямую (без подчинения Исполнителю).
До сих пор я не нашел никакого примера, как написать асинхронное пользовательское действие. Может ли кто-нибудь помочь мне?
Благодаря
Редактировать
Вот реализации initActionType(), начало(), проверка(), конец(), то вызываемая реализация может быть найден в действии старт().
Вызываемый передается исполнителю в действии start(), после которого вызывается его метод shutdown(), поэтому исполнитель выключается после завершения Callable. В качестве следующего шага вызывается context.setStartData (externalId, callBackUrl, callBackUrl).
private final AtomicBoolean finished = new AtomicBoolean(false);
public void initActionType() {
super.initActionType();
log.info("initActionType() invoked");
}
public void start(final Context context, final WorkflowAction action)
throws ActionExecutorException {
log.info("start() invoked");
// Get parameters from Node configuration
final String parameter = getParameters(action.getConf());
Callable<String> biohadoop = new Callable<String>() {
@Override
public String call() throws Exception {
log.info("Starting Biohadoop");
// No difference if check() is called manually
// or if the next line is commented out
check(context, action);
BiohadoopClient client = new BiohadoopClient();
client.run(parameter);
log.info("Biohadoop finished");
finished.set(true);
// No difference if check() is called manually
// or if the next line is commented out
check(context, action);
return null;
}
};
ExecutorService executor = Executors.newCachedThreadPool();
biohadoopResult = executor.submit(biohadoop);
executor.shutdown();
String externalId = action.getId();
String callBackUrl = context.getCallbackUrl("finished");
context.setStartData(externalId, callBackUrl, callBackUrl);
}
public void check(final Context context, final WorkflowAction action)
throws ActionExecutorException {
// finished is an AtomicBoolean, that is set to true,
// after Biohadoop has finished (see implementation of Callable)
if (finished.get()) {
log.info("check(Context, WorkflowAction) invoked -
Callable has finished");
context.setExternalStatus(Status.OK.toString());
context.setExecutionData(Status.OK.toString(), null);
} else {
log.info("check(Context, WorkflowAction) invoked");
context.setExternalStatus(Status.RUNNING.toString());
}
}
public void end(Context context, WorkflowAction action)
throws ActionExecutorException {
log.info("end(Context, WorkflowAction) invoked");
context.setEndData(Status.OK, Status.OK.toString());
}
Можете ли вы показать, как вы реализовали методы check() и initActionType() и как вы реализуете метод call() в Callable? –
@SSaikia_JtheRocker: Я добавил реализации – gappc