2016-12-01 2 views
3

Я пытаюсь передать HashMap в качестве бокового ввода в конвейере потока данных. Я не могу найти никаких примеров, кроме тех, которые передают либо String, Int, либо Long. Мой код:Как передать HashMap в качестве бокового входа в Dataflow

tagList = pipeline.apply(TextIO.Read.named("tagListTextRead").from("gs://mybucket/tag-list.json")); 

PCollection<Map<String,TagObject>> tagMap = tagList 
      .apply(ParDo.named("allTagsToTagMap").of(new Tags.BuildTagListMapFn())); 


PCollectionView<Map<String, TagObject>> tagMapView = 
      allTags.apply(View.<String, TagObject>asMap()); 

Третий оператор дает синтаксическую ошибку.

The method apply(PTransform<? super PCollection<Map<String,TagObject>>,OutputT>) in the type 
    PCollection<Map<String,TagObject>> is not applicable for the arguments 
    (View.AsMap<String,TagObject>) 

Может кто-нибудь показать мне, как передать HashMap в качестве бокового ввода в конвейере потока данных.

ответ

2

Вот два разных ответа, в зависимости от деталей вашего трубопровода.

  1. Если у вас есть PCollection<KV<K, V>>, то вы можете использовать View.asMap() для получения PCollectionView<Map<K, V>>. Вам не нужно самостоятельно строить Map.

  2. Если у вас есть PCollection<Map<K, V>> с одним элементом, вы можете использовать View.asSingleton() для бокового ввода.

Первый, вероятно, наиболее естественным, и ваш код будет в конечном итоге выглядит как

PCollectionView<Map<String, TagObject>> = pipeline 
    .apply("tagListTextRead", TextIO.Read.from("gs://mybucket/tag-list.json")) 
    .apply("tagsToKv", new Tags.TagToKvFunction()) 
    .apply("viewTags", View.<String, TagObject>asMap()) 

Расширение это, чтобы показать типы промежуточных значений:

PCollection<String> rawTags = 
    pipeline.apply("tagListTextRead", TextIO.Read.from("gs://mybucket/tag-list.json")) 

PCollection<KV<String, TagObject>> kvs = 
    rawTags.apply("tagsToKv", new Tags.TagToKvFunction()) 

PCollectionView<Map<String, TagObject>> = 
    kvs.apply("viewTags", View.<String, TagObject>asMap()) 
+0

Большое спасибо , Это сработало для меня. Мое дело было №2, поэтому использование asSingleton было тем, что мне было нужно. –

Смежные вопросы