Я хочу использовать Apache Flink для следующего. У меня есть один основной поток, который должен быть обогащен данными другого потока. Этот основной поток имеет элементы с атрибутами «сайт» и «метка времени». Другой поток (назовем его countrystream) имеет атрибуты «сайт» и «страна». Countrystream должен отслеживать последнюю страну, используемую для сайта. Например, если ("klm.com", "netherlands")
прибыл первым, а через некоторое время появился кортеж ("klm.com", "france")
, то «klm.com» должен указывать на «france» (поскольку это был последний). Таким образом, он должен поддерживать состояние. Предположим, что кортеж («klm.com», 100) прибыл в основной поток. Теперь это должно быть обогащено до ("klm.com", 100, "france")
. Если какой-либо сайт не найден в countrystream, он должен быть обогащен «?». Так, например, ("stackoverflow.com", 150, "?")
. Как я могу архивировать это?Обогащение одного потока другим потоком
0
A
ответ
0
Я нашел решение (мне потребовалось некоторое время). Является ли это эффективным? Можно ли улучшить? Означает ли это, что у меня нет контрольных точек для моего итеративного потока?
val env = StreamExecutionEnvironment.getExecutionEnvironment
val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
.iterate(
iteration => {
(iteration, iteration)
}
)
mainStream
.coGroup(infoStream)
.where[String]((x: String) => x)
.equalTo(_._2)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1))) {
(first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => {
first.foreach((key: String) => {
val matchingRecords = second
.filter(_._2 == key)
if (matchingRecords.nonEmpty) {
val matchingRecord = matchingRecords.maxBy(_._1)
out.collect((matchingRecord._2, matchingRecord._3))
}
}
)
}
}
.print()
env.execute("proof_of_concept")
Смежные вопросы
- 1. Python Threading, погрузка одного потока за другим
- 2. Как изменить переменные из потока пользовательского интерфейса другим потоком
- 3. Будет ли исключение основного потока catch исключено другим потоком?
- 4. Может ли заблокированный метод потока java выполняться другим потоком?
- 5. Java получить значение потока, используемого для сопоставления с другим потоком
- 6. Убив процесс, порожденный другим потоком
- 7. Управление потоком графов потока
- 8. Почему начинается поток, заблокированный другим потоком?
- 9. Значение pword(), измененное другим потоком
- 10. Чтение из потока с потоком
- 11. Ошибка потока с потоком Java
- 12. Sync parse getInBackground() с другим потоком
- 13. Чтение переменных другим потоком - переменные не изменятся
- 14. Угловая с ng-потоком: существующий объект потока
- 15. Есть ли UserControl, измененный другим потоком, который является основным потоком?
- 16. Как определить приоритет одного потока над другим, когда это необходимо?
- 17. Ошибка ArrayStoreException с JNI и другим потоком
- 18. Обогащение параллельно после сплита
- 19. Обогащение объект подмешать черты
- 20. Обогащение внутреннего класса
- 21. CUDA транспонировать более одного потока
- 22. Как создать крючок для клавиатуры с другим потоком в C#?
- 23. Стоп scanf от ожидающего ввода, с другим потоком
- 24. Что происходит с другим потоком, когда один поток блокируется?
- 25. Spring Webflow 2.0 - связь с другим потоком
- 26. Android - использование неопределенного ProgressBar с другим потоком
- 27. Итерация через список, изменяемый другим потоком
- 28. Почему мой объект используется другим потоком?
- 29. Как определить, что сокет выключен() другим потоком?
- 30. Доступ к словарю элементы, добавленные другим потоком