2016-03-21 4 views
0

Я пытаюсь выяснить, как заменить Coder (StringUtf8Coder) для пользовательской реализации.Изменение кодера для преобразования с обратным совместимым кодером

я реализовал кодировщик, который добавляет возможность обрабатывать мгновенные сжатые строки:

import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder; 
import com.google.cloud.dataflow.sdk.coders.Coder; 
import com.google.cloud.dataflow.sdk.coders.CoderException; 
import com.google.cloud.dataflow.sdk.coders.DelegateCoder; 
import com.google.common.base.Charsets; 
import org.xerial.snappy.Snappy; 

import java.io.IOException; 

public class CompressedByteArrayCoder extends DelegateCoder<String, byte[]> { 

    private static String decompressSnappy(byte[] input) throws IOException { 
     if (input == null) { 
      throw new CoderException("null input is not accepted"); 
     } 
     if (Snappy.isValidCompressedBuffer(input)) { 
      return Snappy.uncompressString(input); 
     } 
     return new String(input, Charsets.UTF_8); 
    } 

    private static byte[] compressSnappy(String input) throws IOException { 
     return Snappy.compress(input); 
    } 

    public static CompressedByteArrayCoder of() { 
     return new CompressedByteArrayCoder(ByteArrayCoder.of(), CompressedByteArrayCoder::compressSnappy, CompressedByteArrayCoder::decompressSnappy); 
    } 

    private CompressedByteArrayCoder(Coder<byte[]> coder, CodingFunction<String, byte[]> toFn, CodingFunction<byte[], String> fromFn) { 
     super(coder, toFn, fromFn); 
    } 
} 

Я пытаюсь выяснить способ допускает замену StringUtf8Coder (по умолчанию для PubSubIO.Read) в способ, который не приводит к сбою обновления конвейера потока данных.

Я пытаюсь выяснить, как сообщить бегуну службы обработки данных, что оба кодера «совместимы».

ответ

1

К сожалению, кодеки PCollection не могут быть изменены при обновлении работающего конвейера в службе Google Cloud Dataflow в настоящее время. В этом случае вам нужно будет представить конвейер в качестве нового задания Dataflow.

Для получения дополнительной информации см. Updating an Existing Pipeline, в частности раздел о проверке совместимости.

Это то, что мы могли бы затронуть в будущем. Пожалуйста, проверьте нашу документацию на наличие обновлений.

Смежные вопросы