在Apache Beam中向不同的BigQuery表写入不同的值
假设我有一个PCollection<Foo>
,我想把它写到多个BigQuery表中,为每个Foo
select一个可能不同的表。
我怎样才能使用Apache Beam BigQueryIO
API来做到这一点?
这可以使用最近添加到Apache Beam中BigQueryIO
的function。
PCollection<Foo> foos = ...; foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() { @Override public TableDestination apply(ValueInSingleWindow<Foo> value) { Foo foo = value.getValue(); // Also available: value.getWindow(), getTimestamp(), getPane() String tableSpec = ...; String tableDescription = ...; return new TableDestination(tableSpec, tableDescription); } }).withFormatFunction(new SerializableFunction<Foo, TableRow>() { @Override public TableRow apply(Foo foo) { return ...; } }).withSchema(...));
根据input的PCollection<Foo>
是有界的还是无界的,在PCollection<Foo>
,这将创build多个BigQuery导入作业(每个表取决于数据量一个或多个表),或者使用BigQuerystream插入API。
最灵活的API版本使用DynamicDestinations
,它允许您为具有不同架构的不同表编写不同的值,甚至允许您在所有这些计算中使用来自其余pipe道的侧面input。
另外,BigQueryIO已经被重构为许多可重用的转换,您可以自己组合起来实现更复杂的用例 – 查看源目录中的文件 。
这个特性将被包含在Apache Beam的第一个稳定版本以及Dataflow SDK的下一个版本中(这将基于Apache Beam的第一个稳定版本)。 现在你可以通过在github的HEAD快照上运行你的pipe道来使用它。