Допустим, у меня есть два DataStream
«S различных типов:Как написать несколько DATASTREAM, чтобы один файл
val stream1: DataStream[(Int, Int, Int)] = ...
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ...
Как я могу написать как потоки в один файл?
Я пробовал разные вещи, но, похоже, не работает. Например, я не могу просто написать сразу, как
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt").setParallelism(1)
потому Flink пожалуется со следующим сообщением:
java.io.IOException: File or directory already exists.
Existing files and directories are not overwritten in NO_OVERWRITE mode.
Use OVERWRITE mode to overwrite existing files and directories.
С другой стороны, я не могу переписать так:
stream1.writeAsText("path/to/file.txt").setParallelism(1)
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1)
потому что (насколько я понимаю) второй поток перезапишет все, что написал первый поток.
Наконец, я думал о подключении потока как этот
val connectedStream: ConnectedStream = stream1.connect(stream2)
, но тогда я бы получить ConnectedStream
, который не имеет метод writeAsText
.
(Для записи у меня на самом деле есть 4 потока, которые я бы хотел записать в один файл).
Спасибо! Но как я могу расширить это решение до 4 потоков? Если я присоединяюсь к 2 потокам, я получаю «JoinStream», у которого нет метода «join» для объединения «в цепочку» с оставшимися 2 потоками ... – houcros
Я сказал «union» not 'join'. Это разные операции. Я добавил пример кода к моему ответу. –
Извините, вы правы ... Я не знаю, почему я был уверен, что вы сказали «присоединяйтесь» ... – houcros