Local Split feature for optimized TsFile synchronization between IoTDB instances
Introduction
This PR introduces Local Split, an optimized procedure for syncing historical TsFiles from one IoTDB instance to another with Pipe. Below is a brief introduction to Local Split and a comparison with the old procedure (called Basic).
- Event generation is batched by a BatchedTsFileExtractor, i.e., it generates PipeBatchTsFileInsertionEvents that contain more than one TsFile. This reduces the number of events, and having multiple files in the same event opens up more tuning opportunities during the later transfer. It also reduces network traffic when the individual TsFiles are small. The size of a batch is bounded by two factors: the total size of the files (in bytes) and the number of files. Because an overly large total size may trigger timeouts, a feedback mechanism records the throughput of transfers that complete normally. After a timeout, it adjusts the total file size limit according to the product of the average throughput and the timeout length. The recorded throughput history is purged statistically to distinguish timeouts triggered by GC stalls from those triggered by large files.
- TsFiles are transferred series-by-series instead of file-by-file. The TsFiles in a batch are merge-read, which is possible because timeseries are stored in sorted order. Chunks of the same timeseries are collected and sent to the receiver, resulting in only one file on the receiver for a batched event. This increases the locality of a timeseries just like chunk merge in compaction, but it does not decompress or decode a chunk unless necessary; hence the name Shallow Merge. Moreover, because fewer files are created on the receiver, there is also less higher-level metadata and fewer statistics, which may result in faster aggregation.
- Chunks are further compressed. Although each chunk is already compressed by default, Shallow Merge sends chunks of the same timeseries in a batch, which creates opportunities for further compression. Chunks in a batch are compressed with LZ4 by default to reduce network bandwidth consumption.
- Local Split to avoid chunk-forwarding. In Basic, the receiver may not be the actual data holder, in which case it must forward data to the holders. In Local Split, the sender queries the partition info from the receiver, splits chunks locally, and sends them directly to the data holders. This avoids routing through a coordinator and the unnecessary traffic that comes with it. Considering that cross-cluster bandwidth could be scarce, the sender does not send chunks to all replicas of the receiver. Instead, it sends them to only one replica and lets that replica forward them to the others. A throughput-based method is applied to select the best relay among the replicas.
- Grouped and parallel chunk sending. For non-aligned timeseries, the order of chunks within a device can be arbitrary. This motivates parallel processing when sending chunks of the same device. The chunks within a device are grouped and sent to the receiver in parallel to maximize resource utilization. Chunks are grouped according to the similarity of their timeseries, which takes into account the measurementId, the data type, and samples from the data. Since groups are compressed as described above, putting similar timeseries together helps achieve a higher compression ratio.
- The second phase, namely closing and loading the file into the memory list, is triggered with a consensus command, which helps prevent the case where the load command is executed on some replicas but not on others. It also helps guarantee that the order of loading files is consistent across the replicas.
- Schema registration is done right after the chunks are sent to the receivers. As they are sent series-by-series, most redundant registrations can be avoided. It also avoids registering too many timeseries in one request, which may block the schema region for a while.
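Two of the mechanisms above can be illustrated with a small sketch: the feedback-based batch sizing and the merge-read behind Shallow Merge. This is not the code in this PR (which is Java); the function names `adjusted_batch_size` and `shallow_merge`, the tuple layout of a "file", and the choice to only ever shrink the limit are assumptions made for illustration.

```python
import heapq
from itertools import groupby
from statistics import mean


def adjusted_batch_size(throughput_history, timeout_seconds, current_max_bytes):
    """Return a new total-size limit for a batch after a timeout.

    throughput_history: bytes/second samples from transfers that finished normally.
    Assumption: the limit is only ever reduced, never raised, by the feedback.
    """
    if not throughput_history:
        return current_max_bytes  # nothing learned yet, keep the current limit
    # Product of average throughput and timeout length, as described above.
    sustainable = int(mean(throughput_history) * timeout_seconds)
    return min(current_max_bytes, sustainable)


def shallow_merge(files):
    """Merge-read several files and regroup their chunks by timeseries.

    Each file is modelled as a list of (series_name, chunk_bytes) pairs that is
    already sorted by series_name. Chunks stay opaque bytes: they are regrouped,
    never decompressed or decoded.
    """
    merged = heapq.merge(*files, key=lambda entry: entry[0])
    for series, entries in groupby(merged, key=lambda entry: entry[0]):
        yield series, [chunk for _, chunk in entries]


# Hypothetical data: two small "files" sharing one timeseries.
file_a = [("root.d1.s1", b"a1"), ("root.d1.s2", b"a2")]
file_b = [("root.d1.s1", b"b1"), ("root.d2.s1", b"b3")]
merged_series = list(shallow_merge([file_a, file_b]))
new_limit = adjusted_batch_size([100, 200, 300], timeout_seconds=30,
                                current_max_bytes=10_000)
```

With these inputs, the chunks of `root.d1.s1` from both files end up next to each other, which is the locality gain described above, and the limit drops to the number of bytes the observed throughput can move within one timeout.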
How to Use
To use the Local Split feature in a Pipe, add the following configs when creating the pipe:
create pipe a2b with extractor ('extractor.local-split.enable'='true') with connector ('connector.local-split.enable'='true', 'connector.external.config-nodes'='nelbds-15:11710', 'connector'='iotdb-thrift-connector', 'connector.ip'='nelbds-16', 'connector.port'='7667')
Both 'extractor.local-split.enable' and 'connector.local-split.enable' must be set to 'true' to enable the feature. 'connector.external.config-nodes' should be the IP and port of the receiver's config node. 'connector'='iotdb-thrift-connector' is required, as only 'iotdb-thrift-connector' currently supports the feature. 'connector.ip' and 'connector.port' should point to a data node of the receiver, as in other pipes.
As implied by the Local Split design, the sender must have direct access to all nodes of the receiver. In other words, the sender should be on the firewall whitelist.
Other configs are:
- "extractor.split.max-concurrent-file": an integer specifying the maximum number of files in an event.
- "extractor.split.max-file-batch-size": an integer specifying the maximum total size of files in an event, in bytes. The feedback mechanism may reduce this after a timeout.
- "connector.split.max-size": an integer specifying the maximum size, in bytes, to transfer in one request.
- "connector.split.max-concurrent-file": an integer specifying the maximum number of files to be Shallow Merged. It should be less than or equal to "extractor.split.max-concurrent-file".
- "connector.external.user-name": the user name to be used on the receiver side.
- "connector.external.password": the password to be used on the receiver side.
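As a rough illustration of how these options fit together, the sketch below assembles a `create pipe` statement from two attribute maps and checks the one constraint stated above between the two `max-concurrent-file` settings. The helper `build_create_pipe` is hypothetical and not part of IoTDB, and the numeric values are placeholders rather than recommended settings.

```python
def build_create_pipe(name, extractor, connector):
    """Render a `create pipe` statement from extractor and connector attributes."""
    ext_limit = extractor.get("extractor.split.max-concurrent-file")
    con_limit = connector.get("connector.split.max-concurrent-file")
    # The connector-side limit must not exceed the extractor-side limit.
    if ext_limit is not None and con_limit is not None:
        if int(con_limit) > int(ext_limit):
            raise ValueError(
                "connector.split.max-concurrent-file must be <= "
                "extractor.split.max-concurrent-file"
            )

    def render(attrs):
        return ", ".join(f"'{key}'='{value}'" for key, value in attrs.items())

    return (
        f"create pipe {name} "
        f"with extractor ({render(extractor)}) "
        f"with connector ({render(connector)})"
    )


# Placeholder values for illustration only.
statement = build_create_pipe(
    "a2b",
    {
        "extractor.local-split.enable": "true",
        "extractor.split.max-concurrent-file": "8",
    },
    {
        "connector.local-split.enable": "true",
        "connector.split.max-concurrent-file": "4",
        "connector": "iotdb-thrift-connector",
    },
)
print(statement)
```

Checking the constraint before submitting the statement surfaces a misconfiguration on the client side; whether the server itself rejects such a combination is not stated here.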
Evaluation
For the evaluation, around 800GB of data is first written to a 1C1D instance and then synchronized to a 1C3D instance through a Pipe. In the figures, the x-axis is the number of replicas.
Below is the total task completion time:
The results show a 50% reduction in completion time compared with Basic.
This is the query latency of aggregating one single timeseries:
Thanks to Shallow Merge, the increased locality and reduced metadata speed up such queries.
On more datasets, Local Split also shows better performance on throughput and query latency.
The liuzhen dataset contains so many timeseries that schema registration becomes a bottleneck, and the improvement is less significant.
Slides that demonstrate the motivation and intuition: TsFileSync.pptx