Flink CDC streaming write support
I'm trying to integrate Flink CDC with Lance, but I found the capabilities of the Lance Java API to be quite limited.
It seems the fragment API https://github.com/lancedb/lance/blob/main/java/core/src/main/java/com/lancedb/lance/Fragment.java#L147 can only be used for appending data.
What I'm looking for is a delta writer to write both insert records and deletes, something like this: https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java#L79.
@jackye1995 @majin1102 @westonpace Could you possibly give some advice? Thank you.
@cwang9208 what you linked is a bit different, because it is BaseEqualityDeltaWriter, which writes equality deletes. That feature easily hits performance issues, and people rarely use it for large-scale production workloads. My understanding is that even Alibaba, who originally contributed this feature, have moved away to Paimon and abandoned this approach at this point, but please correct me if I am wrong here.
With what we have now in Lance Rust, take a look at our merge-insert semantics: https://github.com/lancedb/lance/blob/main/rust/lance/src/dataset/write/merge_insert.rs#L248-L282. If those semantics are sufficient, let's expose them in Java, and then you can accumulate a batch and write it with that API.
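To make the "accumulate a batch, then merge-insert at checkpoint" idea concrete, here is a rough Java sketch. Nothing here is the actual lance-java API; the MergeInsertApi interface is only a stand-in for whatever shape the exposed merge-insert ends up taking, and the buffering mirrors what a Flink writer would do between checkpoints.

```java
// Hedged sketch: buffer CDC batches in the writer, upsert them at checkpoint time.
// MergeInsertApi is a placeholder for the Java merge-insert API discussed above.
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.vector.VectorSchemaRoot;

public class AccumulatingLanceWriter {

  /** Assumed shape of a Java merge-insert, mirroring merge_insert.rs semantics. */
  public interface MergeInsertApi {
    /** Match on the key columns; update matched rows, insert unmatched rows. */
    void mergeInsert(List<String> onKeys, VectorSchemaRoot batch);
  }

  private final MergeInsertApi dataset;                      // injected; assumed API
  private final List<String> primaryKeys;
  private final List<VectorSchemaRoot> pending = new ArrayList<>();

  public AccumulatingLanceWriter(MergeInsertApi dataset, List<String> primaryKeys) {
    this.dataset = dataset;
    this.primaryKeys = primaryKeys;
  }

  /** Called for every incoming CDC batch between checkpoints. */
  public void write(VectorSchemaRoot batch) {
    pending.add(batch);
  }

  /** Called at the Flink checkpoint interval: commit everything accumulated so far. */
  public void flush() {
    for (VectorSchemaRoot batch : pending) {
      dataset.mergeInsert(primaryKeys, batch);
    }
    pending.clear();
  }
}
```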
When it comes to CDC, I think the first issue we need to consider is the primary key.
If the table format doesn't have a primary key model, the Flink CDC writer would not know how to route rows with the same primary key to one writer and reader (this is also called a bucket index on the primary key), and then the runtime performance would be a mess. I think that's the problem, as @jackye1995 mentioned: Iceberg is just an append-only model with partial primary key support in Flink (it resolves the writer issue but leaves the reader issue). There are also other gaps to discuss; let's just skip them for now.
A primary key model could bring a better out-of-the-box experience (like Paimon and Hudi). But I think this is a really complicated and big decision to make when it comes to the primary key issue, because the engine runtime is totally different and hard to align across engines. For now, Iceberg and Delta, as well as Lance, still don't support a primary key model.
The usual solution for CDC on an append-only format model like Iceberg and Delta is to use merge-into SQL (no need to introduce a primary key), which resolves the primary key matching entirely in the write phase via a batch shuffle. This gives better efficiency and performance, with a small trade-off on arrival latency. It has the same semantics as the merge-insert mentioned above.
https://github.com/lancedb/lance/blob/main/rust/lance/src/dataset/write/merge_insert.rs
From my understanding, this implementation is a standalone execution, right? @jackye1995 We may need to check the gap when using Flink or Spark to perform the merge-into operation. I think standalone execution through an API may be a rare scenario in the Java ecosystem.
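For reference, the merge-into pattern mentioned above usually looks like the following when driven from an engine such as Spark. This is only to illustrate the semantics resolved by the batch shuffle; the table and column names are made up, and no existing Lance MERGE INTO support in Spark is implied.

```java
// Illustration only: the merge-into semantics that the engine resolves in the write
// phase. Table/column names (target, cdc_batch, id, op) are hypothetical.
import org.apache.spark.sql.SparkSession;

public class MergeIntoExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("cdc-merge-into")
        .getOrCreate();

    // Primary key matching happens entirely in the write phase via a batch shuffle,
    // so the table format itself does not need a primary key model.
    spark.sql(
        "MERGE INTO target t "
      + "USING cdc_batch s "
      + "ON t.id = s.id "
      + "WHEN MATCHED AND s.op = 'D' THEN DELETE "
      + "WHEN MATCHED THEN UPDATE SET * "
      + "WHEN NOT MATCHED THEN INSERT *");

    spark.stop();
  }
}
```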
Back to the CDC solution, I think a practical way is to split the write view and the read view:
- Flink CDC writes to an append-only table that supports incremental query (Paimon/Hudi/Iceberg, maybe Lance later?). This is the write view, which provides good write performance with low arrival latency.
- A merge-into job is scheduled periodically. This is the read view, with better read performance and somewhat higher arrival latency.
This approach is widely practiced in CDC scenarios. We could also wrap the procedures above into one management system/platform like Apache Amoro, just for reference.
When it comes to CDC, I think the first issue we need to consider is the primary key.
For the primary key, I am working on that based on https://github.com/lancedb/lance/discussions/3842. I was trying to complete the full prototype before proceeding with further discussion, but maybe I can first publish the primary key format change.
Back to the CDC solution, I think a practical way is to split the write view and the read view:
I don't think that is needed? Assuming we expose the merge-insert API in Java, you can have the writer side accumulate a batch and then use the merge-insert API to commit the batch at the checkpointing interval. That is also what the referenced Iceberg writer is doing.
In this architecture, the data is not available to readers before the checkpoint. But that is what the LSM upsert proposal is trying to achieve; I am expecting that prototype to be done this month, and we could reconcile the architecture at that time.
In this architecture, the data is not available to readers before the checkpoint. But that is what the LSM upsert proposal is trying to achieve; I am expecting that prototype to be done this month, and we could reconcile the architecture at that time.
That's really quick, and I can't wait to see it. 👍
I don't think that is needed? Assuming we expose the merge-insert API in Java, you can have the writer side accumulate a batch and then use the merge-insert API to commit the batch at the checkpointing interval. That is also what the referenced Iceberg writer is doing.
I don't know if I understand correctly. Do you mean Flink writers could write merged data in a distributed way and use the merge-insert API to commit the written fragments in the committer operator? If I didn't get it wrong, I think how to write merged data in a streaming fashion on an append-only format is not solved yet. For batch computing it is possible to make a read-and-write plan, while it is tricky for streaming writers to act like that?
Or do you mean Lance could solve this issue by providing the merge-insert API and the new primary key model? That makes sense to me. I think Iceberg Flink writers can do that because the equality-delete protocol and the field id act as the primary key model for Flink scenarios. The streaming write issue would be solved by your new primary key model. Otherwise, I think splitting the streaming write and the batch operation could be an optional solution for append-only formats (besides, the streaming data could serve as a kind of raw data).
These are just my big data views; thanks for correcting me if I am wrong.
but maybe I can first publish the primary key format change.
+1
Do you mean Flink writers could write merged data in a distributed way and use the merge-insert API to commit the written fragments in the committer operator?
No, I was thinking about a more naive model where you directly have distributed (write + commit) in each partitioned writer. This is possible in Lance because it can resolve conflicts at the row level and auto-retry, which is more efficient than Iceberg. Based on our testing, you can concurrently commit 128 merge-inserts without problems.
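A rough sketch of that naive model with a Flink SinkWriter: each partitioned subtask buffers its slice of the stream and commits its own merge-insert at flush time, with no committer operator in the graph. RowChange and mergeInsertCommit are placeholders, not real lance-java calls.

```java
// Hedged sketch: distributed (write + commit) in each partitioned writer.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.connector.sink2.SinkWriter;

/** Placeholder for one CDC change; in practice this would likely be Flink's RowData. */
record RowChange(String key, String payload) {}

public class PartitionedLanceSinkWriter implements SinkWriter<RowChange> {

  private final List<RowChange> buffer = new ArrayList<>();

  @Override
  public void write(RowChange element, Context context) {
    buffer.add(element);
  }

  @Override
  public void flush(boolean endOfInput) throws IOException {
    // Each partitioned subtask writes and commits on its own: no committer operator.
    // Concurrent commits are expected to succeed because Lance resolves conflicts
    // at the row level and auto-retries (per the discussion above).
    mergeInsertCommit(buffer);
    buffer.clear();
  }

  @Override
  public void close() {
    buffer.clear();
  }

  /** Placeholder for the Java merge-insert commit; not an actual lance-java call. */
  private void mergeInsertCommit(List<RowChange> rows) throws IOException {
    // e.g. dataset.mergeInsert(primaryKeys).execute(toArrowStream(rows));  // assumed shape
  }
}
```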
Do you mean Flink writers could write merged data in a distributed way and use the merge-insert API to commit the written fragments in the committer operator?
Hopefully. I think a conflict-free approach using the fragment-level API should achieve better performance, but at the cost of more development effort.
No, I was thinking about a more naive model where you directly have distributed (write + commit) in each partitioned writer. This is possible in Lance because it can resolve conflicts at the row level and auto-retry, which is more efficient than Iceberg. Based on our testing, you can concurrently commit 128 merge-inserts without problems.
If we adopt this approach, is there any advice on how to minimize conflicts between writers? For example, redistributing the data to the writers based on keys in Flink's SupportsPreWriteTopology. @jackye1995
Besides, the merge-insert API is not enough for handling delete operations from Flink CDC.
If we adopt this approach, is there any advice on how to minimize conflicts between writers? For example, redistributing the data to the writers based on keys in Flink's SupportsPreWriteTopology
I think that makes sense. Today Lance does not have a partitioning concept, so the engine can just partition the input stream on its own; hash distribution probably makes the most sense in that situation?
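For example, keying the stream on the primary key inside the pre-write topology, so that all changes for a given key always land on the same writer (a sketch; RowUpsert and the key extraction are placeholders):

```java
// Hedged sketch: hash-distribute the CDC stream on the primary key before the writers.
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;

/** Placeholder upsert record; in practice this would likely be Flink's RowData. */
record RowUpsert(String key, String payload) {}

public abstract class KeyedLanceSink implements Sink<RowUpsert>, SupportsPreWriteTopology<RowUpsert> {

  @Override
  public DataStream<RowUpsert> addPreWriteTopology(DataStream<RowUpsert> input) {
    // Hash-distribute on the primary key so that all changes for one key go to
    // the same writer subtask, avoiding cross-writer conflicts on the same rows.
    return input.keyBy(RowUpsert::key);
  }
}
```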
Besides, the merge-insert API is not enough for handling delete operations from Flink CDC.
Yes. After https://github.com/lancedb/lance/discussions/3842 that I mentioned above, the API will be updated to support deletes by having an additional delete marker column to indicate deletion. So if we can proceed in parallel, that would be amazing.
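To illustrate what a writer-side batch could look like once that lands, here is a sketch that shapes an Arrow batch with an extra delete marker column. The column name _is_deleted and its exact semantics are my assumption; the actual format change from that discussion may differ.

```java
// Hedged sketch: build an Arrow batch carrying a delete marker column for CDC.
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class DeleteMarkerBatch {

  public static VectorSchemaRoot build(BufferAllocator allocator) {
    Schema schema = new Schema(List.of(
        new Field("id", FieldType.notNullable(new ArrowType.Int(64, true)), null),
        new Field("payload", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
        // Assumed delete marker: true means "delete the row with this key".
        new Field("_is_deleted", FieldType.notNullable(ArrowType.Bool.INSTANCE), null)));

    VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
    root.allocateNew();
    BigIntVector id = (BigIntVector) root.getVector("id");
    VarCharVector payload = (VarCharVector) root.getVector("payload");
    BitVector deleted = (BitVector) root.getVector("_is_deleted");

    // Row 0: upsert id=1. Row 1: delete id=2.
    id.setSafe(0, 1L);
    payload.setSafe(0, "v1".getBytes(StandardCharsets.UTF_8));
    deleted.setSafe(0, 0);

    id.setSafe(1, 2L);
    payload.setNull(1);
    deleted.setSafe(1, 1);

    root.setRowCount(2);
    return root;
  }

  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator();
         VectorSchemaRoot batch = build(allocator)) {
      System.out.println(batch.contentToTSVString());
    }
  }
}
```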
Great! Can't wait to see the primary key dataset and the updated API. Let's work in parallel!
FYI, I've completed the integration of Lance with Fluss - an Apache Incubator project that serves as a streaming storage solution purpose-built for real-time analytics. https://github.com/asf-transfer/fluss/issues/1155
FYI, I've completed the integration of Lance with Fluss - an Apache Incubator project that serves as a streaming storage solution purpose-built for real-time analytics. https://github.com/asf-transfer/fluss/issues/1155
btw for everyone interested, this is a great intro to Fluss (why/what/future):
https://www.alibabacloud.com/blog/fluss-redefining-streaming-storage-for-real-time-data-analytics-and-ai_602412
Fluss's home: https://fluss.apache.org/
FYI, I've completed the integration of Lance with Fluss - an Apache Incubator project that serves as a streaming storage solution purpose-built for real-time analytics. apache/fluss#1155
Hi, Fluss has completed log table streaming ingestion support for Lance, but primary key table support is still missing.
The typical workflow of Fluss for primary-key table tiering is to distribute writes (including inserts, updates, and deletes) to multiple workers and commit on a central committer. However, it seems the current Lance Java API is insufficient for such a workflow.
Any ideas or suggestions? @jackye1995 @majin1102
The typical workflow of Fluss for primary-key table tiering is to distribute writes (including inserts, updates, and deletes) to multiple workers and commit on a central committer. However, it seems the current Lance Java API is insufficient for such a workflow.
@jackye1995 has given a solution above; let me quote it:
Assuming we expose the merge-insert API in Java, you can have the writer side accumulate a batch and then use the merge-insert API to commit the batch at the checkpointing interval. That is also what the referenced Iceberg writer is doing.
For tables with a primary key, inserts should be planned as merge-inserts. This functionality is already provided in the lance-java module, both at the dataset level (standalone) and the fragment level (distributed).
If you want to use a multi-worker architecture with a dedicated committer, you should look at the fragment-level mergeColumn and use the transaction API to commit the changes; I think you could refer to how the Spark merge-into integration uses it.
The same applies to updates and deletes.
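In Flink terms, the multi-worker + central committer flow could look roughly like this: workers emit fragment metadata as committables, and a single Committer folds them into one commit per checkpoint. LanceCommittable and commitFragments are placeholders; the real calls would come from the lance-java fragment and transaction APIs mentioned above.

```java
// Hedged sketch: central committer applying all worker-produced fragments at once.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.connector.sink2.Committer;

/** Placeholder committable: serialized fragment metadata produced by one worker. */
record LanceCommittable(byte[] fragmentMetadata) {}

public class LanceCentralCommitter implements Committer<LanceCommittable> {

  @Override
  public void commit(Collection<CommitRequest<LanceCommittable>> requests) throws IOException {
    List<byte[]> fragments = new ArrayList<>();
    for (CommitRequest<LanceCommittable> request : requests) {
      fragments.add(request.getCommittable().fragmentMetadata());
    }
    // One transaction for all fragments produced in this checkpoint (assumed flow;
    // in lance-java this would go through the fragment / transaction commit path).
    commitFragments(fragments);
  }

  @Override
  public void close() {}

  /** Placeholder for committing fragment metadata as a single Lance transaction. */
  private void commitFragments(List<byte[]> fragments) throws IOException {
    // e.g. a dataset-level commit with an operation built from the fragment metadata.
  }
}
```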
Hi @majin1102, thanks for your reply. It seems the fragment API only accepts rowIndexes, which means the ingestion engine needs to maintain the indexes for all primary keys, correct?
Hi @majin1102, thanks for your reply. It seems the fragment API only accepts rowIndexes, which means the ingestion engine needs to maintain the indexes for all primary keys, correct?
We don't yet have a definitive answer on whether Lance will support a native primary key model (so there is no such thing as an index over all primary keys, at least for now). We can use merge-insert to implement upsert semantics, similar to the approach used in Apache Iceberg. Lance provides an out-of-the-box merge-insert API at the dataset level. I think we could figure out whether this is something you can leverage.
Hi @majin1102, thanks for your reply. It seems the fragment API only accepts rowIndexes, which means the ingestion engine needs to maintain the indexes for all primary keys, correct?
We don't yet have a definitive answer on whether Lance will support a native primary key model (so there is no such thing as an index over all primary keys, at least for now). We can use merge-insert to implement upsert semantics, similar to the approach used in Apache Iceberg. Lance provides an out-of-the-box merge-insert API at the dataset level. I think we could figure out whether this is something you can leverage.
For an append-only table, Fluss distributes changelogs to multiple workers, each worker creates append fragments, and the result is finally committed on a coordinator. I'm wondering how the merge-insert API at the dataset level fits into this distributed writer setting?
For an append-only table, Fluss distributes changelogs to multiple workers, each worker creates append fragments, and the result is finally committed on a coordinator. I'm wondering how the merge-insert API at the dataset level fits into this distributed writer setting?
Let me quote from @jackye1995:
No, I was thinking about a more naive model where you directly have distributed (write + commit) in each partitioned writer. This is possible in Lance because it can resolve conflicts at the row level and auto-retry, which is more efficient than Iceberg. Based on our testing, you can concurrently commit 128 merge-inserts without problems.
I think the idea is to directly build a Dataset object and use the merge-insert API in each writer. For that, we need to carefully partition the data before the writers to avoid primary key conflicts. Unlike in other table formats, the merge-insert is lightweight enough to do this.
I think the idea is to directly build a Dataset object and use the merge-insert API in each writer. For that, we need to carefully partition the data before the writers to avoid primary key conflicts. Unlike in other table formats, the merge-insert is lightweight enough to do this.
Noted. Thanks for the explanation.
I think the idea is to directly build a Dataset object and use the merge-insert API in each writer. For that, we need to carefully partition the data before the writers to avoid primary key conflicts. Unlike in other table formats, the merge-insert is lightweight enough to do this.
Noted. Thanks for the explanation.
One more thing to note is that we currently don’t support row-level conflict resolution. If multiple writers modify the same fragment, their commits are likely to fail due to conflicts. This is probably related to https://github.com/lance-format/lance/discussions/3842