janusgraph icon indicating copy to clipboard operation
janusgraph copied to clipboard

Janusgraph bigtable rows exceeds the limit 256MiB when exported via Dataflow in Parquet format

Open opan opened this issue 1 year ago • 0 comments

Hi team,

Currently, we are using Janusgraph with Bigtable as the storage backend. And we wanted to export the data out of Bigtable using Dataflow in a Parquet format to cloud storage. But during the process it failed because some of the rows size too large that exceeds the limit with the following error messages:

Error message from worker: java.io.IOException: Failed to start reading from source: BigtableSource{config=BigtableConfig{projectId=gopay-ds-staging, instanceId=risk-serving-bt, appProfileId=default, userAgent=null, emulator=null}, readOptions=BigtableReadOptions{tableId=risk-serving-bt-batch-feature-engine, rowFilter=null, keyRanges=[ByteKeyRange{startKey=[39adad4f015489a062715f5f637573746f6d65725f6167655f796561725f5f696e665f32645f5f637573746f6d65725f5f6e756d657269635f5f6461696c795f5f76b1], endKey=[3a34898871a2d37c2add4a2c502c568a3d3c84378375f55aad094c4a683b6775cb50f7dab18254bf3059ebe0c8f64a87effcc14d107f1d7a6cc1c384a391aa079281a1]}], maxBufferElementCount=null, attemptTimeout=null, operationTimeout=null, waitTimeout=null}, estimatedSizeBytes=67108864}
	org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:634)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
	org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
	org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:304)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:276)
	org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:206)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:150)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:130)
	org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:117)
	java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
	java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.api.gax.rpc.FailedPreconditionException: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Error while reading table 'projects/gopay-ds-staging/instances/risk-serving-bt/tables/risk-serving-bt-batch-feature-engine' : Read returned 269MiB from row '9\255\255O\001T\211\240bq__customer_age_year__inf_2d__customer__numeric__daily_...(length 67)' which exceeds the limit of 256MiB. Make sure you are setting an appropriate request filter to retrieve only recent versions and only the columns you want. If columns are accumulating more versions than you need to read, you can also create a garbage collection policy: https://cloud.google.com/bigtable/docs/configuring-garbage-collection#versions
	com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:102)
	com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
	com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82)
	com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:84)
	com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:148)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	com.google.cloud.bigtable.data.v2.stub.metrics.ConnectionErrorCountInterceptor$1$1.onClose(ConnectionErrorCountInterceptor.java:66)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	com.google.cloud.bigtable.data.v2.stub.CookiesInterceptor$UpdateCookieListener.onClose(CookiesInterceptor.java:92)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	com.google.api.gax.grpc.GrpcMetadataHandlerInterceptor$1$1.onClose(GrpcMetadataHandlerInterceptor.java:76)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	io.grpc.census.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:814)
	io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	io.grpc.census.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:494)
	io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
	io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
	io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
	io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	... 3 more
	Suppressed: java.lang.RuntimeException: Asynchronous task failed
		at com.google.api.gax.rpc.ServerStreamIterator.hasNext(ServerStreamIterator.java:105)
		at org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceImpl$BigtableReaderImpl.advance(BigtableServiceImpl.java:193)
		at org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceImpl$BigtableReaderImpl.start(BigtableServiceImpl.java:188)
		at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$BigtableReader.start(BigtableIO.java:2029)
		at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:631)
		at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
		at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
		at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
		at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
		at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:304)
		at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:276)
		at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:206)
		at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:150)
		at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:130)
		at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:117)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
		at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
		... 3 more
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: Error while reading table 'projects/gopay-ds-staging/instances/risk-serving-bt/tables/risk-serving-bt-batch-feature-engine' : Read returned 269MiB from row '9\255\255O\001T\211\240bq__customer_age_year__inf_2d__customer__numeric__daily_...(length 67)' which exceeds the limit of 256MiB. Make sure you are setting an appropriate request filter to retrieve only recent versions and only the columns you want. If columns are accumulating more versions than you need to read, you can also create a garbage collection policy: https://cloud.google.com/bigtable/docs/configuring-garbage-collection#versions
	io.grpc.Status.asRuntimeException(Status.java:533)
	... 34 more

We have talked with GCP support if there is a workaround for this and they suggest to change the GC policy of the columns in the table. But since the rows and columns structure are created and managed directly by Janusgraph, we have concern that if we modify/change the GC policy, it might corrupt the data.

Our question is, is there a way to configure the size of the rows in janusgraph? Or is it possible to configure the GC policy directly from Janusgraph?

Do let me know if I posted this in a wrong section.

Column families that have large row size:

risk-serving-bt-feature-engine 

family {
  name: "l"
  locality_group: "user_flash"
  administrator: "chubby!mdb/cloud-bigtable-internal-bigtable-administrators-prod"
  administrator: "chubby!user/cloud-bigtable"
  writer: "chubby!mdb/cloud-bigtable-internal-bigtable-writers-prod"
  writer: "chubby!user/cloud-bigtable"
  reader: "chubby!mdb/cloud-bigtable-internal-bigtable-readers-prod"
  reader: "chubby!user/cloud-bigtable"
  gcexpr: "(age() > 604800000000 || version() > 1)"
}

janusgraph version: 0.6.4 storage backend: bigtable

opan avatar Oct 24 '24 05:10 opan