HDDS-10338. Implement a Client Datanode API to stream a block
What changes were proposed in this pull request?
To reduce round trips between the Client and Datanode for reading a block, we nee a new API to read.
Client -> block(offset, length) -> Datanode
Client <- chunkN <- Datanode
Client <- chunkN+1 <- Datanode
..
Client <-chunkLast <- Datanode
This is using the ability of gRPC to send bidirectional traffic such that the server can pipeline the chunks to the client without waiting for ReadChunk API calls. This also avoids the client from creating multiple Chunk Stream Clients and should simplify the read path on the client side by a bit.
Please describe your PR in detail:
- Add a new logic at both client and server side to read block as streaming chunks.
- Add a new
StreamBlockInputat client side called from KeyInputStream to read a block from the container. - Add unit tests and integration tests for `StreamBlockInput.
- Add a new version in datanode for compatibilities, while new client reading blocks from old server, it will fallback and read blocks by
BlockInputStream.
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-10338
How was this patch tested?
There are existed test for reading data.
@szetszwo could you please take a look?
@chungen0126 can you please address the merge conflicts?
Sure, I'll update it shortly.
I wonder if this patch is completed, we are looking forward to try this on our environment.
Test conducted on our cluster(3DN / HDD / 10 Gigabit network) shows this improvement can boost read speed by at least 30%.
The direction of the change looks good, I will need to go over the code a few more times. I flagged a few changes please take a look.
Test conducted on our cluster(3DN / HDD / 10 Gigabit network) shows this improvement can boost read speed by at least 30%.
In single thread read, stream read cut read time from 7.3 - 7.4s to 4.8 - 5.2s. In freon ozone-client-one-key-reader test, stream read increased read bandwidth from 427MB/s to 586MB/s.
Thanks for @chungen0126 's work. We previously discovered an issue with checksum calculation and it has been resolved.
Single threaded reading of 1GB files :
| times | grpc stream | grpc |
|---|---|---|
| 1 | 5223 ms | 7435 ms |
| 2 | 4874 | 7446 |
| 3 | 4953 | 7386 |
| 4 | 4851 | 7402 |
| 5 | 4836 | 7433 |
| 6 | 5178 | 7393 |
| 7 | 4949 | 7343 |
| 8 | 5344 | 7467 |
| 9 | 4858 | 8001 |
| 10 | 5176 | 7300 |
It can be seen that for 1GB file reading, streaming reading can reduce the reading time by about 30%.
@chungen0126 , thanks for the quick patch updating. I will try to finish the review this week.
Test conducted on our cluster(3DN / HDD / 10 Gigabit network) shows this improvement can boost read speed by at least 30%.
It can be seen that for 1GB file reading, streaming reading can reduce the reading time by about 30%.
Hi @chungen0126 @guohao-rosicky, do we know how much of an improvement is being seen for files smaller than 1GB?
Test conducted on our cluster(3DN / HDD / 10 Gigabit network) shows this improvement can boost read speed by at least 30%.
It can be seen that for 1GB file reading, streaming reading can reduce the reading time by about 30%.
Hi @chungen0126 @guohao-rosicky, do we know how much of an improvement is being seen for files smaller than 1GB?
@ptlrs conducted test using freon on the latest version of this pr, hardware is 3DN / HDD / 10 Gigabit network, tested file sizes are 1GB 128MB 16MB 2MB 256KB. there is an index bug when verifying checksum, tests were conducted after fixing it. here are the redunctions in mean read time given by freon:
- 1GB: 7%
- 128MB: 10%
- 16MB 8%
- 2MB 10%
- 256KB 8%
further tests were conducted with verifying checksum skipped, it turns out checksum have a substantial impact on read time. when testing using freon on 1GB file, turning on checksum caused read time to increase from around 3700ms to 5000ms. with checksum off, the read time reduction of stream block compared with current read method(checksum also off) is:
- 1GB: 30%
- 128MB: 21%
- 16MB: 11%
- 2MB: -5.6% (chunk size is 4MB)
Can you resolve the code conflict? @chungen0126
Temporarily converted to draft and assigned to myself, to resolve conflicts.
Thanks @adoroszlai for fixing the conflicts. I was just about to address it.
@chungen0126 Could you explain more detail how this change works, and why it should perform better than the current approach? Has this code been run on any cluster to prove it works and also that its performance is better than the current approach. eg for ozone sh key get even on a local docker environment?
As I understand the current approach, provided a client asks for chunk size reads, the logic is that we pull one chunk at a time from the server to the client. When the reader has consumed that, it will go back and make another read chunk call.
With the approach in this PR, it talks about using a GRPC bi-directional feature to pipeline the chunks to the client. I don't understand how this works exactly, but I immediately wonder:
- Is there more buffering memory needed on the client?
- How many chunks could the client need to buffer at any time for this to be effective?
- What triggers new chunks to come from server to client?
- What if the client needs to seek and read parts of the key, such as for ORC file?
- Is a GRPC handler tied up on the server for the entire time the block is being read by the client? Or is a thread pulled from the thread pool only when a new chunk needs to be sent to the client?
Thanks.
These are the steps I used to test the streaming block code.
- Download this PR.
- Merge in master - there was one small compile error I fixed by passing a string to the container scanner
- Create a single node docker cluster with the build, but adjust the config to make the blocksize 128MB rather than 1MB (docker_config file in the compose/ozone folder)
Then create a 1GB file:
dd if=/dev/urandom of=1gb bs=1M count=1024
Create a bucket, volume and the key:
ozone sh volume create sodonnell
ozone sh bucket create sodonnell/bucket
ozone sh key put sodonnell/bucket/1bg
export OZONE_ROOT_LOGGER=INFO,console
ozone sh key cat sodonnell/bucket/1gb > /dev/null
The cat command fails with this checksum error:
2025-09-19 10:17:17,474 [main] INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-09-19 10:17:17,531 [main] INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-09-19 10:17:17,531 [main] INFO impl.MetricsSystemImpl: XceiverClientMetrics metrics system started
2025-09-19 10:17:18,249 [main] WARN scm.XceiverClientGrpc: Failed to execute command ReadBlock on the pipeline Pipeline{ Id: dbc8f351-a151-432f-83de-ece94494810e, Nodes: [ {7fb912ed-eec9-4788-8b35-15f243b514ee(ozone-datanode-1.ozone_default/172.20.0.5), ReplicaIndex: 0},], ReplicationConfig: STANDALONE/ONE, State:ALLOCATED, leaderId:, CreationTimestamp2025-09-19T10:13:08.181Z[UTC]}.
2025-09-19 10:17:18,251 [main] WARN scm.XceiverClientGrpc: Failed to execute command ReadBlock on the pipeline Pipeline{ Id: dbc8f351-a151-432f-83de-ece94494810e, Nodes: [ {7fb912ed-eec9-4788-8b35-15f243b514ee(ozone-datanode-1.ozone_default/172.20.0.5), ReplicaIndex: 0},], ReplicationConfig: STANDALONE/ONE, State:ALLOCATED, leaderId:, CreationTimestamp2025-09-19T10:13:08.181Z[UTC]}.
2025-09-19 10:17:18,253 [main] WARN scm.XceiverClientGrpc: Failed to execute command ReadBlock on the pipeline Pipeline{ Id: dbc8f351-a151-432f-83de-ece94494810e, Nodes: [ {7fb912ed-eec9-4788-8b35-15f243b514ee(ozone-datanode-1.ozone_default/172.20.0.5), ReplicaIndex: 0},], ReplicationConfig: STANDALONE/ONE, State:ALLOCATED, leaderId:, CreationTimestamp2025-09-19T10:13:08.181Z[UTC]}.
Checksum count mismatched: thatChecksumsCount=1 > thisChecksumsCount (=256 ) - thisStartIndex (=256)
I captured debug logs before this error occurred, and I can see it performed 259 getBlock calls against the same block (4kb at a time)
grep "Executing command cmdType: ReadBlock" debug.log | wc -l
259
So the code as it stands in this test doesn't seem to be making a single getBlock call to read the entire block. Its making a call for every 4kb, which is even more strange as the checksum size is set to 16kb here.
The reason for the checksum error above, is this line in StreamBlockInputStream:
if (verifyChecksum) {
ChecksumData checksumData = ChecksumData.getFromProtoBuf(
chunkInfo.getChecksumData());
int startIndex = (int) readChunk.getChunkData().getOffset() / checksumData.getBytesPerChecksum();
The startIndex calculated needs to be hashed into the checksum position per block. Eg, for a 16kb checksum, there are 256 checksums per 4MB chunk. So the actual checksum index is startIndex % 256.
However, the problem is that we don't know the chunkSize in the client, and different blocks could have different chunk sizes if the cluster setting was changed at some point. Each readBlock response carries all the checksums for that chunk and the block offset for the data it contains, but without knowing the chunkSize I don't think there is a way to divide the overall block offset down to the chunk level offset to index into the checksum array for that chunk.
ChunkInputStream works this out by comparing the requested offset with the offset returned, its not clear to me why that works in the general case.
I've merged in master and added a commit which fixes the checksum issue by passing the chunk block offset back to the client in the chunkInfo proto. The code as it stands with these changes works in my above test with a 1GB file.
Hi @sodonnel , thanks for the fix. I didn't have time to work on this. I think it works well now. I change this part in a commit before, mistakenly thinking that this was the correct way to write it.
I change this part in a commit before, mistakenly thinking that this was the correct way to write it.
This was a difficult part. I am not sure I have fixed it the best way, but it was the only way I could think of to do it safely.
@chungen0126 , thanks a lot for working on this!
Looked at the existing code vs this change and had the following observations:
Current readChunk code:
-
Server side slowness: ⚠️ Repeatedly open/close local files: In ChunkUtils, we open, seek and close the local file for each chunk. It will be slow for large files.
-
Client side slowness: ⚠️ Sequential round trip waiting: readChunk(..) -> XceiverClientSpi.sendCommand(..) -> XceiverClientGrpc.sendCommandAsync(..) but XceiverClientSpi.sendCommand(..) will wait for the future for each chunk (i.e. it is NOT aysnc) ⚠️ Repeatedly open/close gRPC streams: In XceiverClientGrpc.sendCommandAsync(..) , we create a new gRPC stream, read a chunk and then close it. It repeats for each chunk.
This pull request:
-
Server side: ⚠️ Still use the same ChunkUtils code which opens/seek/closes the local file for each read.
-
Client side: ✅ Using gRPC async stream (i.e. it is real async) ½✅ sendCommandReadBlock(..) only creates a gRPC stream per read(..) call but not a single stream per block. If the application code uses small (e.g. 64KB) buffer, then it is going to create a lot of gRPC streams.
Ideally:
-
Server side: ✅ Open the local file at the first stream read and close it when the application closes the stream.
-
Client side: ✅ Async stream -- multiple read calls do not wait for the previous call to complete ✅ Single stream -- only one RPC stream (e.g. gRPC stream) per application stream (e.g. OzoneFSInputStream) ✅ Arbitrary buffer size -- support any buffer size although a larger buffer may improve the performance.
Looking at this some more, I added in some debug messages to see what was getting called. The client does make a new call to the server for each "read", which is at the checksum boundary.
On the server side, there is a some GRPC stuff getting setup with onNext() etc, which makes it look like a single call from the client will receive many chunks over the single call, but that isn't happening. Each 16kb read is a new call to the server.
On the client side, in XceiverClientGrpc, we have a new sendReadBlock() method, and it uses a StreamObserver with onNext too, but that doesn't seem to get used.
Surely what we need to do, is create some sort of GRPC stream which is returned to the StreamBlockInputStream, and it calls onNext() on it somehow until there is no more to return.
However then what happens with a seek, if I read some of the block and then seek and then read some more. How is that supposed to work?
2025-10-03 12:19:41,888 [ForkJoinPool-1-worker-19] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext : request 1 bytes
2025-10-03 12:19:41,906 [grpc-default-executor-2] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1: response 262144 bytes
2025-10-03 12:19:41,906 [grpc-default-executor-2] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
2025-10-03 12:19:41,911 [ForkJoinPool-1-worker-19] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext : request 131072 bytes
2025-10-03 12:19:41,919 [grpc-default-executor-0] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1: response 262144 bytes
2025-10-03 12:19:41,919 [grpc-default-executor-0] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
2025-10-03 12:19:41,923 [ForkJoinPool-1-worker-19] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext : request 393216 bytes
2025-10-03 12:19:41,927 [grpc-default-executor-2] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1: response 524288 bytes
2025-10-03 12:19:41,927 [grpc-default-executor-2] INFO scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
...
Added some debug messages as below. As shown in the output above, the current change does not work as expected:
- onCompleted is called right after each onNext (thanks @sodonnel for pointing it out), and
- the request lengths and the response lengths look quite random.
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 09e01593fe..dfee5b7946 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -30,11 +30,14 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -102,6 +105,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private boolean closed = false;
+
+ static final AtomicInteger idGenerator = new AtomicInteger(0);
+ private final String name = getClass().getSimpleName() + "-" + idGenerator.incrementAndGet();
/**
* Constructs a client that can communicate with the Container framework on
* data nodes via DatanodeClientProtocol.
@@ -649,11 +655,18 @@ public XceiverClientReply sendCommandReadBlock(
final StreamObserver<ContainerCommandRequestProto> requestObserver =
asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(new StreamObserver<ContainerCommandResponseProto>() {
+ int step = 0;
@Override
public void onNext(
ContainerCommandResponseProto responseProto) {
+ final ContainerProtos.ReadChunkResponseProto chunk = responseProto.getReadChunk();
+ final Long length = Optional.ofNullable(chunk)
+ .map(ContainerProtos.ReadChunkResponseProto::getChunkData)
+ .map(ContainerProtos.ChunkInfo::getLen)
+ .orElse(null);
+ LOG.info("XXX {} -> {}, onNext {}: response {} bytes", name, dn, ++step, length);
if (responseProto.getResult() == Result.SUCCESS) {
- readBlock.addReadChunk(responseProto.getReadChunk());
+ readBlock.addReadChunk(chunk);
} else {
future.complete(
ContainerCommandResponseProto.newBuilder(responseProto)
@@ -663,6 +676,7 @@ public void onNext(
@Override
public void onError(Throwable t) {
+ LOG.info("XXX {} -> {}, onError", name, dn);
future.completeExceptionally(t);
metrics.decrPendingContainerOpsMetrics(cmdType);
metrics.addContainerOpsLatency(
@@ -672,6 +686,7 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
+ LOG.info("XXX {} -> {}, onCompleted", name, dn);
if (readBlock.getReadChunkCount() > 0) {
future.complete(response.setReadBlock(readBlock)
.setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build());
@@ -687,6 +702,9 @@ public void onCompleted() {
semaphore.release();
}
});
+
+
+ LOG.info("XXX {} -> {}, onNext : request {} bytes", name, dn, request.getReadBlock().getLen());
requestObserver.onNext(request);
requestObserver.onCompleted();
return new XceiverClientReply(future);
the request lengths and the response lengths look quite random.
It may be due to the test which reads 1 byte and then some more bytes.
After changed the test to reads 10MB, it failed with IllegalStateException.
2025-10-06 10:49:36,161 [a38e63c4-83c0-4a67-ad0d-45b74141c76b-ChunkReader-4] ERROR server.GrpcXceiverService (GrpcXceiverService.java:onNext(120)) - Got exception when processing ContainerCommandRequestProto cmdType: ReadBlock
traceID: ""
containerID: 1
datanodeUuid: "a0bb7c49-0425-40d6-8347-e5f39552daf7"
version: 3
readBlock {
blockID {
containerID: 1
localID: 115816896921600001
blockCommitSequenceId: 16
}
offset: 8388608
len: 2097152
verifyChecksum: true
}
java.lang.IllegalStateException
at com.google.common.base.Preconditions.checkState(Preconditions.java:497)
at org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler.readBlock(KeyValueHandler.java:2072)
at org.apache.hadoop.ozone.container.common.impl.HddsDispatcher.streamDataReadOnly(HddsDispatcher.java:864)
at org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService$1.onNext(GrpcXceiverService.java:114)
at org.apache.hadoop.ozone.container.common.transport.server.GrpcXceiverService$1.onNext(GrpcXceiverService.java:98)
at org.apache.ratis.thirdparty.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
at org.apache.ratis.thirdparty.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.hadoop.hdds.tracing.GrpcServerInterceptor$1.onMessage(GrpcServerInterceptor.java:49)
The change of TestStreamBlockInputStream:
@@ -73,7 +74,7 @@ void testAll(ContainerLayoutVersion layout) throws Exception {
*/
private void testBlockReadBuffers(TestBucket bucket) throws Exception {
String keyName = getNewKeyName();
- int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
+ int dataLength = 10 << 20;
byte[] inputData = bucket.writeRandomBytes(keyName, dataLength);
try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
@@ -82,13 +83,8 @@ private void testBlockReadBuffers(TestBucket bucket) throws Exception {
(StreamBlockInputStream) keyInputStream.getPartStreams().get(0);
- // To read 1 byte of chunk data, ChunkInputStream should get one full
- // checksum boundary worth of data from Container and store it in buffers.
- IOUtils.readFully(block0Stream, new byte[1]);
- checkBufferSizeAndCapacity(block0Stream.getCachedBuffers());
-
// Read > checksum boundary of data from chunk0
- int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+ int readDataLen = dataLength;
byte[] readData = readDataFromBlock(block0Stream, 0, readDataLen);
bucket.validateData(inputData, 0, readData);
Thanks, @sodonnel, for the updates. I apologize that I don't have time to work on this right now, but I gave a rough review. I have one question regarding the updates: The read chunk in Ozone and previous implementation of this pr caches the entire chunks and blocks, whereas the current version only caches a single response. While this will reduce memory usage, it may slow down seeking performance. Is this the intended trade-off?
While this will reduce memory usage, it may slow down seeking performance. Is this the intended trade-off?
While only the current buffer and one pending buffer are stored in the reader, there is scope to configure the size of that queue. However what I found in practice is that GRPC seems to buffer about 1MB of data on the connection socket, so that when the item is consumed from the queue it immediately gets refilled.
Compared to the existing implementation in Ozone with 16KB checksums, the time to read 1GB dropped from about 30 seconds to 7 seconds in local tests. 1MB checksums went from 4.5s to 2.6s.
Seeks are another matter. I have not attempted to benchmark them as yet, but they would tend to be slow with any approach, as we can only cache a reasonable amount of data in the client - say at most 4MB, so there is every chance a seek takes you outside of that.
The approach I have implemented simplifies the code quite a bit I think, and it reads the entire block in a single RPC call and only opens the block file on the server one time.
I think it will be much better than the current code in Ozone and it can of course be refined further once we get the first version committed and more extensive testing done.
I still have a bit more work to do on this PR around testing and retrying reads on failed DNs.
@swamirishi , @devabhishekpal , since you have requested changes, could you review the latest changes?
Suggested approach: Non-blocking API (gRPC) + Non-blocking threads
More details:
An application opens an input stream and calls read(L) (blocking call), where L is the requested data length:
- Ozone client checks the buffer queue if it already has the data.
- If not, send an onNext(L) request to a datanode.
- Pull data from the buffer queue until L bytes and then return to the application
- We may keep using BlockingQueue responseQueue as the buffer queue with unlimited capacity (since the number of responses is limited).
When a datanode receives onNext(L)
- Open the file if it is the first call.
- Read the file at least L bytes of data.
- It could read up to checksum/chunk boundary, or even a few chunks more for pre-read.
- Return the data by one or more onNext() responses.
When Ozone client receive onNext() responses
- Put the data in the buffer queue.
When the application closes the input stream:
- The Ozone client sends an onComplete() to the datanode.
- When the datanode receives onComplete(), close the file.
When the datanode hits EOF of the block file:
- The datanode closes the file and sends onComplete() to the Ozone client.
- Ozone client closes the gRPC stream and continue with the next block.
When a datanode receives onNext(L)
* Open the file if it is the first call. * Read the file at least L bytes of data. * It could read up to checksum/chunk boundary, or even a few chunks more for pre-read. * Return the data by one or more onNext() responses.
How is this going to look on the server? If the server gives up its handler thread after sending each piece of the block to the client then for each onNext() from the client, its going to have to:
- Acquire a thread
- Lookup RocksDB for the checksum and block meta data
- Calculate various offsets etc to align the checksums.
- Open the block file, seek
- Read the desired amount and close the file
- Release the thread back to the pool
My observations is that for most calls, these client reads are not "50MB" or anything like it. They are generally 4kb, and get rounded up to the next checksum boundary. So 16kb by default. So to read the 256MB block we are going to have to do the above 16k times. This cannot be efficient for something which is trying to read a block from start to end.
While this is an async API, from the clients point of view its blocking. The client needs more data and it has to wait for it to arrive. With the streaming approach I have implemented in this PR, the data is queuing up to a reasonable limit on the socket and as the client needs it, it will be immediately available usually and avoids the rework if initializing the read each time.
With the approach you are describing, compared to the approach used in Ozone today, it is just keeping the GRPC stream alive. Its still a very chatty back and forward protocol passing small pieces of data and I don't see how it is going to drastically improve things due to the amount of rework required for each tiny read.
How is this going to look on the server? If the server gives up its handler thread after sending each piece of the block to the client then for each onNext() from the client, ...
Not sure about you question. Here is an example:
-
Client: opens the block and requests with 10KB. Client: sends an onNext() request with 10KB. Server: Since this is the first request, it opens the file. Server: invokes a readChunk call to read 1MB (rounding up the chunk boundary) Server: sends an onNext() response with 1MB data.
-
Client: requests 20KB Client: returns the data since it is already in client buffer (No onNext() request)
-
Client: requests 50MB Client: sends an onNext() request with 50MB. Server: loop 50 times below in
onNext(ContainerCommandRequestProto request)in GrpcXceiverService (a) invokes a readChunk call to read 1MB (b) sends an onNext() response with 1MB data.