
@ToTable support for nested selectedFields for use in BQ Storage API

Open sisidra opened this issue 4 years ago • 5 comments

Continuation of https://github.com/spotify/scio/issues/3944 ;)

The current implementation supplies only top-level fields to the BQ Storage API, even if the selected field is a record of which only a small subset of nested fields is needed (quite often the case for export tables).

Expected behaviour: allow enumerating individual nested fields in selectedFields.
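
To make the difference concrete, here is a toy illustration (not Scio code; `collapseToTopLevel` is a hypothetical helper) of what the current behaviour effectively does with nested selections before handing them to the BQ Storage API, versus passing the dotted paths through unchanged:

```scala
// Hypothetical helper mimicking the current behaviour: a nested
// selection like "payload.name" is collapsed to its top-level field
// "payload", so the whole record ends up being read.
object SelectedFieldsDemo {
  def collapseToTopLevel(selected: Seq[String]): Seq[String] =
    selected.map(_.takeWhile(_ != '.')).distinct

  def main(args: Array[String]): Unit = {
    val requested = Seq("payload.name", "payload.url", "actor")
    // What gets sent today: List(payload, actor)
    println(collapseToTopLevel(requested))
    // What this issue asks to send instead: the dotted paths as-is
    println(requested)
  }
}
```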

sisidra avatar Sep 09 '21 16:09 sisidra

Here is test to demonstrate: https://github.com/sisidra/scio/commit/df9e9ef5d701eb7ab1f72f6fe96100532cf9be74#diff-c3f7d7df2dcfd1a7c4a6b1e9084044eee6f2f1d4022646cb6bddc2073bebae3b

sisidra avatar Sep 09 '21 16:09 sisidra

hi! Unfortunately the Scio offering reflects the limitations of the current BQ Storage API (which Beam's BigQueryIO uses for Storage reads):

  message TableReadOptions {
    // Names of the fields in the table that should be read. If empty, all
    // fields will be read. If the specified field is a nested field, all
    // the sub-fields in the field will be selected. The output field order is
    // unrelated to the order of fields in selected_fields.
    repeated string selected_fields = 1;
    // ...
  }

I'm not sure whether nested field support is on the roadmap or not.

clairemcginty avatar Sep 09 '21 20:09 clairemcginty

The documentation is actually poor in this regard, I noticed that. ;)

It states that selecting a root field will include all of its child fields, but it does not say that child fields cannot be specified. 🤷

My tests show that the Storage API will accept nested fields in "rootF.childF" notation, and performance is better than when specifying only the root field.
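
The performance point can be sketched with a toy model (not the real API; the schema map and `leavesRead` are made up for illustration): given a schema described as root field -> leaf columns, count how many leaf columns each style of selection actually reads.

```scala
// Toy cost model: selecting a root field reads every leaf column under
// it, while a dotted selection reads a single nested column.
object NestedSelectionCost {
  // Rough sketch of the github_nested sample table's shape (illustrative).
  val schema: Map[String, Set[String]] = Map(
    "payload" -> Set("name", "url", "action", "pages"),
    "actor"   -> Set("login", "id")
  )

  def leavesRead(selected: Seq[String]): Int =
    selected.map { f =>
      f.split('.') match {
        case Array(root) => schema.getOrElse(root, Set.empty).size // whole record selected
        case _           => 1 // a single nested column
      }
    }.sum

  def main(args: Array[String]): Unit = {
    println(leavesRead(Seq("payload")))                     // reads 4 leaf columns
    println(leavesRead(Seq("payload.name", "payload.url"))) // reads only 2
  }
}
```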

sisidra avatar Sep 10 '21 06:09 sisidra

Here is an example with the raw BQ Storage API. There is a caveat though: currently, selecting a nested field on a repeated record does not seem to work.

import com.google.cloud.bigquery.storage.v1._
import com.google.cloud.bigquery.storage.v1.ReadSession._
import org.apache.avro._
import org.apache.avro.generic._
import org.apache.avro.io.DecoderFactory

val client = BigQueryReadClient.create()
val srcTable = "projects/bigquery-public-data/datasets/samples/tables/github_nested"

// Nested fields are selected with dotted paths.
val options = TableReadOptions.newBuilder()
  .addSelectedFields("payload.name").addSelectedFields("payload.url")
  // .addSelectedFields("payload.pages.action") // selecting a nested field on a repeated record does not work
  .setRowRestriction("payload.name IS NOT NULL")
  .build()
val sessionBuilder = ReadSession.newBuilder()
  .setTable(srcTable)
  .setDataFormat(DataFormat.AVRO)
  .setReadOptions(options)
val builder = CreateReadSessionRequest.newBuilder()
  .setParent("projects/[REPLACE]") // your billing project
  .setReadSession(sessionBuilder)
  .setMaxStreamCount(1)
val session = client.createReadSession(builder.build())

// Read the first response from the first stream and decode one Avro record.
val streamName = session.getStreams(0).getName()
val readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(streamName).build()
val stream = client.readRowsCallable().call(readRowsRequest)
val datumReader = new GenericDatumReader[GenericRecord](
  new Schema.Parser().parse(session.getAvroSchema().getSchema()))
val avroRows = stream.iterator().next().getAvroRows()
val decoder = DecoderFactory.get().binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), null)
val avroRecord = datumReader.read(null, decoder)

stream.cancel()
client.close()

sisidra avatar Sep 10 '21 07:09 sisidra

@clairemcginty any update on this? I feel this would be solved when moving to magnolify.

RustedBones avatar Jul 20 '23 10:07 RustedBones