Я все еще новичок в потоке данных Apache Beam/Cloud, поэтому прошу прощения, если мое понимание неверно.Ошибка определения входного сигнала на стороне Apache Beach
Я пытаюсь прочитать файл данных длиной ~ 30 000 строк по конвейеру. Мой простой конвейер сначала открыл csv из GCS, вытащил заголовки из данных, запустил данные через функцию ParDo/DoFn, а затем написал весь вывод в csv обратно в GCS. Этот трубопровод работал и был моим первым испытанием.
Затем я редактировал конвейер для чтения csv, вытаскивал заголовки, удалял заголовки из данных, запускал данные через функцию ParDo/DoFn с заголовками в качестве бокового входа, а затем записывал весь вывод в csv. Единственный новый код передавал заголовки в виде бокового ввода и отфильтровывал его из данных.
Функция build_rows Pardo/DoFn только дает context.element, чтобы я мог убедиться, что мои боковые входы работают.
Я получаю ошибку ниже:
Я не совсем уверен, что проблема, но я думаю, что это может быть связано с ограничением памяти. Я обрезал свои данные образца с 30 000 строк до 100 строк, и мой код, наконец, работал.
Трубопровод без боковых входов делает чтение/запись всех 30 000 строк, но в конце мне понадобятся боковые входы для преобразования данных.
Как исправить мой конвейер, чтобы обрабатывать большие CSV-файлы из GCS и по-прежнему использовать боковые входы в качестве псевдо-глобальной переменной для файла?
* Примечание: Это проверено локально. Когда я добавляю код, я делаю инкрементные тесты. Если он работает локально, я запускаю его в Google Cloud Dataflow, чтобы убедиться, что он также работает там. Если он работает в Cloud Dataflow, я добавляю больше кода. –