3

Использование SCIO от spotify написать работу для Dataflow после 2 примера e.g1 и e.g2 написать PubSub поток в GCS, но получить следующее сообщение об ошибке для кода нижеОшибка записи PubSub поток в Cloud Storage с помощью DataFlow

Ошибка

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 

Код

object StreamingPubSub { 
    def main(cmdlineArgs: Array[String]): Unit = { 
// set up example wiring 
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs) 
val dataflowUtils = new DataflowExampleUtils(opts) 
dataflowUtils.setup() 

val sc = ScioContext(opts) 


sc.pubsubTopic(opts.getPubsubTopic) 
.timestampBy { 
    _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong) 
    } 
.withFixedWindows((Duration.standardHours(1))) 
.groupBy(_ => Unit) 
.toWindowed 
.toSCollection 
.saveAsTextFile(args("output")) 


val result = sc.close() 

// CTRL-C to cancel the streaming pipeline 
    dataflowUtils.waitToFinish(result.internal) 
    } 
} 

Я, возможно, смешивая понятие окна с ограниченной PCollection, есть способ для достижения этой цели, или мне нужно, чтобы применить некоторые преобразования, чтобы это произошло, кто может оказать помощь в этом

ответ

3

Я считаю, В SCSI saveAsTextFile используется преобразование Write Dataflow, которое поддерживает только ограниченные параметры PCollections. Dataflow не предоставляет прямой API для написания неограниченного PCollection в Google Cloud Storage, хотя это то, что мы изучаем.

Чтобы сохранить где-нибудь неограниченный PCollection, рассмотрите, например, BigQuery, Datastore или Bigtable. В API SCIO вы можете использовать, например, saveAsBigQuery.

+0

Спасибо за быстрый ответ – DAR

Смежные вопросы