[mysql] SplitReader goes silent after a few pulls when a new table is added
Environment:
- Flink version: 1.13.5
- Flink CDC version: 2.2.0
- Database and version: MySQL 5.7.28-log
When I added a new table (about 1.5 million rows) to an existing task and then pulled it, the task stopped responding after a dozen or more pulls. From my analysis, the code seems to be stuck in a loop that keeps waiting for a binlog end event (SnapshotSplitReader, line 234), but no new events are being pulled there.
This is a screenshot from my debug session:

As you can see in the picture, I pulled a total of 8441 events in this batch (1 watermark event + 8440 data events). According to the code logic, this loop needs a binlog high-watermark end event to exit. However, that watermark event never arrived no matter how long I waited; the program keeps looping here, so the task appears to be stopped.
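To make the shape of the hang concrete, here is a runnable toy model of that loop (stand-in types and a hypothetical BINLOG_END sentinel, not the connector's actual classes). It drains 8440 simulated data events and then blocks forever, because the exit condition is a watermark event that never arrives:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the wait loop: data events arrive, the end watermark does not.
public class BinlogEndWaitDemo {
    private static final String END_WATERMARK = "BINLOG_END"; // hypothetical sentinel

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Simulate the 8440 data events of the batch above, but no watermark event.
        for (int i = 0; i < 8440; i++) {
            queue.put("data-" + i);
        }

        boolean reachBinlogEnd = false;
        while (!reachBinlogEnd) {
            String event = queue.take(); // blocks forever once the queue drains
            if (END_WATERMARK.equals(event)) {
                reachBinlogEnd = true; // the exit condition that is never met
            }
        }
    }
}
```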
This is the last log:
You can see that it stops after 44 chunks have been pulled, out of 200+ chunks in total.
Can you help me figure out what could cause the end event to never arrive?
I found that in the binlog backfill (merge) stage of the full snapshot, if binlog reading fails (data that cannot be deserialized, a network error, etc.), reading stops at that point, which means the binlog end event will never be received.
However, I just tested with a brand-new task pulling a single table, and the problem does not occur.
com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1656487905000, eventType=UPDATE_ROWS, serverId=3318594750, headerLength=19, dataLength=467, nextPosition=24906517, flags=0}
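As a side note, the shyiko client does have hooks where exactly this kind of failure surfaces; a minimal sketch, assuming you can get at the BinaryLogClient instance (the connector normally manages it internally, and host/port/user/pwd here are placeholders):

```java
import com.github.shyiko.mysql.binlog.BinaryLogClient;

BinaryLogClient client = new BinaryLogClient("host", 3306, "user", "pwd");
client.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() {
    @Override
    public void onEventDeserializationFailure(BinaryLogClient c, Exception ex) {
        // The EventDataDeserializationException above would land here.
        ex.printStackTrace();
    }

    @Override
    public void onCommunicationFailure(BinaryLogClient c, Exception ex) {
        // Network errors that silently stop binlog reading would land here.
        ex.printStackTrace();
    }
});
```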
OK... it seems to work (^▽^)
I added a three-second sleep before MySqlSourceEnumerator assigns splits -> MySqlSourceEnumerator#handleSplitRequest:
```java
@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
    if (!context.registeredReaders().containsKey(subtaskId)) {
        // reader failed between sending the request and now. skip this request.
        return;
    }
    readersAwaitingSplit.add(subtaskId);
    // here ~ (the added sleep)
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    assignSplits();
}
```
Because I suspect the fetch model is meant to be single-threaded, but a bug somewhere may allow multiple fetch runs, causing the BinaryLogClient to be forcibly disconnected while pulling the incremental logs, so it never gets to wait out a binlog end event. That produces the loop I described in my reply above.
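Purely to illustrate that suspicion (hypothetical code, not the connector's): if two fetch runs ever shared one client, the second connect() would collide with the stream the first is still reading:

```java
import com.github.shyiko.mysql.binlog.BinaryLogClient;

public class SharedClientRaceDemo {
    public static void main(String[] args) {
        // Placeholder connection details; the point is the shared instance.
        BinaryLogClient shared = new BinaryLogClient("host", 3306, "user", "pwd");

        Runnable fetchRun = () -> {
            try {
                // A second concurrent connect() on the same instance fails with
                // an IllegalStateException (the client is not reentrant), so one
                // of the readers never sees its end watermark.
                shared.connect();
            } catch (Exception e) {
                e.printStackTrace();
            }
        };

        new Thread(fetchRun).start();
        new Thread(fetchRun).start();
    }
}
```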
I'm still looking into why this happens... o(╥﹏╥)o
When daytime business traffic is relatively high, 3s doesn't seem to be enough; a 5s sleep is needed.
After I rewrote the class, the problem no longer seems to appear; I tried 3 times, all successful. I will test again tomorrow. At this point, I suspect some resources are not being released because of object reuse.
com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlTaskContextImpl:
```java
public class MySqlTaskContextImpl extends MySqlTaskContext {

    // private final BinaryLogClient reusedBinaryLogClient;

    public MySqlTaskContextImpl(
            MySqlConnectorConfig config,
            MySqlDatabaseSchema schema,
            BinaryLogClient reusedBinaryLogClient) {
        super(config, schema);
        // this.reusedBinaryLogClient = reusedBinaryLogClient;
    }

    // Override disabled: stop handing out the reused client.
    /*
    @Override
    public BinaryLogClient getBinaryLogClient() {
        return reusedBinaryLogClient;
    }
    */
}
```
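With the override commented out, the context presumably falls back to the client the parent MySqlTaskContext creates, instead of the reused one. The same idea sketched the other way around, i.e. handing out a fresh client per context (host, port, username, password are placeholders, not the connector's real config API):

```java
@Override
public BinaryLogClient getBinaryLogClient() {
    // Hypothetical: one fresh client per task context, so a stale shared
    // connection can never block the backfill's end watermark.
    return new BinaryLogClient(host, port, username, password);
}
```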
Boss, have you solved this problem? I have the same problem.
@chrofram here: see the rewritten MySqlTaskContextImpl above.
Thanks a lot, this solved my problem!
I'm having the same problem. I've added the MySqlTaskContextImpl class to my project to override the original class from mysql cdc, but I still can't read the data.
Sometimes it's a class loader problem: the classes inside the original jar may still be loaded first. Maybe you can repackage the overridden class into the official cdc jar; that's how my problem was solved.
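If anyone wants to verify which copy of the class actually wins, a quick check (plain Java, printed from anywhere inside the job's classloader):

```java
// Prints the jar (or classes directory) the class was really loaded from.
System.out.println(
        com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlTaskContextImpl.class
                .getProtectionDomain()
                .getCodeSource()
                .getLocation());
```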
I found the cause of my problem: the slf4j version was incorrect, and after changing it to 2.0.1 I can read the data. Here's how I located it: I rewrote com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader in my project to add a try/catch.
```java
if (snapshotResult.isCompletedOrSkipped()) {
    try {
        final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                createBackfillBinlogReadTask(backfillBinlogSplit);
        backfillBinlogReadTask.execute(
                new SnapshotBinlogSplitChangeEventSourceContextImpl());
    } catch (Throwable e) {
        // surface failures that are otherwise swallowed during the backfill
        e.printStackTrace();
    }
}
```
The try/catch showed that an error was thrown while creating MySqlBinlogSplitReadTask: the org.slf4j.event.Level class could not be found.
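For anyone hitting the same thing, a minimal classpath sanity check (assuming it runs in the same classloader as the task):

```java
try {
    Class<?> level = Class.forName("org.slf4j.event.Level");
    System.out.println("org.slf4j.event.Level loaded from: "
            + level.getProtectionDomain().getCodeSource().getLocation());
} catch (ClassNotFoundException e) {
    // An slf4j-api that is too old, or shadowed by another jar, lands here.
    System.out.println("org.slf4j.event.Level not found: " + e);
}
```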
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!