[tidb-cdc] data loss when switching from snapshot read to change event read
Describe the bug: data loss when switching from snapshot read to change event read.
Environment:
- Flink version: Flink 1.14
- Flink CDC version: 2.2.0
- Database and version: TiDB 5.1
To Reproduce
Steps to reproduce the behavior:
- Prepare a TiDB table. The max id is 100040:
mysql> select max(id) from tidb_cdc_test;
+---------+
| max(id) |
+---------+
| 100040 |
+---------+
1 row in set (0.00 sec)
The total row count is 50000:
mysql> select count(1) from tidb_cdc_test;
+----------+
| count(1) |
+----------+
| 50000 |
+----------+
1 row in set (0.01 sec)
- The test code (Flink SQL):
CREATE TABLE source_table (
id INT,
createTime timestamp,
updateTime timestamp,
PRIMARY KEY(id) NOT ENFORCED
)WITH (
'connector' = 'tidb-cdc',
'pd-addresses' = '****',
'database-name' = '****',
'table-name' = 'tidb_cdc_test',
'scan.startup.mode' = 'initial'
);
CREATE TABLE sink_table (
id INT,
createTime timestamp,
updateTime timestamp,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'print');
insert into sink_table
select * from source_table;
- The error:
(1) Run the Flink job. The job has two phases: "snapshot read" and "change events".
(2) After "snapshot read" has started and before the job switches to "change events": at this point "snapshot read" has finished, and resolvedTs = 433309973258698753 (set in readSnapshotEvents).
(3) Before the job switches to "change events", insert some data into the TiDB table:
mysql> insert into tidb_cdc_test values
('100060' ,'2022-05-18 17:10:14','2022-05-18 20:01:14'),
('100061' ,'2022-05-18 17:20:14','2022-05-18 20:20:14'),
('100062' ,'2022-05-18 17:21:14','2022-05-18 20:21:14'),
('100063' ,'2022-05-18 17:25:14','2022-05-18 21:00:14'),
('100064' ,'2022-05-18 17:26:14','2022-05-18 21:05:14'),
('100065' ,'2022-05-18 17:28:14','2022-05-18 21:08:14'),
('100066' ,'2022-05-18 17:29:14','2022-05-18 21:11:14');
(4) The job then switches to "change events", and resolvedTs is now 433309978881949699.

Because resolvedTs is updated, changes committed after the snapshot read but before "change events" starts are lost.
Watch the print results: the max id is 100040, and the rows with ids 100060~100066 are missing.
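Decoding the two resolvedTs values shows how wide the uncovered window was. This is a minimal Python sketch, assuming the standard TiDB TSO layout (physical time in milliseconds shifted left by 18 bits, plus an 18-bit logical counter in the low bits); `decode_tso` is a hypothetical helper, not a Flink CDC or TiKV client API.

```python
from datetime import datetime, timezone

LOGICAL_BITS = 18  # TiDB TSO: the low 18 bits hold a logical counter


def decode_tso(ts: int) -> tuple[int, int]:
    """Hypothetical helper: split a TSO into (physical ms since epoch, logical counter)."""
    return ts >> LOGICAL_BITS, ts & ((1 << LOGICAL_BITS) - 1)


snapshot_ts = 433309973258698753  # resolvedTs set in readSnapshotEvents, step (2)
stream_ts = 433309978881949699    # resolvedTs when "change events" starts, step (4)

snap_ms, snap_logical = decode_tso(snapshot_ts)
stream_ms, stream_logical = decode_tso(stream_ts)

# Physical parts rendered in UTC; the cluster's local time zone may differ.
print(datetime.fromtimestamp(snap_ms / 1000, tz=timezone.utc))
print(datetime.fromtimestamp(stream_ms / 1000, tz=timezone.utc))
print(f"gap: {stream_ms - snap_ms} ms")  # gap: 21451 ms
```

Under this reading, roughly 21 seconds of commits fall between the snapshot timestamp and the timestamp the change stream starts from, and the rows inserted in step (3) would be committed inside that window.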

Additional Description
To avoid data loss when switching from snapshot read to change event read with 'scan.startup.mode' = 'initial', resolvedTs should not be updated before the job switches to "change events"; the job should keep the value set in readSnapshotEvents. Here is my solution.
I made a fix in https://github.com/ververica/flink-cdc-connectors/pull/1207. With this fix, my test shows no data loss.
TiDB table:
mysql> select count(1) from tidb_cdc_test;
+----------+
| count(1) |
+----------+
| 50008 |
+----------+
1 row in set (0.02 sec)
mysql> select max(id) from tidb_cdc_test;
+---------+
| max(id) |
+---------+
| 100070 |
+---------+
1 row in set (0.01 sec)
Before the job switches to "change events", insert some data into TiDB:
insert into tidb_cdc_test values
('100080' ,'2022-05-18 18:12:14','2022-05-18 22:21:14'),
('100081' ,'2022-05-18 18:20:14','2022-05-18 22:25:14'),
('100082' ,'2022-05-18 18:21:14','2022-05-18 22:26:14'),
('100083' ,'2022-05-18 18:25:14','2022-05-18 22:27:14'),
('100084' ,'2022-05-18 18:26:14','2022-05-18 22:30:14'),
('100085' ,'2022-05-18 18:28:14','2022-05-18 22:32:14'),
('100086' ,'2022-05-18 18:29:14','2022-05-18 22:33:14');
Then the job switches to "change events".
Watch the print results: the max id is 100086, and the rows with ids 100080~100086 are included.
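The difference between the run without the fix and the run with it can be modeled with a toy timeline. This Python sketch is a simplification under stated assumptions: a snapshot at `snapshot_ts` sees commits with `commit_ts <= snapshot_ts`, a change stream started from `start_ts` delivers commits with `commit_ts > start_ts`, and the `Commit` class, helper functions, and small integer timestamps are all hypothetical, not the connector's actual code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Commit:
    row_id: int
    commit_ts: int  # hypothetical logical commit timestamp


def snapshot_rows(commits, snapshot_ts):
    """Assumed semantics: a snapshot read sees commits with commit_ts <= snapshot_ts."""
    return {c.row_id for c in commits if c.commit_ts <= snapshot_ts}


def change_rows(commits, start_ts):
    """Assumed semantics: a change stream delivers commits with commit_ts > start_ts."""
    return {c.row_id for c in commits if c.commit_ts > start_ts}


def lost_rows(commits, snapshot_ts, stream_start_ts):
    """Rows emitted by neither the snapshot phase nor the change-event phase."""
    seen = snapshot_rows(commits, snapshot_ts) | change_rows(commits, stream_start_ts)
    return {c.row_id for c in commits} - seen


commits = [
    Commit(100040, 10),  # already in the table before the snapshot
    Commit(100060, 20),  # committed after the snapshot, before the switch
    Commit(100061, 25),
    Commit(100070, 40),  # committed after change events start
]
SNAPSHOT_TS = 15  # plays the role of resolvedTs set in readSnapshotEvents
LATER_TS = 30     # plays the role of the refreshed resolvedTs at the switch

print(sorted(lost_rows(commits, SNAPSHOT_TS, LATER_TS)))     # [100060, 100061]
print(sorted(lost_rows(commits, SNAPSHOT_TS, SNAPSHOT_TS)))  # []
```

Starting the change stream from the snapshot timestamp leaves no gap between the two phases in this model, which matches the idea of keeping the value set in readSnapshotEvents.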

@GOODBOY008 @leonardBang could you please have a look? Thanks so much!
Hi @xieyi888, I don't think it's a bug; it's a man-made problem caused by pausing the program at arbitrary points with debugging tools.
Hi @GOODBOY008, actually it can be reproduced without a debugger. The bug reproduces whenever there are changes in TiDB while readSnapshotEvents is running.
Here is the process.
Environment:
- Flink version: Flink 1.14
- Flink CDC version: 2.3-SNAPSHOT (built from the master branch)
- Database and version: TiDB 5.1
- Prepare the TiDB table:
mysql> desc test_tidb_cdc;
+------------+----------+------+------+-------------------+-----------------------------------------------+
| Field | Type | Null | Key | Default | Extra |
+------------+----------+------+------+-------------------+-----------------------------------------------+
| id | int(11) | NO | PRI | NULL | |
| createTime | datetime | NO | MUL | CURRENT_TIMESTAMP | DEFAULT_GENERATED on update CURRENT_TIMESTAMP |
| updateTime | datetime | NO | MUL | CURRENT_TIMESTAMP | DEFAULT_GENERATED on update CURRENT_TIMESTAMP |
+------------+----------+------+------+-------------------+-----------------------------------------------+
3 rows in set (0.00 sec)
Before the Flink tidb-cdc job starts, the newest row was updated at '2022-09-07 15:31:07':
mysql> select id,createTime,updateTime from test_tidb_cdc order by updateTime desc limit 2;
+--------+---------------------+---------------------+
| id | createTime | updateTime |
+--------+---------------------+---------------------+
| 970797 | 2022-09-06 18:53:44 | 2022-09-07 15:31:07 |
| 970805 | 2022-09-06 19:05:24 | 2022-09-07 15:31:05 |
+--------+---------------------+---------------------+
- Start the Flink tidb-cdc job (Flink SQL):
CREATE TABLE source_table (
id INT,
createTime timestamp,
updateTime timestamp,
PRIMARY KEY(id) NOT ENFORCED
)WITH (
'connector' = 'tidb-cdc',
'pd-addresses' = '****',
'database-name' = '****',
'table-name' = 'test_tidb_cdc',
'scan.startup.mode' = 'initial'
);
CREATE TABLE sink_table_print (
id INT,
createTime timestamp,
updateTime timestamp,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'print'
);
insert into sink_table_print select * from source_table;
- While the Flink job is running, insert some data into TiDB (6 rows):
mysql> insert into test_tidb_cdc (id) values ('981000');
Query OK, 1 row affected (0.00 sec)
mysql> insert into test_tidb_cdc (id) values ('981001');
Query OK, 1 row affected (0.00 sec)
mysql> insert into test_tidb_cdc (id) values ('981002');
Query OK, 1 row affected (0.01 sec)
mysql> insert into test_tidb_cdc (id) values ('981003');
Query OK, 1 row affected (0.00 sec)
mysql> insert into test_tidb_cdc (id) values ('981004');
Query OK, 1 row affected (0.01 sec)
mysql> insert into test_tidb_cdc (id) values ('981005');
Query OK, 1 row affected (0.01 sec)
- Watch the TaskManager (TM) log:
2022-09-07 15:50:00,344 start read snapshot events
2022-09-07 15:50:33,693 start read change events

- After the job has run for a while, compare the data in TiDB with the TM print output. I inserted 6 rows into TiDB, but only 4 rows appear in the TM print output: the rows with ids 981002 and 981003 are lost, because they were inserted between '2022-09-07 15:50:00' and '2022-09-07 15:50:33'.
TiDB table:
mysql> select id,createTime,updateTime from test_tidb_cdc order by updateTime desc limit 7;
+--------+---------------------+---------------------+
| id | createTime | updateTime |
+--------+---------------------+---------------------+
| 981005 | 2022-09-07 15:50:34 | 2022-09-07 15:50:34 |
| 981004 | 2022-09-07 15:50:34 | 2022-09-07 15:50:34 |
| 981003 | 2022-09-07 15:50:21 | 2022-09-07 15:50:21 |
| 981002 | 2022-09-07 15:50:20 | 2022-09-07 15:50:20 |
| 981001 | 2022-09-07 15:49:58 | 2022-09-07 15:49:58 |
| 981000 | 2022-09-07 15:49:57 | 2022-09-07 15:49:57 |
| 970797 | 2022-09-06 18:53:44 | 2022-09-07 15:31:07 |
+--------+---------------------+---------------------+
7 rows in set (0.01 sec)
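The claim that exactly 981002 and 981003 fall into the gap can be checked against the TM log timestamps and the table above. This Python sketch assumes that `createTime` (filled by `DEFAULT CURRENT_TIMESTAMP`) is a good proxy for each row's commit time and that the TaskManager and TiDB clocks are in sync; both are assumptions, not facts stated by the connector.

```python
from datetime import datetime

LOG_FMT = "%Y-%m-%d %H:%M:%S,%f"  # TM log timestamps carry milliseconds after a comma
ROW_FMT = "%Y-%m-%d %H:%M:%S"     # createTime values from the TiDB table

snapshot_start = datetime.strptime("2022-09-07 15:50:00,344", LOG_FMT)  # start read snapshot events
stream_start = datetime.strptime("2022-09-07 15:50:33,693", LOG_FMT)    # start read change events

# (id -> createTime) for the six rows inserted while the job was running
inserted = {
    981000: "2022-09-07 15:49:57",
    981001: "2022-09-07 15:49:58",
    981002: "2022-09-07 15:50:20",
    981003: "2022-09-07 15:50:21",
    981004: "2022-09-07 15:50:34",
    981005: "2022-09-07 15:50:34",
}

# Rows created after the snapshot phase started but before change events started
in_gap = sorted(
    row_id
    for row_id, ts in inserted.items()
    if snapshot_start < datetime.strptime(ts, ROW_FMT) < stream_start
)
print(in_gap)  # [981002, 981003]
```

The rows before 15:50:00 are covered by the snapshot and the rows after 15:50:33 by the change stream, so only the two rows in between are candidates for loss, which matches the missing ids in the TM print output.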
TM print

TM print in step 5

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!