2016-09-21 3 views
1

В настоящее время я работаю над потоком NiFi, который требует применения пользовательских процессоров для применения преобразований на записи csv.Apache NiFi - NullPointerException установка нескольких потоков на пользовательских процессорах

Я заметил это поведение во время некоторых тестов, которые я выполняю: если для каждого пользовательского процессора назначается только один поток, все работает хорошо. Назначение большего количества потоков для результатов пользовательских процессоров для не удалось обработать сеанс из-за java.lang.NullPointerException.

Поскольку ошибка не может быть воспроизведена с помощью одной нити, я больше думаю о некоторых проблемах при обработке файловых файлов или некоторых аспектах NiFi, о которых я не знаю.

Обработка выполняется для доступа к атрибутам потока. Содержимое потокового файла никогда не читается, и выходной файл потока возвращается после добавления некоторых атрибутов. Ниже приведен фрагмент из соответствующих частей коды процессора:

@Override 
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { 

    // Will hold all the processed attributes 
    Map<String, String> processedAttributes = new HashMap<>(); 
    FlowFile flowfile = session.get(); 
    ... 
    // Adds the attributes to the flowfile 
    flowfile = session.putAllAttributes(flowfile, processedAttributes); 
    session.transfer(flowfile, PROCESSED); 
    } 

Я бег Nifi 0.7 на m4.4xlarge Amazon EC2 экземпляра. Поскольку я ищу высокие результаты (кто этого не делает), я ищу безопасный способ увеличить количество потоков. Любое предложение действительно оценено.

Заранее спасибо.

+1

Не могли бы вы предоставить связанную трассировку стека и обеспечить предоставленный код? – apiri

ответ

2

Возможно (особенно с несколькими потоками/параллельными задачами), чтобы метод session.get() возвращал значение null для некоторых потоков. Это происходит, если запланировано два или более потоков, потому что есть доступный файл потока, затем один поток получает файл потока (через session.get()), а следующий поток получает значение null.

В вашем случае вы не можете читать или записывать файл потока, но другие методы (например, session.putAllAttributes()) вызывают методы в файле потока. Многие процессоры добавляют чек, чтобы увидеть, будет ли файл потока == null и будет возвращен (так как он хочет, чтобы файл потока работал), возможно, это также устранит вашу проблему.

+0

Пробовал сначала отдельно только на одном настраиваемом процессоре, и он не выбрал исключение NullPointerException. Изменен код для всех из них, исключение не отображается. Из любопытства, как NiFi обрабатывает параллелизм потоков в очереди? спасибо за помощь – riccamini

Смежные вопросы