2017-02-22 2 views
2

Я все еще новичок в потоке данных Apache Beam/Cloud, поэтому прошу прощения, если мое понимание неверно.Ошибка определения входного сигнала на стороне Apache Beach

Я пытаюсь прочитать файл данных длиной ~ 30 000 строк по конвейеру. Мой простой конвейер сначала открыл csv из GCS, вытащил заголовки из данных, запустил данные через функцию ParDo/DoFn, а затем написал весь вывод в csv обратно в GCS. Этот трубопровод работал и был моим первым испытанием.

Затем я редактировал конвейер для чтения csv, вытаскивал заголовки, удалял заголовки из данных, запускал данные через функцию ParDo/DoFn с заголовками в качестве бокового входа, а затем записывал весь вывод в csv. Единственный новый код передавал заголовки в виде бокового ввода и отфильтровывал его из данных.

enter image description here enter image description here

Функция build_rows Pardo/DoFn только дает context.element, чтобы я мог убедиться, что мои боковые входы работают.

Я получаю ошибку ниже: enter image description here
Я не совсем уверен, что проблема, но я думаю, что это может быть связано с ограничением памяти. Я обрезал свои данные образца с 30 000 строк до 100 строк, и мой код, наконец, работал.

Трубопровод без боковых входов делает чтение/запись всех 30 000 строк, но в конце мне понадобятся боковые входы для преобразования данных.

Как исправить мой конвейер, чтобы обрабатывать большие CSV-файлы из GCS и по-прежнему использовать боковые входы в качестве псевдо-глобальной переменной для файла?

+0

* Примечание: Это проверено локально. Когда я добавляю код, я делаю инкрементные тесты. Если он работает локально, я запускаю его в Google Cloud Dataflow, чтобы убедиться, что он также работает там. Если он работает в Cloud Dataflow, я добавляю больше кода. –

ответ

1

Я недавно закодировал CSV file source для Apache Beam, и я добавил его в пакет PiPy beam_utils. В частности, вы можете использовать его следующим образом:

  1. Установка утилиты пучка: pip install beam_utils
  2. Импорт: from beam_utils.sources import CsvFileSource.
  3. Используйте его как источник: beam.io.Read(CsvFileSource(input_file)).

В своем поведении по умолчанию CsvFileSource возвращает словари, индексированные заголовком, но вы можете взглянуть на документацию, чтобы решить, какой вариант вы хотите использовать.

В качестве дополнительного, если вы хотите реализовать свой собственный CsvFileSource, вам нужно создать подкласс луч FileBasedSource:

import csv 
class CsvFileSource(beam.io.filebasedsource.FileBasedSource): 
    def read_records(self, file_name, range_tracker): 
    self._file = self.open_file(file_name) 
    reader = csv.reader(self._file) 
    for i, rec in enumerate(reader): 
     yield res 

И вы можете расширить эту логику для разбора заголовков и других особенностей поведения.

Кроме того, в качестве примечания этот источник нельзя разделить, поскольку его необходимо последовательно анализировать, поэтому он может представлять собой узкое место при обработке данных (хотя это может быть хорошо).

+0

Привет, Пабло, Спасибо, что посмотрел на другой один из моих вопросов.Я изменил свой код, чтобы использовать файл bas_utils CsvFileSource, который вы написали, и кажется, что все работает намного лучше. Я знаю, что больше нужно использовать боковые входы, которые причиняли мне неприятности, но не могли бы вы рассказать мне, что моя проблема могла быть? Просто чтобы понять, что происходит. –

+0

Дайте мне немного времени, чтобы проверить, почему произошло утверждение. – Pablo

+0

Вам нужно добавить __init__, где вы четко указываете на то, что он является разделяемым. I.e super (CsvFileSource, s) .__ init __ (имя файла, splitittable = False). Если нет, вы рискуете, что несколько рабочих снова и снова будут читать одно и то же содержимое, полагая, что аргумент range_tracker в read_records соблюден. – innohead

Смежные вопросы