@ToTable support for nested selectedFields for use in BQ Storage API
Continuation of https://github.com/spotify/scio/issues/3944 ;)
The current implementation supplies only top-level fields to the BQ Storage API, even if the selected field is a record from which only a small subset of nested fields is needed (quite often the case for export tables).
Expected behaviour is to enumerate only the needed nested fields in selectedFields.
Here is a test to demonstrate: https://github.com/sisidra/scio/commit/df9e9ef5d701eb7ab1f72f6fe96100532cf9be74#diff-c3f7d7df2dcfd1a7c4a6b1e9084044eee6f2f1d4022646cb6bddc2073bebae3b
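To make the expected behaviour concrete, here is a minimal sketch of flattening the wanted fields into dotted leaf paths. The `Field` model and `leafPaths` helper are hypothetical and only for illustration; they are not Scio's or BigQuery's actual types, and `created_at` is just an illustrative top-level field name.

```scala
object NestedSelectedFields {
  // Hypothetical, simplified schema model: a field is a leaf when it has no children.
  final case class Field(name: String, children: List[Field] = Nil)

  // Flatten the wanted fields into dotted leaf paths such as "payload.name",
  // instead of emitting only the root record name ("payload").
  def leafPaths(fields: List[Field], prefix: String = ""): List[String] =
    fields.flatMap { f =>
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      if (f.children.isEmpty) List(path) else leafPaths(f.children, path)
    }

  def main(args: Array[String]): Unit = {
    // Only the fields the annotated type actually declares.
    val wanted = List(
      Field("created_at"),
      Field("payload", List(Field("name"), Field("url")))
    )
    // prints "created_at, payload.name, payload.url"
    println(leafPaths(wanted).mkString(", "))
  }
}
```

With the current behaviour the same input would yield `created_at, payload`, so the whole `payload` record would be read.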
Hi! Unfortunately, the Scio offering reflects the limitations of the current BQ Storage API (which Beam's BigQueryIO uses for Storage reads):
message TableReadOptions {
  // Names of the fields in the table that should be read. If empty, all
  // fields will be read. If the specified field is a nested field, all
  // the sub-fields in the field will be selected. The output field order is
  // unrelated to the order of fields in selected_fields.
  repeated string selected_fields = 1;
  // ... (other fields omitted)
}
I'm not sure whether nested field support is on the roadmap or not.
The documentation is actually poor in this regard, I noticed. ;)
It states that selecting a root field will include all child fields; it does not say that child fields cannot be specified. 🤷
My tests show that the Storage API accepts nested fields in the notation "rootF.childF", and that performance is better than when specifying only the root field.
Here is an example with the raw BQ Storage API. There is a caveat though: currently, selecting a nested field on a repeated record does not seem to work.
import com.google.cloud.bigquery.storage.v1._
import com.google.cloud.bigquery.storage.v1.ReadSession._
import org.apache.avro._
import org.apache.avro.generic._
import org.apache.avro.io.DecoderFactory

val client = BigQueryReadClient.create()
val srcTable = "projects/bigquery-public-data/datasets/samples/tables/github_nested"

// Select nested fields with dotted paths instead of the whole "payload" record.
val options = TableReadOptions.newBuilder()
  .addSelectedFields("payload.name")
  .addSelectedFields("payload.url")
  // .addSelectedFields("payload.pages.action") // selecting a nested field on a repeated record does not work
  .setRowRestriction("payload.name IS NOT NULL")
  .build()

val sessionBuilder = ReadSession.newBuilder()
  .setTable(srcTable)
  .setDataFormat(DataFormat.AVRO)
  .setReadOptions(options)

// Replace [REPLACE] with the project that the read session should be created in.
val builder = CreateReadSessionRequest.newBuilder()
  .setParent("projects/[REPLACE]")
  .setReadSession(sessionBuilder)
  .setMaxStreamCount(1)

val session = client.createReadSession(builder.build())
val streamName = session.getStreams(0).getName()
val readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(streamName).build()
val stream = client.readRowsCallable().call(readRowsRequest)

// The session's Avro schema contains only the selected fields.
val datumReader = new GenericDatumReader[GenericRecord](new Schema.Parser().parse(session.getAvroSchema().getSchema()))

// Decode only the first record of the first response.
val avroRows = stream.iterator().next().getAvroRows()
val decoder = DecoderFactory.get().binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), null)
val avroRecord = datumReader.read(null, decoder)

stream.cancel()
client.close()
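Given the repeated-record caveat above, one possible workaround is to fall back to the repeated record's own path whenever a requested path passes through it. This is only a sketch: `withFallback` is a hypothetical helper, `payload.pages.title` is an illustrative field name, and it assumes the Storage API accepts the path of the repeated record itself (`payload.pages`), which I have not verified.

```scala
object RepeatedRecordFallback {
  // Replace any dotted path that passes through a repeated record with the
  // path of that record, so the whole repeated record is read instead.
  def withFallback(paths: List[String], repeatedRecords: Set[String]): List[String] =
    paths.map { path =>
      val parts = path.split('.').toList
      // All proper prefixes of the path, shortest first: "a", "a.b", ...
      val prefixes = (1 until parts.length).map(n => parts.take(n).mkString("."))
      prefixes.find(repeatedRecords.contains).getOrElse(path)
    }.distinct

  def main(args: Array[String]): Unit = {
    val requested = List("payload.name", "payload.pages.action", "payload.pages.title")
    // prints "payload.name, payload.pages"
    println(withFallback(requested, Set("payload.pages")).mkString(", "))
  }
}
```

The set of repeated record paths would have to come from the table schema; it is passed in explicitly here to keep the sketch self-contained.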
@clairemcginty any update on this? I feel that this would be solved when moving to magnolify.