
[mysql] After adding a new table to pull data, SplitReader goes silent after a few fetches

Open qq443672581 opened this issue 3 years ago • 9 comments

Environment :

  • Flink version : 1.13.5
  • Flink CDC version: 2.2.0
  • Database and version: mysql 5.7.28-log

I added a new table (about 1.5 million rows) to an existing task and started pulling it, but after a dozen or more fetches the task stopped responding. From my analysis, the code appears to be stuck in a loop waiting for a binlog end (high-watermark) event (SnapshotSplitReader, line 234), but no new events ever arrive there.

Here is a screenshot from my debug session: (screenshot attached)

As you can see in the screenshot, this batch pulled 8441 records in total (1 watermark event + 8440 data events). According to the code logic, the loop can only exit when it receives the binlog high-watermark end event. But no matter how long I wait, that watermark event never arrives, so the program keeps looping here and appears to have stopped.

Here is the last log output: (screenshot attached). You can see that it stops after 44 chunks have been pulled, out of 200+ chunks in total.

Can you help me figure out what could cause the end event to never arrive?
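The failure mode described above can be pictured with a minimal standalone sketch (hypothetical names and types, not the actual SnapshotSplitReader code): the reader drains events and can only exit once it sees the binlog end watermark, so if the upstream fetcher dies silently the loop never terminates.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the wait-for-watermark loop described above.
public class WatermarkLoopSketch {

    enum Kind { DATA, BINLOG_END }

    static final class Event {
        final Kind kind;
        Event(Kind kind) { this.kind = kind; }
    }

    /**
     * Drains events until BINLOG_END; returns the data-event count, or -1 if
     * no event arrives within the timeout (i.e. the fetcher went silent).
     * A loop without such a timeout would hang forever in that case.
     */
    static int drainUntilEnd(BlockingQueue<Event> queue, long timeoutSec) {
        int dataEvents = 0;
        while (true) {
            Event e;
            try {
                e = queue.poll(timeoutSec, TimeUnit.SECONDS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return -1;
            }
            if (e == null) {
                return -1; // fetcher went silent: no end event will ever come
            }
            if (e.kind == Kind.BINLOG_END) {
                return dataEvents;
            }
            dataEvents++;
        }
    }

    public static void main(String[] args) {
        // Healthy case: the end watermark arrives and the loop exits.
        BlockingQueue<Event> healthy = new LinkedBlockingQueue<>(List.of(
                new Event(Kind.DATA), new Event(Kind.DATA), new Event(Kind.BINLOG_END)));
        System.out.println(drainUntilEnd(healthy, 1)); // 2

        // Broken case: the fetcher died, so only the timeout gets us out.
        BlockingQueue<Event> silent = new LinkedBlockingQueue<>(List.of(new Event(Kind.DATA)));
        System.out.println(drainUntilEnd(silent, 1)); // -1
    }
}
```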

qq443672581 avatar Jun 28 '22 08:06 qq443672581

I found that during the binlog backfill (merge) stage of the full snapshot, if binlog reading fails (e.g. a deserialization error or a network problem), the binlog reader stops at that point, which means the binlog end event will never be emitted.

qq443672581 avatar Jun 29 '22 02:06 qq443672581

However, I just tested: with a brand-new job pulling a single table, the problem does not occur.

qq443672581 avatar Jun 29 '22 03:06 qq443672581

    com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1656487905000, eventType=UPDATE_ROWS, serverId=3318594750, headerLength=19, dataLength=467, nextPosition=24906517, flags=0}

qq443672581 avatar Jun 29 '22 07:06 qq443672581

OK… it seems to work (^▽^)

I added a three-second sleep before MySqlSourceEnumerator assigns splits, in MySqlSourceEnumerator#handleSplitRequest:


    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        if (!context.registeredReaders().containsKey(subtaskId)) {
            // reader failed between sending the request and now. skip this request.
            return;
        }
        readersAwaitingSplit.add(subtaskId);

        // here ~ workaround: delay split assignment by a few seconds
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of swallowing the exception
            Thread.currentThread().interrupt();
        }

        assignSplits();
    }

My suspicion is that the fetch model is supposed to be single-threaded, but a bug somewhere lets multiple fetches run concurrently, which forces the BinaryLogClient to disconnect while pulling incremental logs. It then never waits for a binlog end event and loops forever, as I said in my reply above.

I'm still looking into why this happens. o(╥﹏╥)o

qq443672581 avatar Jun 30 '22 15:06 qq443672581

When daytime business traffic is relatively high, 3s does not seem to be enough; a 5s sleep is needed.

qq443672581 avatar Jul 01 '22 03:07 qq443672581

After I rewrote the class, the problem no longer appears; I tried 3 times, all successful. I will test again tomorrow. At this point, I suspect some resource is not being released because the object is reused.

com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlTaskContextImpl :

public class MySqlTaskContextImpl extends MySqlTaskContext {

    // private final BinaryLogClient reusedBinaryLogClient;

    public MySqlTaskContextImpl(
            MySqlConnectorConfig config,
            MySqlDatabaseSchema schema,
            BinaryLogClient reusedBinaryLogClient) {
        super(config, schema);
        // stop keeping the reused client; fall back to the superclass's own client
        // this.reusedBinaryLogClient = reusedBinaryLogClient;
    }

    /*
    @Override
    public BinaryLogClient getBinaryLogClient() {
        return reusedBinaryLogClient;
    }
    */
}

qq443672581 avatar Jul 04 '22 12:07 qq443672581

Boss, have you solved this problem? I have the same problem.

chrofram avatar Sep 11 '22 08:09 chrofram

> When I rewrote the class, the problem doesn't seem to appear… (quoting the earlier comment with the rewritten MySqlTaskContextImpl)

@chrofram here

qq443672581 avatar Sep 11 '22 11:09 qq443672581

Thanks a lot, this solved my problem!

chrofram avatar Sep 15 '22 03:09 chrofram

I'm having the same problem. I added the MySqlTaskContextImpl class to my project to override the original class from mysql cdc, but it still can't read the data.

jxwnhj0717 avatar Dec 05 '22 09:12 jxwnhj0717

Sometimes it's a classloader problem: the classes inside the connector jar may still be loaded first, shadowing your project's copy. You could try replacing the overridden class directly inside the official cdc jar; that's how my problem was solved.
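One way to confirm which copy wins is to ask the JVM where a class was actually loaded from. The sketch below (ClassloaderCheck is a hypothetical helper, not part of Flink CDC) prints a class's code source and probes whether a class name resolves on the classpath at all; in the CDC job you would check MySqlTaskContextImpl before and after repacking the jar.

```java
// Hypothetical helper for diagnosing which jar or directory a class comes from.
public class ClassloaderCheck {

    /** Where a class was loaded from; bootstrap/JDK classes have a null CodeSource. */
    static String locationOf(Class<?> c) {
        java.security.CodeSource src = c.getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap/JDK)" : src.getLocation().toString();
    }

    /** True if the class name resolves on the current classpath. */
    static boolean present(String fqcn) {
        try {
            Class.forName(fqcn);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // In the CDC job, check the shadowed class instead, e.g.
        // locationOf(MySqlTaskContextImpl.class), and verify it points at
        // your project's classes rather than the connector jar.
        System.out.println(locationOf(ClassloaderCheck.class));
        System.out.println(present("java.util.List")); // true
    }
}
```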

qq443672581 avatar Dec 05 '22 11:12 qq443672581

I found the cause of the problem: the slf4j version was incorrect, and after changing it to 2.0.1 I can read the data. Here's how I located the problem: I copied com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader into my project and added a try/catch:

    if (snapshotResult.isCompletedOrSkipped()) {
        try {
            final MySqlBinlogSplitReadTask backfillBinlogReadTask =
                    createBackfillBinlogReadTask(backfillBinlogSplit);
            backfillBinlogReadTask.execute(
                    new SnapshotBinlogSplitChangeEventSourceContextImpl());
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

This surfaced an error thrown while creating MySqlBinlogSplitReadTask: the org.slf4j.event.Level class could not be found.
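Since org.slf4j.event.Level ships in the slf4j-api artifact, a fix along these lines is pinning a sufficiently new slf4j-api in the job's build. The coordinates below are the standard slf4j ones; 2.0.1 is the version reported working in this thread:

```xml
<!-- Pin slf4j-api so that org.slf4j.event.Level is on the classpath. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.1</version>
</dependency>
```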

jxwnhj0717 avatar Dec 06 '22 01:12 jxwnhj0717

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen avatar Feb 28 '24 15:02 PatrickRen