1

Я слушаю данные из pub-sub, используя потоковые данные в потоке данных. Затем мне нужно загрузить на хранение, обработать данные и загрузить их в bigquery.Потоковая передача данных в Google Cloud Storage из PubSub с использованием Cloud Dataflow

вот мой код:

public class BotPipline { 

public static void main(String[] args) { 

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); 
    options.setRunner(BlockingDataflowPipelineRunner.class); 
    options.setProject(MY_PROJECT); 
    options.setStagingLocation(MY_STAGING_LOCATION); 
    options.setStreaming(true); 

    Pipeline pipeline = Pipeline.create(options); 

    PCollection<String> input = pipeline.apply(PubsubIO.Read.maxNumRecords(1).subscription(MY_SUBSCRIBTION)); 

    input.apply(TextIO.Write.to(MY_STORAGE_LOCATION)); 

    input 
    .apply(someDataProcessing(...)).named("update json")) 
    .apply(convertToTableRow(...)).named("convert json to table row")) 
      .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema) 
    ); 
    pipeline.run(); 
} 

}

когда я запускаю код комментирует Сочинение на хранение код работает хорошо. , но когда я пытаюсь загрузить его в большой запрос, я получаю эту ошибку (которая, как ожидается, ..):

Write can only be applied to a Bounded PCollection 

Я не использую границу, так как мне нужно, чтобы запустить все это время, и мне нужно, чтобы данные были загружены немедленно . Любое решение?

EDIT: это мое желаемое поведение:

Я получаю сообщение через PubSub. Каждое сообщение должно храниться в собственном файле в GCS в качестве грубых данных, выполнить некоторую обработку данных, а затем сохранить его в большом запросе, имея имя файла в данных.

данных следует рассматривать сразу после того, как получил в BQ Например:

data published to pubsub : {a:1, b:2} 
data saved to GCS file UUID: A1F432 
data processing : {a:1, b:2} -> 
        {a:11, b: 22} -> 
        {fileName: A1F432, data: {a:11, b: 22}} 
data in BQ : {fileName: A1F432, data: {a:11, b: 22}} 

идея о том, что обработанные данные хранятся в BQ, имеющие связь с шероховатыми данными, хранящимися в ГКС

+0

Возможный дубликат [Может ли TextIO записывать префиксы, полученные из окна maxTimestamp?] (Https://stackoverflow.com/questions/33522178/can-textio-write-to-prefixes-derived-from-the-window- maxtimestamp) – jkff

ответ

2

В настоящее время мы не поддерживают запись неограниченных коллекций в TextIO.Write. См. related question.

Не могли бы вы пояснить, что вы хотите, чтобы поведение неограниченного TextIO.Write было? Например. вы бы хотели, чтобы один постоянно растущий файл или один файл за окно закрывался, когда окно закрывается, или что-то еще, или имеет значение только для вас, что общее содержание написанных файлов в конечном итоге будет содержать все сообщения PubSub, но это не имеет значения, как структурированы файлы и т. д.?

В качестве обходного пути, вы можете осуществить запись в ГКС как свой собственный DoFn, используя IOChannelFactory для взаимодействия с ГКС (на самом деле, TextIO.Write находится под капотом, только композитное преобразование, пользователь мог бы написать сам с нуля) ,

Вы можете получить доступ к окну данных, используя необязательный параметр BoundedWindow на @ProcessElement. Я бы мог дать больше советов, если вы объясните желаемое поведение.

+0

Я получаю сообщения через pubsub. Мне нужно, чтобы каждое сообщение сохранялось в собственном файле в GCS, выполняло некоторую обработку данных, а затем сохраняло его в большом запросе, имея имя файла в данных. – dina

+0

Данные должны быть видны сразу после получения в BQ ** пример **: данные опубликованы в pubsub: '{a: 1, b: 2}' данные, сохраненные в файле GCS UUID: A1F432' обработка данных: ' {a: 1, b: 2} '->' {a: 11, b: 22} '->' {fileName: A1F432, data: {a: 11, b: 22}} ' данные в BQ:' {fileName: A1F432, data: {a: 11, b: 22}} ' – dina

+0

Идея состоит в том, что обработанные данные хранятся в BQ, имеющем линг для грубых данных, хранящихся в GCS – dina

Смежные вопросы