2016-08-30 2 views
0

Допустим, у меня есть два DataStream «S различных типов:Как написать несколько DATASTREAM, чтобы один файл

val stream1: DataStream[(Int, Int, Int)] = ... 
val stream2: DataStream[(Int, Int, Int, Int, Float)] = ... 

Как я могу написать как потоки в один файл?

Я пробовал разные вещи, но, похоже, не работает. Например, я не могу просто написать сразу, как

stream1.writeAsText("path/to/file.txt").setParallelism(1) 
stream2.writeAsText("path/to/file.txt").setParallelism(1) 

потому Flink пожалуется со следующим сообщением:

java.io.IOException: File or directory already exists. 
Existing files and directories are not overwritten in NO_OVERWRITE mode. 
Use OVERWRITE mode to overwrite existing files and directories. 

С другой стороны, я не могу переписать так:

stream1.writeAsText("path/to/file.txt").setParallelism(1) 
stream2.writeAsText("path/to/file.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1) 

потому что (насколько я понимаю) второй поток перезапишет все, что написал первый поток.

Наконец, я думал о подключении потока как этот

val connectedStream: ConnectedStream = stream1.connect(stream2) 

, но тогда я бы получить ConnectedStream, который не имеет метод writeAsText.

(Для записи у меня на самом деле есть 4 потока, которые я бы хотел записать в один файл).

ответ

0

Очень простое решение состоит в том, чтобы использовать для каждого потока картографию, чтобы отображать каждое событие в String (или другой общий тип, такой как byte[]). Затем у вас есть четыре потока одного типа (DataStream[String]), которые вы можете объединить в один поток и записать как один поток в файл.

Это будет выглядеть следующим образом:

val s1: DataStream[String] = ??? 
val s2: DataStream[String] = ??? 
val s3: DataStream[String] = ??? 
val s4: DataStream[String] = ??? 

val out: DataStream[String] = s1.union(s2).union(s3).union(s4) 
out.writeAsText("path/to/file") 
+0

Спасибо! Но как я могу расширить это решение до 4 потоков? Если я присоединяюсь к 2 потокам, я получаю «JoinStream», у которого нет метода «join» для объединения «в цепочку» с оставшимися 2 потоками ... – houcros

+0

Я сказал «union» not 'join'. Это разные операции. Я добавил пример кода к моему ответу. –

+0

Извините, вы правы ... Я не знаю, почему я был уверен, что вы сказали «присоединяйтесь» ... – houcros

Смежные вопросы