
Local Split feature for optimized TsFile synchronization between IoTDB instances


Introduction

This PR introduces Local Split, an optimized procedure for syncing historical TsFiles from one IoTDB instance to another with Pipe. Here is a brief introduction to Local Split and a comparison with the old procedure (called Basic).

  1. Event generation is batched by a BatchedTsFileExtractor, i.e., it generates PipeBatchTsFileInsertionEvents that contain more than one TsFile. This reduces the number of events, and with multiple files in the same event there are more tuning opportunities during the later transfer; it also reduces network traffic when individual TsFiles are small. The size of a batch is controlled by two factors: the total size of the files (in bytes) and the number of files. If the total size is so large that timeouts may be triggered, a feedback mechanism collects the throughput of normal transfers and, after a timeout, adjusts the total file size to the product of the average throughput and the timeout length. The recorded throughput history is filtered statistically to distinguish timeouts caused by GC stalls from those caused by overly large files (see the feedback sketch after this list).

  2. TsFiles are transferred series-by-series instead of file-by-file. TsFiles in a batch are merge-read, thanks to the sorted layout of timeseries within each file. Chunks of the same timeseries are collected and sent to the receiver together, so a batched event produces only one file on the receiver. This increases the locality of a timeseries, much like chunk merging in compaction, but it does not uncompress or decode chunks unless necessary; hence it is called Shallow Merge (see the sketch after this list). Moreover, as the number of files on the receiver is reduced, so are the higher-level metadata and statistics, which may speed up aggregation.

  3. Chunks are further compressed. Although each chunk is already compressed by default, with Shallow Merge the chunks of the same timeseries are sent in one batch, which creates opportunities for further compression. Chunks in a batch are compressed with LZ4 by default to reduce network bandwidth consumption.

  4. Local Split avoids chunk forwarding. In Basic, the receiver may not be the actual data holder, so it must forward data to the holders. In Local Split, the sender queries the partition information from the receiver, splits chunks locally, and sends them directly to the data holders. This avoids the extra coordinator hop and unnecessary traffic. Because cross-cluster bandwidth can be scarce, the sender does not send chunks to every replica on the receiver side; instead, it sends to a single replica and lets that replica forward the data to the others. A throughput-based method selects the best relay among the replicas.

  5. Grouped and parallel chunk sending. For non-aligned timeseries, the order of chunks within a device can be arbitrary, which allows the chunks of one device to be processed in parallel. The chunks within a device are grouped and sent to the receiver in parallel to maximize resource utilization. Chunks are grouped by the similarity of their timeseries, considering the measurementId, data type, and samples of the data. Since groups are compressed as described above, putting similar timeseries together yields a higher compression ratio (see the grouping sketch after this list).

  6. The second phase, namely closing the file and loading it into the memory list, is triggered by a consensus command, which prevents the case where the load command is executed on some replicas but not on others. It also helps guarantee that files are loaded in the same order across replicas.

  7. Schema registration is performed right after the chunks are sent to the receivers. Because chunks are sent series-by-series, most redundant registrations are avoided; it also avoids registering too many timeseries in a single request, which could block the schema region for a while.
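Below is a minimal sketch of the throughput-feedback batch sizing described in item 1. The class, method, and field names (BatchSizeFeedback, recordSuccess, onTimeout), the history length, and the 10% trimming are hypothetical illustrations under assumed behavior, not the classes added by this PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: shrink the batch size after a timeout based on the
// throughput observed for successfully transferred batches.
public class BatchSizeFeedback {

  private final Deque<Double> throughputHistory = new ArrayDeque<>(); // bytes per second
  private final int historyLimit = 32;
  private long maxBatchSizeInBytes;

  public BatchSizeFeedback(long initialBatchSizeInBytes) {
    this.maxBatchSizeInBytes = initialBatchSizeInBytes;
  }

  /** Record the throughput of a successfully transferred batch. */
  public synchronized void recordSuccess(long transferredBytes, long elapsedMillis) {
    if (elapsedMillis <= 0) {
      return;
    }
    throughputHistory.addLast(transferredBytes * 1000.0 / elapsedMillis);
    if (throughputHistory.size() > historyLimit) {
      throughputHistory.removeFirst();
    }
  }

  /**
   * After a timeout, shrink the batch size to roughly what the observed
   * throughput can move within one timeout window. Outlying samples (e.g.
   * batches stalled by GC) are trimmed before averaging so that a few
   * anomalous measurements do not distort the estimate.
   */
  public synchronized void onTimeout(long timeoutMillis) {
    int size = throughputHistory.size();
    double average = throughputHistory.stream()
        .mapToDouble(Double::doubleValue)
        .sorted()
        .skip(size / 10)                        // drop the slowest 10% of samples
        .limit(Math.max(1, size * 8 / 10))      // and the fastest 10%
        .average()
        .orElse(0);
    if (average > 0) {
      maxBatchSizeInBytes = (long) (average * timeoutMillis / 1000.0);
    }
  }

  public synchronized long getMaxBatchSizeInBytes() {
    return maxBatchSizeInBytes;
  }
}
```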
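Next, a simplified sketch of Shallow Merge and the per-group compression from items 2 and 3. It abstracts a chunk as a (series path, raw payload) pair and regroups with a map; the actual implementation merge-reads the sorted TsFiles in a single pass and never decodes the chunk payloads. SeriesChunk is a hypothetical stand-in for the real chunk structures, and the compression call uses the lz4-java library as an assumed dependency.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;

public class ShallowMergeSketch {

  /** A chunk identified by its timeseries path; the payload stays compressed and encoded as-is. */
  public static class SeriesChunk {
    final String seriesPath;
    final byte[] payload;

    public SeriesChunk(String seriesPath, byte[] payload) {
      this.seriesPath = seriesPath;
      this.payload = payload;
    }
  }

  /**
   * Regroup the chunks of all TsFiles in one batch by timeseries path, so all
   * chunks of a series end up next to each other without being decoded.
   * (The real implementation exploits the sorted order inside each TsFile to
   * do this in a single streaming pass instead of building an in-memory map.)
   */
  public static Map<String, List<SeriesChunk>> shallowMerge(List<List<SeriesChunk>> files) {
    Map<String, List<SeriesChunk>> merged = new TreeMap<>();
    for (List<SeriesChunk> file : files) {
      for (SeriesChunk chunk : file) {
        merged.computeIfAbsent(chunk.seriesPath, p -> new ArrayList<>()).add(chunk);
      }
    }
    return merged;
  }

  /** Concatenate the chunks of one series and compress the whole group with LZ4. */
  public static byte[] compressGroup(List<SeriesChunk> group) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    for (SeriesChunk chunk : group) {
      buffer.write(chunk.payload);
    }
    LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
    return compressor.compress(buffer.toByteArray());
  }
}
```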
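Finally, a sketch of the relay selection and the grouped, parallel sending from items 4 and 5. ParallelGroupSender, Replica, and ChunkGroupSender are hypothetical names invented for illustration, and the similarity key is reduced to a measurement-name prefix, whereas the actual grouping also considers the data type and samples of the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ParallelGroupSender {

  /** A replica of the receiving region; only its endpoint matters here. */
  public interface Replica {
    String endpoint();
  }

  /** Whatever transport actually ships one compressed chunk group to the relay. */
  public interface ChunkGroupSender {
    void send(Replica relay, List<byte[]> chunkGroup) throws Exception;
  }

  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  /** Pick the replica with the highest observed throughput as the relay. */
  public Replica selectRelay(Map<Replica, Double> throughputByReplica) {
    return throughputByReplica.entrySet().stream()
        .max(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey)
        .orElseThrow(() -> new IllegalStateException("no replica available"));
  }

  /**
   * Group the chunks of one non-aligned device by a similarity key and send
   * the groups to the relay concurrently; chunk order within a device does
   * not matter for non-aligned series, so the sends can overlap freely.
   */
  public List<Future<?>> sendDevice(
      Map<String, byte[]> chunksByMeasurement, Replica relay, ChunkGroupSender sender) {
    Map<String, List<byte[]>> groups =
        chunksByMeasurement.entrySet().stream()
            .collect(
                Collectors.groupingBy(
                    e -> similarityKey(e.getKey()),
                    Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    List<Future<?>> futures = new ArrayList<>();
    for (List<byte[]> group : groups.values()) {
      futures.add(pool.submit(() -> {
        sender.send(relay, group);
        return null;
      }));
    }
    return futures;
  }

  /** A crude similarity key: measurements sharing the same prefix land in the same group. */
  private String similarityKey(String measurementId) {
    int underscore = measurementId.indexOf('_');
    return underscore > 0 ? measurementId.substring(0, underscore) : measurementId;
  }
}
```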

How to Use

To use the Local Split feature in a Pipe, add the following configurations when creating the pipe: create pipe a2b with extractor ('extractor.local-split.enable'='true') with connector ('connector.local-split.enable'='true', 'connector.external.config-nodes'='nelbds-15:11710', 'connector'='iotdb-thrift-connector', 'connector.ip'='nelbds-16', 'connector.port'='7667')

Both 'extractor.local-split.enable' and 'connector.local-split.enable' must be set to true to enable the feature. 'connector.external.config-nodes' should be the IP and port of the receiver's config node. 'connector'='iotdb-thrift-connector' is required, as only 'iotdb-thrift-connector' currently supports the feature. 'connector.ip' and 'connector.port' should point to the receiver's data node, as in other pipes.

As implied by Local Split, the sender must have direct access to all data nodes of the receiver cluster; in other words, the sender should be on the receiver's firewall whitelist.

Other configurations are:

  - 'extractor.split.max-concurrent-file': an integer specifying the maximum number of files in an event.
  - 'extractor.split.max-file-batch-size': an integer specifying the maximum total size of files in an event, in bytes. The feedback mechanism may reduce this after a timeout.
  - 'connector.split.max-size': an integer specifying the maximum number of bytes to transfer in one request.
  - 'connector.split.max-concurrent-file': an integer specifying the maximum number of files to be Shallow Merged; it should be less than or equal to 'extractor.split.max-concurrent-file'.
  - 'connector.external.user-name': the user name to be used on the receiver side.
  - 'connector.external.password': the password to be used on the receiver side.
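For reference, a pipe that combines the required options with several of these tuning options might look like the following. The pipe name, host names, numeric values, and credentials are placeholders; replace them with values that fit your deployment.

```sql
create pipe a2b_tuned
with extractor (
  'extractor.local-split.enable'='true',
  'extractor.split.max-concurrent-file'='10',
  'extractor.split.max-file-batch-size'='1073741824'
)
with connector (
  'connector'='iotdb-thrift-connector',
  'connector.local-split.enable'='true',
  'connector.external.config-nodes'='nelbds-15:11710',
  'connector.external.user-name'='root',
  'connector.external.password'='root',
  'connector.split.max-size'='16777216',
  'connector.split.max-concurrent-file'='10',
  'connector.ip'='nelbds-16',
  'connector.port'='7667'
)
```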

Evaluation

To perform the evaluation, around 800 GB of data is first written to a 1C1D instance and then synchronized to a 1C3D instance through a Pipe; the number of replicas is the x-axis. Below is the total task completion time:

[Figure: total task completion time vs. number of replicas]

The results show a 50% reduction in completion time compared with Basic.

This is the query latency of aggregating a single timeseries:

[Figure: query latency of a single-timeseries aggregation]

Thanks to Shallow Merge, the increased locality and reduced metadata speed up such queries.

On additional datasets, Local Split also shows better throughput and query latency. The liuzhen dataset contains so many timeseries that schema registration becomes the bottleneck, so the improvement there is less significant.

[Figures: throughput and query latency on additional datasets]

Slides that demonstrate the motivation and intuition: TsFileSync.pptx

jt2594838 · Nov 20 '23

Codecov Report

Attention: 1476 lines in your changes are missing coverage. Please review.

Comparison is base (919a24f) 49.04% compared to head (07b60fe) 49.13%. The report is 1 commit behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...eryengine/execution/load/MergedTsFileSplitter.java | 42.43% | 251 Missing :warning: |
| .../load/nodesplit/ClusteringMeasurementSplitter.java | 61.60% | 139 Missing :warning: |
| ...ol/thrift/impl/DataNodeInternalRPCServiceImpl.java | 0.00% | 134 Missing :warning: |
| ...ommon/tsfile/TsFileListInsertionDataContainer.java | 0.00% | 119 Missing :warning: |
| ...t/common/tsfile/PipeBatchTsFileInsertionEvent.java | 0.00% | 105 Missing :warning: |
| .../queryengine/execution/load/TsFileSplitSender.java | 72.34% | 86 Missing :warning: |
| ...otocol/thrift/async/IoTDBThriftAsyncConnector.java | 8.21% | 67 Missing :warning: |
| ...b/pipe/extractor/historical/ThroughputMonitor.java | 0.00% | 60 Missing :warning: |
| ...e/extractor/historical/BatchedTsFileExtractor.java | 0.00% | 56 Missing :warning: |
| ...e/plan/planner/plan/node/load/LoadCommandNode.java | 0.00% | 47 Missing :warning: |

... and 38 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #11580      +/-   ##
============================================
+ Coverage     49.04%   49.13%   +0.08%     
- Complexity    24917    25088     +171     
============================================
  Files          2815     2835      +20     
  Lines        176137   178249    +2112     
  Branches      21132    21421     +289     
============================================
+ Hits          86388    87581    +1193     
- Misses        89749    90668     +919     


codecov-commenter · Nov 21 '23

SonarCloud Quality Gate failed.

  - Bugs: 0 (rating A)
  - Vulnerabilities: 0 (rating A)
  - Security Hotspots: 3 (rating E)
  - Code Smells: 32 (rating A)
  - Coverage: 0.0%
  - Duplication: 4.2%


sonarqubecloud[bot] · Nov 30 '23