2016-07-11 2 views
4

Есть ли способ одновременной записи в разные потоки в пользовательском процессоре в NiFi? К примеру у меня есть сторонние библиотеки, которые делают значительную обработку с использованием API-интерфейсов, которые работают что-то вроде этого:Apache NiFi: Вывод на несколько потоков одновременно?

public void process(InputStream in, OutputStream foo, OutputStream baa, List<String> args) 
{ 
    ... 
    foo.write(things); 
    baa.write(stuff); 
    ... 
} 

Но единственные примеры, которые я могу найти все просто использовать один выходной поток:

FlowFile transform = session.write(original, new OutputStreamCallback() { 
     @Override 
     public void process(OutputStream out) throws IOException { 
      out.write("stuff"); 
     } 
    }); 

обработка выполняется в партиях (из-за его большого масштаба), поэтому нецелесообразно выполнять всю обработку, а затем выписывать отдельные потоки.

Единственный путь я могу придумать это процесс ввода несколько раз :(

Чтобы уточнить, я хочу написать к нескольким FlowFiles, используя метод session.write(flowfile, callback), таким образом, различные потоки могут быть отправлены/управляются отдельно

+0

Использование [TeeOutputStream] (https://commons.apache.org/proper/commons-io/javadocs/api-2.4/org/apache/commons/io/output/TeeOutputStream.html) из вопрос? Смотрите: http://stackoverflow.com/questions/7987395/how-to-write-data-to-two-java-io-outputstream-objects-at-once – GPI

+0

Я так не думаю, единственный способ, которым я знаю, писать в файл потока используется OutputStreamCallback, который имеет только одну функцию (процесс), которая принимает только один аргумент (OutputStream). – foobarking

+0

Да, но TeeOutputStream позволяет вам иметь 1 поток, который записывает в 2 отдельных файла, isn (t, что достаточно? – GPI

ответ

4

Nifi API основан на воздействуя на один файл потока в то время, но вы должны быть в состоянии сделать что-то вроде этого:

 FlowFile flowFile1 = session.create(); 
     final AtomicReference<FlowFile> holder = new AtomicReference<>(session.create()); 

     flowFile1 = session.write(flowFile1, new OutputStreamCallback() { 
      @Override 
      public void process(OutputStream out) throws IOException { 

       FlowFile flowFile2 = session.write(holder.get(), new OutputStreamCallback() { 
        @Override 
        public void process(OutputStream out) throws IOException { 

        } 
       }); 
       holder.set(flowFile2); 

      } 
     }); 
+0

Спасибо это единственное изменение, которое сделало бы первое OutputStream другим именем (и окончательным), чтобы вы могли писать как во внутренней внутренней функции. – foobarking

3

Поскольку вы делаете различные выходы из того же входа вы, возможно, также con sider, выполняющий эти шаги, разбивается на дискретные процессоры, которые фокусируются на выполнении своей конкретной функции. Выше вы показываете «вещи» и «материал», поэтому, например, я предлагаю вам иметь процессор DoThings и DoStuff. В своем потоке вы можете отправить один и тот же файл потока, просто используя соединение источника дважды. Затем это позволяет хорошо проводить параллельные операции и позволяет им иметь разные промежутки времени/etc. NiFi по-прежнему будет поддерживать прослеживаемость для вас, и на самом деле это не будет копирование байтов вообще, а передача указателя на исходный контент.

+0

Я согласен, что у этого есть свои преимущества, но если есть большой объем обработки сделанный за входной поток, он расточительно бросает все это и делает это снова (или дважды параллельно). Его компромисс. Спасибо. – foobarking

+0

Не могли бы вы выполнить промежуточную (или «общую») обработку в одном процессоре, а затем разделить на два последующих процессора, когда действия отличаются? Ничто не говорит о том, что содержимое потока должно находиться в определенном состоянии, когда процессор A заканчивается, пока B и B 'могут принимать выход A в качестве входного сигнала. – Andy