Flink CDC streaming write support

Open xx789633 opened this issue 7 months ago • 21 comments

I'm trying to integrate Flink CDC with Lance, but I found that the capabilities of the Lance Java API are quite limited.

It seems the fragment API https://github.com/lancedb/lance/blob/main/java/core/src/main/java/com/lancedb/lance/Fragment.java#L147 can only be used for appending data.

What I'm looking for is a delta writer to write both insert records and deletes, something like this: https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java#L79.

@jackye1995 @majin1102 @westonpace Could you possibly give some advice? Thank you.

xx789633 avatar Jun 09 '25 13:06 xx789633

@cwang9208 what you linked is a bit different, because it is BaseEqualityDeltaWriter, which writes equality deletes. That feature easily hits performance issues, and people rarely use it for large-scale production workloads. My understanding is that even Alibaba, who originally contributed this feature, has since moved to Paimon and abandoned this approach, but please correct me if I am wrong here.

With what we have now in Lance Rust, take a look at our merge-insert semantics: https://github.com/lancedb/lance/blob/main/rust/lance/src/dataset/write/merge_insert.rs#L248-L282. If these semantics are sufficient, let's expose them in Java, and then you can accumulate a batch and write it with that API.
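To make that concrete, here is a rough sketch of the shape a Java binding with those semantics might take. This is purely hypothetical; it mirrors the Rust builder linked above and is not an existing lance-java API:

```java
import org.apache.arrow.vector.VectorSchemaRoot;

// Hypothetical sketch only: this mirrors the Rust merge-insert builder linked
// above. It is NOT an existing lance-java API, just the shape a Java binding
// with the same semantics might take.
public interface MergeInsertBuilderSketch {
    MergeInsertBuilderSketch whenMatchedUpdateAll();     // matched rows: overwrite with incoming values
    MergeInsertBuilderSketch whenNotMatchedInsertAll();  // unmatched rows: append as new rows
    void execute(VectorSchemaRoot accumulatedBatch);     // write and commit the accumulated batch

    // Example usage, assuming a dataset.mergeInsert("id") entry point existed:
    //   dataset.mergeInsert("id")
    //          .whenMatchedUpdateAll()
    //          .whenNotMatchedInsertAll()
    //          .execute(batch);
}
```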

jackye1995 avatar Jun 09 '25 16:06 jackye1995

When it comes to CDC, I think the first issue we need to consider is primary key.

If the table format doesn't have a primary key model, the Flink CDC writer would not know how to route rows with the same primary key to one writer and reader (this is also called a bucket index on the primary key), and the runtime performance would be a mess. I think that's the problem @jackye1995 mentioned: Iceberg is essentially an append-only model with partial primary key support in Flink (it resolves the writer issue but leaves the reader issue). There are other gaps to discuss as well, but let's skip them for now.

A primary key model could bring a better out-of-the-box experience (like Paimon and Hudi). But I think this is a really big and complicated decision, because the engine runtime is totally different and hard to align across engines. For now, Iceberg and Delta still don't support a primary key model, and neither does Lance.

The usual solution for CDC on an append-only format like Iceberg and Delta is a merge into SQL statement (no need to introduce a primary key), which resolves the primary-key match entirely in the write phase via a batch shuffle. It is efficient and performant, with a small trade-off in arrival latency. This has the same semantics as the merge-insert mentioned above.
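For illustration, that batch merge into pattern looks roughly like the following Spark SQL call; the table names, the id key, and the op change-type column are assumptions made up for this example:

```java
import org.apache.spark.sql.SparkSession;

// Illustrative only: the target/source table names, the "id" key, and the "op"
// change-type column are assumptions for this example, not a fixed convention.
public class MergeIntoSketch {
    public static void applyCdcBatch(SparkSession spark) {
        spark.sql(
            "MERGE INTO lakehouse.target t " +
            "USING cdc_batch s ON t.id = s.id " +
            "WHEN MATCHED AND s.op = 'D' THEN DELETE " +   // CDC deletes
            "WHEN MATCHED THEN UPDATE SET * " +            // CDC updates
            "WHEN NOT MATCHED THEN INSERT *");             // CDC inserts
    }
}
```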

https://github.com/lancedb/lance/blob/main/rust/lance/src/dataset/write/merge_insert.rs

From my understanding, this implementation is a standalone execution, right? @jackye1995 We may need to check the gap when using Flink or Spark to perform the merge into operation. I think standalone execution via an API may be a rare scenario in the Java ecosystem.

Back to the CDC solution, I think a practical way is to split the write view and the read view:

  1. Flink CDC writes to an append-only table that supports incremental queries (Paimon/Hudi/Iceberg, Lance maybe later?). This is the write view; it provides good write performance with low arrival latency.
  2. A merge into job scheduled periodically. This is the read view, with better read performance but slightly higher arrival latency.

This approach is fairly common practice in CDC scenarios. We could also wrap the procedures above into one management system/platform like Apache Amoro, just for reference.

majin1102 avatar Jun 09 '25 18:06 majin1102

When it comes to CDC, I think the first issue we need to consider is primary key.

For the primary key, I am working on that based on https://github.com/lancedb/lance/discussions/3842. I was trying to complete the full prototype before proceeding with further discussion, but maybe I can first publish the primary key format change.

Back to the CDC solution, I think a practical way is to split the write view and the read view:

I don't think that is needed? Assuming we expose the merge-insert API in Java, you can have the writer side accumulate a batch and then use the merge-insert API to commit it at the checkpointing interval. That is also what the referenced Iceberg writer is doing.

In this architecture, the data is not available to readers before the checkpoint. But that is what the LSM upsert proposal is trying to address; I am expecting that prototype to be done this month, and we can reconcile the architecture at that time.
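As a sketch of that writer shape, assuming Flink's Sink V2 SinkWriter interface and a hypothetical merge-insert client (the MergeInsertClient type and commitMergeInsert call below are placeholders, not an existing Lance API), the batching could look like this:

```java
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.table.data.RowData;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Buffers CDC records between checkpoints and commits them in one merge-insert
// when Flink calls flush() as part of the checkpoint.
public class BufferingLanceWriter implements SinkWriter<RowData> {

    /** Hypothetical stand-in for a Java merge-insert API; not a real Lance class. */
    public interface MergeInsertClient {
        void commitMergeInsert(List<RowData> rows);
    }

    private final List<RowData> buffer = new ArrayList<>();
    private final MergeInsertClient client;

    public BufferingLanceWriter(MergeInsertClient client) {
        this.client = client;
    }

    @Override
    public void write(RowData element, Context context) {
        buffer.add(element); // accumulate records between checkpoints
    }

    @Override
    public void flush(boolean endOfInput) throws IOException {
        // Called before a checkpoint completes, so the commit is checkpoint-aligned
        // and the data becomes visible to readers at the checkpointing interval.
        if (!buffer.isEmpty()) {
            client.commitMergeInsert(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    @Override
    public void close() {
        buffer.clear();
    }
}
```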

jackye1995 avatar Jun 09 '25 18:06 jackye1995

In this architecture, the data is not available to readers before the checkpoint. But that is what the LSM upsert proposal is trying to address; I am expecting that prototype to be done this month, and we can reconcile the architecture at that time.

That's really quick, and I can't wait to see it. 👍

I don't think that is needed? Assuming we expose the merge-insert API in Java, you can have the writer side accumulate a batch and then use the merge-insert API to commit it at the checkpointing interval. That is also what the referenced Iceberg writer is doing.

I don't know if I understand correctly. Do you mean Flink writers could write merged data in a distributed fashion and use the merge-insert API to commit the written fragments in the committer operator? If I'm not mistaken, how to write merged data in a streaming fashion on an append-only format is not solved yet. For batch computing, the engine can make a read-and-write plan, while it is tricky for streaming writers to act like that.

Or do you mean Lance could solve this issue by providing the merge-insert API and the new primary key model? That makes sense to me. I think the Iceberg Flink writers can do that because the equality-delete protocol and the field-id act as the primary key model for Flink scenarios. The streaming write issue would be solved by your new primary key model. Otherwise, I think splitting the streaming write and the batch operation could be an optional solution for append-only formats (besides, the streaming data could serve as some kind of raw data).

These are just my big data views, and thanks for correcting me if I am wrong.

majin1102 avatar Jun 09 '25 19:06 majin1102

but maybe I can first publish the primary key format change.

+1

majin1102 avatar Jun 09 '25 20:06 majin1102

Do you mean Flink writers could write merged data in a distributed fashion and use the merge-insert API to commit the written fragments in the committer operator?

No, I was thinking about a more naive model where you directly have distributed (write + commit) in each partitioned writer. This is possible in Lance because it can resolve conflicts at the row level and auto-retry, which is more efficient than Iceberg. Based on our testing, you can concurrently commit 128 merge-inserts without a problem.

jackye1995 avatar Jun 09 '25 22:06 jackye1995

Do you mean Flink writers could write merged data in a distributed fashion and use the merge-insert API to commit the written fragments in the committer operator?

Hopefully. I think a conflict-free approach using the fragment-level API should achieve better performance, but at the cost of more development effort.

No, I was thinking about a more naive model where you directly have distributed (write + commit) in each partitioned writer. This is possible in Lance because it can resolve conflicts at the row level and auto-retry, which is more efficient than Iceberg. Based on our testing, you can concurrently commit 128 merge-inserts without a problem.

If we adopt this approach, is there any advice on how to minimize conflicts between writers? For example, redistributing the data to the writers based on keys in Flink's SupportsPreWriteTopology. @jackye1995

xx789633 avatar Jun 10 '25 08:06 xx789633

Besides, the merge-insert API is not enough for handling delete operations from Flink CDC.

xx789633 avatar Jun 10 '25 08:06 xx789633

If we adopt this approach, is there any advice on how to minimize conflicts between writers? For example, redistributing the data to the writers based on keys in Flink's SupportsPreWriteTopology

I think that makes sense. Today Lance does not have a partitioning concept, so the engine can just partition the input stream on its own; hash distribution probably makes the most sense in that situation?
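A minimal sketch of that hash distribution on the Flink side, assuming RowData records whose first column is the primary key (adjust the key extractor for the real schema):

```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

// Hash-distributes the CDC stream by primary key before the writers, so that
// records with the same key always land on the same writer and two writers
// never upsert the same key concurrently.
public class HashByPrimaryKey {
    public static DataStream<RowData> distribute(DataStream<RowData> input) {
        Partitioner<String> hashPartitioner =
                (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);
        return input.partitionCustom(
                hashPartitioner,
                row -> row.getString(0).toString()); // assumption: the key is column 0
    }
}
```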

jackye1995 avatar Jun 10 '25 16:06 jackye1995

Besides, the merge-insert API is not enough for handling delete operations from Flink CDC.

Yes. After https://github.com/lancedb/lance/discussions/3842 that I mentioned above, the API will be updated to support deletes by having an additional delete-marker column to indicate deletion. So if we can proceed in parallel, that would be amazing.
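To illustrate what a batch carrying such a marker could look like, here is a sketch using Arrow Java; the "_deleted" column name is purely a placeholder, since the actual marker-column contract will come with the updated API:

```java
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.VectorSchemaRoot;

// Builds a two-row CDC batch: an upsert for key 1 and a delete for key 2.
// "_deleted" is a hypothetical placeholder for the delete-marker column.
public class DeleteMarkerBatch {
    public static VectorSchemaRoot build(RootAllocator allocator) {
        BigIntVector id = new BigIntVector("id", allocator);
        BitVector deleted = new BitVector("_deleted", allocator);
        id.allocateNew(2);
        deleted.allocateNew(2);

        id.setSafe(0, 1L);      // upsert for key 1
        deleted.setSafe(0, 0);  // not a delete
        id.setSafe(1, 2L);      // delete for key 2
        deleted.setSafe(1, 1);  // marked as deleted

        id.setValueCount(2);
        deleted.setValueCount(2);
        return VectorSchemaRoot.of(id, deleted);
    }
}
```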

jackye1995 avatar Jun 10 '25 16:06 jackye1995

Great! Can't wait to see the primary key dataset and the updated API. Let's work in parallel!

xx789633 avatar Jun 11 '25 13:06 xx789633

FYI, I've completed the integration of Lance with Fluss - an Apache Incubator project that serves as a streaming storage solution purpose-built for real-time analytics. https://github.com/asf-transfer/fluss/issues/1155

xx789633 avatar Jul 01 '25 06:07 xx789633

FYI, I've completed the integration of Lance with Fluss - an Apache Incubator project that serves as a streaming storage solution purpose-built for real-time analytics. https://github.com/asf-transfer/fluss/issues/1155

btw for everyone interested, this is a great intro to Fluss (why/what/future):

https://www.alibabacloud.com/blog/fluss-redefining-streaming-storage-for-real-time-data-analytics-and-ai_602412

Fluss's home: https://fluss.apache.org/

hpvd avatar Nov 15 '25 09:11 hpvd

FYI, I've completed the integration of Lance with Fluss - an Apache Incubator project that serves as a streaming storage solution purpose-built for real-time analytics. apache/fluss#1155

Hi, Fluss has completed the log-table streaming ingestion support for Lance, but primary-key table support is still missing.

The typical workflow of Fluss for primary-key table tiering is to distribute writes (including inserts, updates, and deletes) to multiple workers and commit on a central committer. However, it seems the current Lance Java API is insufficient for such a workflow.

Any ideas or suggestions? @jackye1995 @majin1102

xx789633 avatar Dec 17 '25 14:12 xx789633

The typical workflow of Fluss for primary-key table tiering is to distribute writes (including inserts, updates, and deletes) to multiple workers and commit on a central committer. However, it seems the current Lance Java API is insufficient for such a workflow.

@jackye1995 has given the solution above; let me quote:

Assuming we expose the merge-insert API in Java, you can have the writer side accumulate a batch and then use the merge-insert API to commit it at the checkpointing interval. That is also what the referenced Iceberg writer is doing.

For tables with a primary key, inserts should be planned as merge-inserts. This functionality is already provided in the lance-java module, both at the dataset level (standalone) and the fragment level (distributed).

If you want to use a multi-worker architecture with a dedicated committer, you should refer to the fragment-level mergeColumn and use the transaction API to commit the changes. I think this could follow the usage of Spark merge into.

The same applies to updates and deletes.

majin1102 avatar Dec 19 '25 03:12 majin1102

Hi @majin1102 thanks for your reply. It seems the fragment API only accepts rowIndexes, which means the ingestion engine needs to maintain the indexes for all primary keys, correct?

xx789633 avatar Dec 23 '25 12:12 xx789633

Hi @majin1102 thanks for your reply. It seems the fragment API only accepts rowIndexes, which means the ingestion engine needs to maintain the indexes for all primary keys, correct?

We don't yet have a definitive answer on whether Lance will support a native primary key model (so there is no such thing as an index over all primary keys, at least for now). We can use merge-insert to implement upsert semantics, similar to the approach used in Apache Iceberg. Lance provides an out-of-the-box merge-insert API on the dataset. I think we could figure out whether this is something you can leverage.

majin1102 avatar Dec 23 '25 12:12 majin1102

Hi @majin1102 thanks for your reply. It seems the fragment API only accepts rowIndexes, which means the ingestion engine needs to maintain the indexes for all primary keys, correct?

We don't yet have a definitive answer on whether Lance will support a native primary key model (so there is no such thing as an index over all primary keys, at least for now). We can use merge-insert to implement upsert semantics, similar to the approach used in Apache Iceberg. Lance provides an out-of-the-box merge-insert API on the dataset. I think we could figure out whether this is something you can leverage.

For an append-only table, Fluss distributes changelogs to multiple workers; each worker creates append fragments and finally commits on a coordinator. I'm wondering how the dataset-level merge-insert API fits into this distributed writer setting?

xx789633 avatar Dec 23 '25 13:12 xx789633

For an append-only table, Fluss distributes changelogs to multiple workers; each worker creates append fragments and finally commits on a coordinator. I'm wondering how the dataset-level merge-insert API fits into this distributed writer setting?

Let me quote @jackye1995:

No, I was thinking about a more naive model where you directly have distributed (write + commit) in each partitioned writer. This is possible in Lance because it can resolve conflicts at the row level and auto-retry, which is more efficient than Iceberg. Based on our testing, you can concurrently commit 128 merge-inserts without a problem.

I think the idea is to directly build a Dataset object and use the merge-insert API in each writer. For that, we need to carefully partition the data before the writers to avoid primary-key conflicts. Unlike other table formats, the merge-insert is quite lightweight for doing this.

majin1102 avatar Dec 24 '25 03:12 majin1102

I think the idea is to directly build a Dataset object and use the merge-insert API in each writer. For that, we need to carefully partition the data before the writers to avoid primary-key conflicts. Unlike other table formats, the merge-insert is quite lightweight for doing this.

Noted. Thanks for the explanation.

xx789633 avatar Dec 24 '25 04:12 xx789633

I think the idea is to directly build a Dataset object and use the merge-insert API in each writer. For that, we need to carefully partition the data before the writers to avoid primary-key conflicts. Unlike other table formats, the merge-insert is quite lightweight for doing this.

Noted. Thanks for the explanation.

One more thing to note is that we currently don’t support row-level conflict resolution. If multiple writers modify the same fragment, their commits are likely to fail due to conflicts. This is probably related to https://github.com/lance-format/lance/discussions/3842

majin1102 avatar Dec 24 '25 07:12 majin1102