如何实现精准一次性读取数据
#24 请问:我的疑问其实跟这个issue类似,我在实际运行mysql-cdc时,发现每次重启,cdc都会去把表中数据全量读取一遍,即使我已经在代码里设置了
properties.setProperty("debezium.snapshot.mode", "never"); //schema_only也是一样的
完整代码如下:
`
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class MySqlBinlogSourceExample { public static void main(String[] args) throws Exception { Properties properties = new Properties(); // properties.setProperty("snapshot.new.tables", "parallel"); // properties.setProperty("offset.flush.interval.ms", "0"); // properties.setProperty("debezium.snapshot.mode", "schema_only"); properties.setProperty("debezium.snapshot.mode", "never");
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.debeziumProperties(properties)
.hostname("localhost")
.port(3306)
.databaseList("sensor_offset")
.username("root")
.password("123")
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute();
}
} `
我在log里发现了几个值得注意的地方,不知是否跟我的配置有关
[debezium-engine] INFO io.debezium.connector.mysql.MySqlConnectorTask - Found no existing offset, so preparing to perform a snapshot

补充:我又添加了检查点,但是发现结果还是没有什么变化 `import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class MySqlBinlogSourceExample { public static void main(String[] args) throws Exception { Properties properties = new Properties(); // properties.setProperty("snapshot.new.tables", "parallel"); // properties.setProperty("offset.flush.interval.ms", "0"); properties.setProperty("debezium.snapshot.mode", "schema_only"); // properties.setProperty("debezium.snapshot.mode", "never");
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.debeziumProperties(properties)
.hostname("localhost")
.port(3306)
.databaseList("sensor_offset")
.username("root")
.password("123")
.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setStateBackend(new FsStateBackend("file:///Users/sun9/IdeaProjects/FlinkDemo/ck"));
env
.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute();
}
} `
正确的配置是 snapshot.mode, 加debezium前缀是在 sql中用的,如果只用stream api,直接就可以配置debezium的属性不用加 debezium前缀。比如:
properties.setProperty("snapshot.mode", "schema_only");
感谢,我测试了snapshot.mode的各种参数,以下是测试效果 properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector // properties.setProperty("snapshot.mode", "initial");每次重启都会读全量 // properties.setProperty("snapshot.mode", "initial_only");//读不到数据 // properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似 // properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到 // properties.setProperty("snapshot.mode", "schema_only_recovery");//Could not find existing binlog information while attempting schema only recovery snapshot 我想实现的功能是首次运行去读取全量的数据,之后重启只会读取增量的数据(否则不能保证数据的精准一次性,除非在下游用主键进行约束),不知道有没有什么参数设置能达到这种效果,还请大佬指点迷津
感谢,我测试了snapshot.mode的各种参数,以下是测试效果 properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector // properties.setProperty("snapshot.mode", "initial");每次重启都会读全量 // properties.setProperty("snapshot.mode", "initial_only");//读不到数据 // properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似 // properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到 // properties.setProperty("snapshot.mode", "schema_only_recovery");//Could not find existing binlog information while attempting schema only recovery snapshot 我想实现的功能是首次运行去读取全量的数据,之后重启只会读取增量的数据(否则不能保证数据的精准一次性,除非在下游用主键进行约束),不知道有没有什么参数设置能达到这种效果,还请大佬指点迷津
如果你想第一次启动读取倒是数据,那么不要配置snapshot.mode, 然后下次启动时接着上次启动的位点 开始消费,你必须指定savepoint恢复任务。
确实,通过savepoint 恢复任务是可以解决。但是我现在面对的生产场景是这样的:任务启动,开始全量读取MySQL中的数据,之后与MySQL服务器网络连接断开,因为重启策略的配置,flink 任务就会不断的尝试重启,而不会失败,此时如果恢复了网络连接,任务又会重新去全量读取一遍MySQL的数据,就似乎检查点并没有起作用。难道只有设置重启策略为不重启,每次任务出错都手动恢复吗,感觉这样不是很灵活。。
PS:我在阅读源码的时候发现com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction 的run()方法中,对offset刷新禁用了,是否跟这个有关呢

因为全量拉取完mysql中的数据之前,是不会做checkpoint的,如果这个过程中出现 failover,那要解决下failover的问题,确定是MySQL服务网络连接断开导致的failover的话,那就要先解决下网络的问题。
还有我知道的大部分情况下 全量拉取阶段重启的大多数原因是 checkpoint等待超时导致的,这个问题你可以参考这篇文章的第4条经验 。
我在本地做测试的时候,因为表只有10行数据,而且日志已经打印checkpoint Completed 信息,我能确定checkpoint 是已经完成了的,并且我手动指定检查点去跑任务也是OK的(不会全量读取数据),但如果让flink自己重启任务,它是不会去用到检查点的,每次重启都会去全量读取
我在本地做测试的时候,因为表只有10行数据,而且日志已经打印checkpoint Completed 信息,我能确定checkpoint 是已经完成了的,并且我手动指定检查点去跑任务也是OK的(不会全量读取数据),但如果让flink自己重启任务,它是不会去用到检查点的,每次重启都会去全量读取
我了解 flink failover后,自动重启是会从checkpoint恢复的哦。有可能是你本地checkpoint默认为内存中,可能恢复的时候已经没有checkpoint了。你试一下在本地手动设置checkpoint为你本地系统文件,然后让flink自己failover。打断点到cdc的source fucntion那里看下是否会从checkpoint恢复。
我分别测试了 FSStateBackend , RocksDBStateBackend,检查点保留策略也设置为了‘RETAIN_ON_CANCELLATION’,发现俩者效果都是一样的,当flink failover自动重启时,log打印信息如下:
2020-11-06 08:41:38,722 INFO io.debezium.connector.mysql.MySqlConnectorTask [] - Found no existing offset, so preparing to perform a snapshot
当我手动指定检查点启动任务时,log 打印信息如下:
2020-11-06 09:21:12,289 INFO com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore [] - Flush offsets successfully, partition: {server=mysql_binlog_source}, offsets: {file=mysql-bin.000028, pos=154}
有可能是 #85 中说的 bug 有关。 可以试下最新版本是否能解决你的问题。
我分别测试了 FSStateBackend , RocksDBStateBackend,检查点保留策略也设置为了‘RETAIN_ON_CANCELLATION’,发现俩者效果都是一样的,当flink failover自动重启时,log打印信息如下:
2020-11-06 08:41:38,722 INFO io.debezium.connector.mysql.MySqlConnectorTask [] - Found no existing offset, so preparing to perform a snapshot当我手动指定检查点启动任务时,log 打印信息如下:2020-11-06 09:21:12,289 INFO com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore [] - Flush offsets successfully, partition: {server=mysql_binlog_source}, offsets: {file=mysql-bin.000028, pos=154}
请问这个问题你有解决吗?我也遇到了同样的问题,有什么的好的建议吗,谢谢
感觉可以看看offset.storage的相关设定 我在使用kafka的时候就是通过设定这个属性保存读取位置的。
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!
感谢,我测试了snapshot.mode的各种参数,以下是测试效果 properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector // properties.setProperty("snapshot.mode", "initial");每次重启都会读全量 // properties.setProperty("snapshot.mode", "initial_only");//读不到数据 // properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似 // properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到 // properties.setProperty("snapshot.mode", "schema_only_recovery");//Could not find existing binlog information while attempting schema only recovery snapshot 我想实现的功能是首次运行去读取全量的数据,之后重启只会读取增量的数据(否则不能保证数据的精准一次性,除非在下游用主键进行约束),不知道有没有什么参数设置能达到这种效果,还请大佬指点迷津
如果想要用schema_only_recovery,这个报错如何解决呢?