yangxusun9
yangxusun9
补充:我又添加了检查点,但是发现结果还是没有什么变化 `import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction; import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Properties; public class MySqlBinlogSourceExample { public static void main(String[] args) throws Exception { Properties properties = new Properties();...
感谢,我测试了snapshot.mode的各种参数,以下是测试效果 properties.setProperty("snapshot.mode", "never");//Encountered change event for table sensor_offset.offset_manager whose schema isn't known to this connector // properties.setProperty("snapshot.mode", "initial");每次重启都会读全量 // properties.setProperty("snapshot.mode", "initial_only");//读不到数据 // properties.setProperty("snapshot.mode", "when_needed");//跟initial效果类似 // properties.setProperty("snapshot.mode", "schema_only");//只会记录最新的更改,历史全量读不到 // properties.setProperty("snapshot.mode", "schema_only_recovery");//Could...
确实,通过savepoint 恢复任务是可以解决。但是我现在面对的生产场景是这样的:任务启动,开始全量读取MySQL中的数据,之后与MySQL服务器网络连接断开,因为重启策略的配置,flink 任务就会不断的尝试重启,而不会失败,此时如果恢复了网络连接,任务又会重新去全量读取一遍MySQL的数据,就似乎检查点并没有起作用。难道只有设置重启策略为不重启,每次任务出错都手动恢复吗,感觉这样不是很灵活。。 PS:我在阅读源码的时候发现com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction 的run()方法中,对offset刷新禁用了,是否跟这个有关呢 
我在本地做测试的时候,因为表只有10行数据,而且日志已经打印checkpoint Completed 信息,我能确定checkpoint 是已经完成了的,并且我手动指定检查点去跑任务也是OK的(不会全量读取数据),但如果让flink自己重启任务,它是不会去用到检查点的,每次重启都会去全量读取
我分别测试了 FSStateBackend , RocksDBStateBackend,检查点保留策略也设置为了‘RETAIN_ON_CANCELLATION’,发现俩者效果都是一样的,当flink failover自动重启时,log打印信息如下: `2020-11-06 08:41:38,722 INFO io.debezium.connector.mysql.MySqlConnectorTask [] - Found no existing offset, so preparing to perform a snapshot ` 当我手动指定检查点启动任务时,log 打印信息如下: `2020-11-06 09:21:12,289 INFO com.alibaba.ververica.cdc.debezium.internal.FlinkOffsetBackingStore [] -...
同时也可以避免StarRocksSinkManager.validateTableStructure()的 强校验,这对聚合模型的Replace 模式下的只更新某几个列的场景很有帮助
自动解析json到starrocks的对应列里面
to solve this problem , i need to run build-thrifh.sh in starrocks-thrift-sdk in frist?