Up to 20x: Binary Copy Capability for Compaction
Overview
In Lance’s current compaction process, the data files of the fragments to be merged are fully decompressed and deserialized into Arrow format, then re-serialized and compressed to produce a few larger fragments. However, in the simple “merge small files” scenario, we can directly copy the pages within data files at the binary level (binary copy) and reconstruct the metadata to form a larger fragment. This bypasses page data parsing and significantly improves the performance of small-file merging. Note that binary copy does not inspect data distribution within pages and therefore cannot handle fragments that contain a deletion vector. Consequently, during compaction, if a fragment is found to include a deletion file, the process should fall back to the original full read/write compaction approach.
Design
High-Level Flow
When compact_files is executed, it determines whether this optimization path can be used via the can_use_binary_copy() function. Binary copy is triggered only when all of the following conditions are met (a gating sketch follows this list):
• Feature switch: CompactionOptions.enable_binary_copy must be set to true. This is a master switch, which is turned off by default to ensure backward compatibility and stability.
• Version constraints: All data files (*.lance) contained in all Fragments to be merged must have exactly the same file versions (major/minor versions). For example, a file written with version v2.0 cannot be merged with another file written with version v2.1.
• No deletion files: None of the Fragments to be merged can have associated deletion files (deletion_file). The Binary Copy path itself is not responsible for implementing the materialization of deleted rows; it only handles data transfer.
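For illustration, here is a minimal sketch of what this gate could look like, using simplified stand-in types (the real can_use_binary_copy() in the PR operates on Lance's actual fragment/data-file metadata and may differ in names and detail):

// Illustrative sketch only; the structs below are simplified stand-ins.
struct DataFile {
    file_major_version: u32,
    file_minor_version: u32,
}

struct Fragment {
    files: Vec<DataFile>,
    deletion_file: Option<String>, // stand-in for the real deletion-file metadata
}

struct CompactionOptions {
    enable_binary_copy: bool,
}

fn can_use_binary_copy(options: &CompactionOptions, fragments: &[Fragment]) -> bool {
    // Master switch, off by default.
    if !options.enable_binary_copy {
        return false;
    }
    // All data files must share exactly the same (major, minor) file version.
    let mut versions = fragments
        .iter()
        .flat_map(|frag| frag.files.iter())
        .map(|f| (f.file_major_version, f.file_minor_version));
    let first = match versions.next() {
        Some(v) => v,
        None => return false,
    };
    if !versions.all(|v| v == first) {
        return false;
    }
    // No fragment may carry a deletion file; binary copy never materializes deletes.
    fragments.iter().all(|frag| frag.deletion_file.is_none())
}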
Binary Copy Details
The core logic resides in rewrite_files_binary_copy. It operates at the page and buffer level, stitching together new data files from the pieces of the old ones.
- I/O Batching: To optimize I/O, the algorithm reads data in large, coalesced batches. It iterates through the pages of each column in each source file, grouping ranges of buffers to be read in a single request. The size of these batches is controlled by binary_copy_read_batch_bytes (see the sketch after this list).
- 64-Byte Alignment: All data copied to the new file is aligned to a 64-byte boundary. This is a requirement for modern Lance file versions (v2.1+) and ensures optimal I/O performance. Padding bytes are inserted as needed.
- Column-Level Buffer Copying: In addition to page data, any column-level buffers (metadata or dictionaries stored outside of pages) are also copied with alignment.
- Footer Finalization: After all data is copied to a new file, the footer is written using the standard FileWriter::finish() method. This function takes the accumulated column metadata, page tables, and total row count, and writes the final manifest block, including schema, offset tables, and version information. This ensures the output file is a valid and well-formed Lance file.
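A minimal sketch of the batching and alignment bookkeeping, under illustrative names (only binary_copy_read_batch_bytes comes from the design above; the helper functions and types here are hypothetical):

// Illustrative only: group source byte ranges into coalesced read batches capped by
// binary_copy_read_batch_bytes, and pad writes so each buffer starts 64-byte aligned.
const ALIGNMENT: u64 = 64;

/// Number of padding bytes needed to reach the next 64-byte boundary.
fn pad_to_64(offset: u64) -> u64 {
    (ALIGNMENT - (offset % ALIGNMENT)) % ALIGNMENT
}

/// Group page-buffer ranges (offset, size) into batches whose total size does not
/// exceed the configured read batch size.
fn group_into_batches(ranges: &[(u64, u64)], batch_bytes: u64) -> Vec<Vec<(u64, u64)>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    let mut current_bytes = 0u64;
    for &(offset, size) in ranges {
        if current_bytes + size > batch_bytes && !current.is_empty() {
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push((offset, size));
        current_bytes += size;
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

/// Append a buffer to the output with 64-byte alignment; returns (position, size)
/// for the rewritten page table entry.
fn write_aligned(dst: &mut Vec<u8>, src: &[u8]) -> (u64, u64) {
    let pad = pad_to_64(dst.len() as u64);
    dst.extend(std::iter::repeat(0u8).take(pad as usize));
    let position = dst.len() as u64;
    dst.extend_from_slice(src);
    (position, src.len() as u64)
}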
Row IDs and Index Interactions
- Stable Row IDs:
- When enabled: If the dataset has stable_row_ids enabled, the binary copy path inherits the row ID sequences (RowIdSequence) from the old fragments unchanged. It reads the row IDs of all old fragments, rechunks them according to the row count boundaries of the new fragments, and stores the new row ID sequences either inline or externally in the metadata of the new fragments (see the sketch after this list).
- When disabled: If the dataset uses address-based row IDs (fragment_id + offset), Lance must generate a complete mapping from every old row ID to its new row ID. For binary copy, since it has already been verified that the fragments being processed have no deletion files, it is only necessary to simulate a row_id_rx, which consists of the fragment id and row count.
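A minimal sketch of the rechunking idea, using flat Vec<u64> row IDs for illustration (the real code works on RowIdSequence values and its own rechunk helpers, which may differ):

// Illustrative only: split the concatenated old row-id sequences into chunks that
// match the row counts of the new fragments.
fn rechunk_row_ids(old_ids: Vec<u64>, new_fragment_rows: &[usize]) -> Vec<Vec<u64>> {
    assert_eq!(old_ids.len(), new_fragment_rows.iter().sum::<usize>());
    let mut chunks = Vec::with_capacity(new_fragment_rows.len());
    let mut cursor = old_ids.into_iter();
    for &rows in new_fragment_rows {
        // Take exactly `rows` ids for this new fragment, preserving order.
        chunks.push(cursor.by_ref().take(rows).collect());
    }
    chunks
}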
Compatibility Issues
This is a brand-new feature with rollback capability, so it is free from historical compatibility issues.
Performance Testing
Using the same environment (local mode) to process 5 million rows of data with a nested schema, the performance comparison before and after optimization is as follows (the test was executed repeatedly 5 times, and the performance conclusions were consistent):
full_compaction=418.587038012s, binary_copy=15.332320858s, speedup=27.30x
{
"schema": {
"fields": [
{
"name": "vec1",
"type": "fixed_size_list",
"nullable": true,
"item_type": "float32",
"size": 12
},
{
"name": "vec2",
"type": "fixed_size_list",
"nullable": true,
"item_type": "float32",
"size": 8
},
{
"name": "i32",
"type": "int32",
"nullable": true
},
{
"name": "i64",
"type": "int64",
"nullable": true
},
{
"name": "f32",
"type": "float32",
"nullable": true
},
{
"name": "f64",
"type": "float64",
"nullable": true
},
{
"name": "bool",
"type": "boolean",
"nullable": true
},
{
"name": "date32",
"type": "date32",
"nullable": true
},
{
"name": "date64",
"type": "date64",
"nullable": true
},
{
"name": "ts_ms",
"type": "timestamp",
"unit": "millisecond",
"timezone": null,
"nullable": true
},
{
"name": "utf8",
"type": "utf8",
"nullable": true
},
{
"name": "large_utf8",
"type": "large_utf8",
"nullable": true
},
{
"name": "bin",
"type": "binary",
"nullable": true
},
{
"name": "large_bin",
"type": "large_binary",
"nullable": true
},
{
"name": "varbin",
"type": "binary",
"nullable": true,
"notes": ""
},
{
"name": "fsb16",
"type": "fixed_size_binary",
"size": 16,
"nullable": true
},
{
"name": "fsl4",
"type": "fixed_size_list",
"nullable": true,
"item_type": "float32",
"size": 4
},
{
"name": "struct_simple",
"type": "struct",
"nullable": true,
"fields": [
{
"name": "x",
"type": "uint32",
"nullable": true
},
{
"name": "y",
"type": "large_utf8",
"nullable": true
}
]
},
{
"name": "struct_nested",
"type": "struct",
"nullable": true,
"fields": [
{
"name": "inner",
"type": "struct",
"nullable": true,
"fields": [
{
"name": "x",
"type": "uint32",
"nullable": true
},
{
"name": "y",
"type": "large_utf8",
"nullable": true
}
]
},
{
"name": "fsb",
"type": "fixed_size_binary",
"size": 16,
"nullable": true
},
{
"name": "bin",
"type": "binary",
"nullable": true
}
]
},
{
"name": "events",
"type": "large_list",
"nullable": true,
"item": {
"type": "struct",
"nullable": true,
"fields": [
{
"name": "ts",
"type": "timestamp",
"unit": "millisecond",
"timezone": null,
"nullable": true
},
{
"name": "payload",
"type": "binary",
"nullable": true
}
]
}
}
]
}
}
The performance test code is as follows:
#[tokio::test]
async fn test_perf_binary_copy_vs_full() {
    use arrow_schema::{DataType, Field, Fields, TimeUnit};
    use lance_core::utils::tempfile::TempStrDir;
    use lance_datagen::{array, gen_batch, BatchCount, Dimension, RowCount};
    use std::time::Instant;

    let row_num = 5_000_000;
    let inner_fields = Fields::from(vec![
        Field::new("x", DataType::UInt32, true),
        Field::new("y", DataType::LargeUtf8, true),
    ]);
    let nested_fields = Fields::from(vec![
        Field::new("inner", DataType::Struct(inner_fields.clone()), true),
        Field::new("fsb", DataType::FixedSizeBinary(16), true),
        Field::new("bin", DataType::Binary, true),
    ]);
    let event_fields = Fields::from(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
        Field::new("payload", DataType::Binary, true),
    ]);
    let reader_full = gen_batch()
        .col("vec1", array::rand_vec::<Float32Type>(Dimension::from(12)))
        .col("vec2", array::rand_vec::<Float32Type>(Dimension::from(8)))
        .col("i32", array::step::<Int32Type>())
        .col("i64", array::step::<Int64Type>())
        .col("f32", array::rand::<Float32Type>())
        .col("f64", array::rand::<Float64Type>())
        .col("bool", array::rand_boolean())
        .col("date32", array::rand_date32())
        .col("date64", array::rand_date64())
        .col(
            "ts_ms",
            array::rand_timestamp(&DataType::Timestamp(TimeUnit::Millisecond, None)),
        )
        .col(
            "utf8",
            array::rand_utf8(lance_datagen::ByteCount::from(16), false),
        )
        .col("large_utf8", array::random_sentence(1, 6, true))
        .col(
            "bin",
            array::rand_fixedbin(lance_datagen::ByteCount::from(24), false),
        )
        .col(
            "large_bin",
            array::rand_fixedbin(lance_datagen::ByteCount::from(24), true),
        )
        .col(
            "varbin",
            array::rand_varbin(
                lance_datagen::ByteCount::from(8),
                lance_datagen::ByteCount::from(32),
            ),
        )
        .col("fsb16", array::rand_fsb(16))
        .col(
            "fsl4",
            array::cycle_vec(array::rand::<Float32Type>(), Dimension::from(4)),
        )
        .col("struct_simple", array::rand_struct(inner_fields.clone()))
        .col("struct_nested", array::rand_struct(nested_fields))
        .col(
            "events",
            array::rand_list_any(array::rand_struct(event_fields.clone()), true),
        )
        .into_reader_rows(RowCount::from(row_num), BatchCount::from(10));

    let full_dir = TempStrDir::default();
    let a: String = full_dir.as_into_string().into();
    println!("full_dir: {:?}", a);
    let mut dataset = Dataset::write(
        reader_full,
        &*full_dir,
        Some(WriteParams {
            data_storage_version: Some(LanceFileVersion::V2_1),
            max_rows_per_file: (row_num / 100) as usize,
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    let opt_full = CompactionOptions {
        enable_binary_copy: false,
        ..Default::default()
    };
    let opt_binary = CompactionOptions {
        enable_binary_copy: true,
        enable_binary_copy_force: true,
        ..Default::default()
    };

    // Full (read/rewrite) compaction.
    let t0 = Instant::now();
    let _ = compact_files(&mut dataset, opt_full, None).await.unwrap();
    let d_full = t0.elapsed();
    let before = dataset.count_rows(None).await.unwrap();
    let versions = dataset.versions().await.unwrap();

    // Restore the original version and compact again using binary copy.
    let mut dataset = dataset.checkout_version(1).await.unwrap();
    dataset.restore().await.unwrap();
    let t1 = Instant::now();
    let _ = compact_files(&mut dataset, opt_binary, None).await.unwrap();
    let d_bin = t1.elapsed();
    let after = dataset.count_rows(None).await.unwrap();

    println!(
        "perf: full_compaction={:?}, binary_copy={:?}, speedup={:.2}x",
        d_full,
        d_bin,
        (d_full.as_secs_f64() / d_bin.as_secs_f64())
    );
    assert_eq!(before, after);
}
Thank you @zhangyue19921010 for working on this. This idea seems cool. One question I have for now is, we are currently trying to add more statistics like bloom/zonemap at the fragment level, how do we handle them during the fragment merge?
Hi @Xuanwo, thanks for your response. For statistical information such as bloom/zonemap, handling may rely on operations supported by the statistics themselves. For example, Bloom filters can be merged with a bitwise OR, but that requires all input segments' Bloom filters to have consistent configurations: m (number of bits), k (number of hash functions), hash function/seed, and encoding strategy.
Hey, the zonemap/bloomfilter uses a notion called zone size which is independent of the fragments, so it provides maximum flexibility. The TLDR version is that users can arbitrarily specify how many rows they want to put into each zone, and it does not need to align with fragment size at all (in the example, note the zone size is 5000 while the fragment sizes are not).
As for the existing compaction, it does not apply to zonemap/bloomfilter by design. There is a guard called can_remap which isolates these two scalar indexes from compaction. If users want to update the indexes after compaction, they must re-train them. This PR is good to go from these two indexes' point of view per the current impl.
Though the incoming new column statistics #4540 could be affected by this PR(?). I strongly encourage you to add a test case covering this scenario (compacting data with zonemap/bloomfilter enabled) so that any issue will manifest itself in the future. Thanks.
The TLDR version is that users can arbitrarily specify how many rows they want to put into each zone and it does not need to align with fragment size at all.
@HaochengLIU my understanding is that, although a zone has a fixed size, it won't go beyond a fragment, right? For example, if I have zone size 100 and a fragment has 250 rows, it will have 3 zones: [1-100], [101-200], [201-250]; it won't go beyond that and include rows in the next fragment.
Because of that, I think doing binary copy would be fine; we just need to map the old zone maps to point to the new zones, which should be a faster operation compared to recomputing the stats or bloom filters.
However, one thing I do worry about is the compatibility of this feature with what @wjones127 proposed in https://github.com/lance-format/lance/discussions/4540#discussioncomment-15176342, which would put index segments in the global buffer of the lance data file. If we have that, doing binary copy would mess up the global buffer. I think at this point we should limit this feature to files without unknown global buffers that we don't know how to handle.
Thanks for your reply @jackye1995 :)
There are two possible ways to handle the segment index. First, during a binary copy, we can handle the external buffer specially by serializing and deserializing it. This may impact overall performance, but if the I/O of the index segment is manageable, this could be acceptable. The other is to add a special check at the point where we decide whether binary copy can be used, similar to how deletion vectors are handled; for such files, we would skip binary copy and fall back to regular compaction instead.
First, during a binary copy, we can handle the external buffer specially by serializing and deserializing it.
I think the main problem is that you don't always know what are in the buffer considering forward compatibility, and even if you know, those might not be easily mergeable.
The other is to add a special check at the point where we decide whether binary copy can be used, similar to how deletion vectors are handled; for such files, we would skip binary copy and fall back to regular compaction instead.
Yeah this sounds like the right idea to me
I totally agree. But maybe in the index segment scenario, during compaction, the index segment of the file is merged with the base index, similar to a remap operation, and after merging, the index segment of the fragment should no longer be effective. In this case, it seems we wouldn't need to merge such external buffers individually at the data file level. Of course, the question then becomes how to quickly identify these types of buffers :)
yes agree. overall looks like we have a general direction on this and we are not necessarily blocked by other workstreams, I will do a pass of the PR tonight.
- @HaochengLIU my understanding is that, although a zone has a fixed size, it won't go beyond a fragment, right? For example, if I have zone size 100 and a fragment has 250 rows, it will have 3 zones: [1-100], [101-200], [201-250]; it won't go beyond that and include rows in the next fragment.
Yes
- There are two possible ways to handle the segment index. First, during a binary copy, we can handle the external buffer specially by serializing and deserializing it. This may impact overall performance, but if the I/O of the index segment is manageable, this could be acceptable. The other is to add a special check at the point where we decide whether binary copy can be used, similar to how deletion vectors are handled; for such files, we would skip binary copy and fall back to regular compaction instead.
+1 There is a clear path to address this issue when the column stats are ready, and I will keep it in mind.
- Footer Finalization: After all data is copied to a new file, the footer is written using the standard FileWriter::finish() method. This function takes the accumulated column metadata, page tables, and total row count, and writes the final manifest block, including schema, offset tables, and version information. This ensures the output file is a valid and well-formed Lance file.
One more followup: Could you point me to how the metadata is accumulated? What happens if there is a collision?
Hi @HaochengLIU hope I can answer your question correctly :)
Could you point me to how the metadata is accumulated?
Metadata accumulation happens inside rewrite_files_binary_copy: for each source file it reads the existing footer and appends the decoded page metadata into col_pages[col_idx] and column-level buffers into col_buffers. Page priorities are shifted by the running row total so the logical order stays unique across merged files. When a flush is triggered, these accumulated vectors are converted back into ColumnInfo/pbfile::ColumnMetadata and written as the new footer in flush_footer.
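If it helps, here is a minimal sketch of the accumulation idea (illustrative names and types only; the real rewrite_files_binary_copy works on Lance's own page/column metadata structures):

// Illustrative only: accumulate page metadata per column across source files,
// shifting each page's priority by the number of rows already merged so the
// logical ordering stays unique in the output file.
#[derive(Clone)]
struct PageMeta {
    priority: u64,
    num_rows: u64, // plus buffer offsets/sizes in the real metadata
}

fn accumulate(col_pages: &mut Vec<Vec<PageMeta>>, source_cols: &[Vec<PageMeta>], rows_so_far: u64) {
    for (col_idx, pages) in source_cols.iter().enumerate() {
        for page in pages {
            let mut shifted = page.clone();
            shifted.priority += rows_so_far;
            col_pages[col_idx].push(shifted);
        }
    }
}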
What happens if there is a collision
Collisions are avoided up front: can_use_binary_copy requires all input files share the same file version, have no deletions, etc. If that check fails, binary copy is skipped and the code falls back to the normal re-write path.
Potential intra-file collisions (e.g., duplicate structural header pages in v2_0) are resolved by truncating to a single header page before writing the footer.
Even if there are conflicts that cannot be resolved, it immediately falls back to a normal compaction.
Thanks for working on this!
I’m wondering—what if we have millions of 1 KB files that need compaction, and the pages within each file are highly fragmented? In that case, would a binary copy still provide meaningful I/O optimization benefits?
As I understand it, Lance schedules I/O at the page level. If fragmented pages aren’t actually consolidated during compaction, the I/O scheduler might not fully benefit from the compaction—even if the files are merged.
The tests seem to focus on compaction performance, not on the resulting read performance.
I’m not deeply familiar with the file format internals, so I’d really appreciate it if someone could clarify my concern on this.
I wonder if we should add some kind of page size threshold for merging pages if the concern is solid.
Here are my thoughts, hoping to answer your question @majin1102 :)
Binary copy is designed to help even in extreme small-file cases.
What this means for lots of tiny, fragmented files, IMO:
- Still get fewer files: millions of 1KB files become a much smaller set of larger files, so it can cut metadata lookups, open/close overhead, and object-store request count. That alone can be a big win.
- I/O scheduling sees larger contiguous spans: even though page boundaries remain, the scheduler can coalesce reads across what used to be separate files. That reduces per-file overhead and improves throughput on object stores.
- Fragmented pages remain fragmented: we don't currently merge pages or re-chunk them, so intra-file fragmentation isn't reduced. If pages are very small or oddly aligned, read patterns may still be less than ideal, though it still avoids the overhead of hopping across millions of files.
In addition, in extreme scenarios involving small files, it might also be possible to guide parameter optimization on the writing end, which could yield better results compared to leaving it to Compaction.
the scheduler can coalesce reads across what used to be separate files
Still get fewer files: millions of 1KB files become a much smaller set of larger files, so can cut metadata lookups, open/close overhead, and object-store request count. That alone can be a big win.
Yeah. Makes sense to me. We could coalesce page requests if they are close enough. I think read coalescing helps significantly in this case.
It's great if we have this binary copy. I just wonder about the actual benefits for different read patterns. Could you add some read tests on your tested data? If we could test both scanning and random access compared with the original compaction, it could prove things, and we would be more confident making binary copy the default action for compaction. What do you think? @zhangyue19921010
From another perspective, I think this question is equivalent to: how important is the max_page_bytes parameter to compaction?
Hi @westonpace, can you share some expectations on this? I'm wondering if we have risks if we compact files while ignoring max_page_bytes.
I think @majin1102's point is a valid problem 😦. Small pages introduce issues in full-scan performance against cloud storage. In particular, to avoid a penalty, we normally want the pages to be at least 1MB (and AWS recommends 8-16 MB).
Attempting to binary copy the fuller pages while compacting the smaller pages would also be difficult because the first file might be full-full-partial and the second file might be full-full-partial. If we combine the two partial parts then we would reorder data in the column.
Another challenge is that the default fragment size (1Mi rows) lends itself to 1 page per column for most data types.
I/O scheduling sees larger contiguous spans: even though page boundaries remain, the scheduler can coalesce reads across what used to be separate files. That reduces per-file overhead and improves throughput on object stores.
The pages will not be contiguous if there are multiple columns. For example, we often have a pattern like:
Col 0 Page 0
Col 1 Page 0
Col 0 Page 1
Col 1 Page 1
If we just want to read Col 0 that means we are not doing contiguous reads. That is why pages have to be large.
Hi @majin1102 and @westonpace, thanks a lot for your input. I totally agree that for some cases, such as a large number of tiny files smaller than 1MB, binary copy is not suitable for merging. Instead, regular compaction should be used. In fact, I think the write side should be optimized to avoid this situation in the first place (it is usually caused by misconfiguration on the write end).
On the other hand,
I wonder if, in real-world scenarios, there are actually a lot of tiny files smaller than 1MB. In my experience, small files are usually a few megabytes or tens of megabytes and would need merging (unless there was a misconfiguration when writing them). There may not be a silver bullet that takes good care of both performance and effectiveness, but I still think that binary copy can be useful when the files are small but not too small 😄
Hi everyone. I did a further query performance test, as follows:
- Scan using the dataset.scanner API
- Point query using the dataset.take API
- Case 1: Direct scan of the bare files (1000 files, 20 MB per file, object storage): point_time=0.016s, scan_time=7.207s
- Case 2: Data merged using Binary Copy (400 MB per file): point_time=0.019s, scan_time=7.114s
- Case 3: Data merged using Common Compaction (400 MB per file): point_time=0.019s, scan_time=6.918s
In terms of scan, common compaction performs better than binary copy, as @westonpace said. (Perhaps the performance gap is manageable in the current scenario?)
cc @westonpace @jackye1995 and @majin1102 :)
Please let me know if there's anything you'd like to discuss ~