[Bug] [mysqlcdc] Running a mysqlcdc script fails with an error
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
The Flink version is 1.12.7, and ChunJun is built from the latest code on the chunjun-1.12_release branch.
Running a mysqlcdc script in Standalone mode fails with: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. The full exception is as follows:
Unable to create a source for reading table 'default_catalog.default_database.test1'.
Table options are:
'connector'='mysql-cdc'
'database-name'='dgy'
'hostname'='192.168.1.121'
'password'='******'
'port'='3306'
'table-name'='test1'
'username'='root'
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:177)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:254)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:902)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:871)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:50)
at com.dtstack.chunjun.sql.parser.InsertStmtParser.execStmt(InsertStmtParser.java:47)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:50)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:52)
at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:52)
at com.dtstack.chunjun.sql.parser.SqlParser.lambda$parseSql$1(SqlParser.java:69)
... 24 more
Caused by: java.lang.RuntimeException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
filesystem
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:303)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:420)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:173)
... 58 more
What you expected to happen
The script runs normally.
How to reproduce
CREATE TABLE test1(
id INT,
name varchar,
p_id int,
updateTime timestamp,
primary key(id) not enforced
)WITH(
'connector'='mysql-cdc',
'hostname'='192.168.1.121',
'port'='3306',
'database-name'='dgy',
'table-name'='test1',
'username'='root',
'password'='Datxx2023'
);
CREATE TABLE test2(
id int,
type varchar,
updateTime timestamp,
primary key(id) not enforced
)WITH(
'connector'='mysql-cdc',
'hostname'='192.168.1.121',
'port'='3306',
'database-name'='dgy',
'table-name'='test2',
'username'='root',
'password'='Dxx0x'
);
CREATE TABLE testall(
id int,
name varchar,
type varchar,
updateTime1 timestamp,
updateTime2 timestamp,
primary key(id) not enforced
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.1.121:3306/dgy',
'table-name' = 'testall',
'username' = 'root',
'password' = 'Dxxxx023',
'sink.buffer-flush.max-rows' = '1024', -- number of rows per batch write, default: 1024
'sink.buffer-flush.interval' = '3000', -- batch write interval, default: 3000 ms
'sink.all-replace' = 'true', -- explained below (other RDBs behave similarly); default: false. Only takes effect when a PRIMARY KEY is defined; otherwise a plain INSERT statement is generated
-- sink.all-replace = 'true' generates, for example: INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`). All column values are replaced.
-- sink.all-replace = 'false' generates, for example: INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`). If the new value is null and the existing value in the database is not null, the existing value is not overwritten.
'sink.parallelism' = '1' -- parallelism for writing results, default: null
);
CREATE TABLE testall_es
(
id int,
name varchar,
type varchar,
updateTime1 timestamp,
updateTime2 timestamp,
primary key(id) not enforced
)
WITH (
'connector' = 'elasticsearch7-x',
'hosts' = '192.168.1.227:9200',
'index' = 'testall',
'client.connect-timeout' = '10000'
);
insert
into
testall
select
id,
name,
type,
updateTime1,
updateTime2
from
( SELECT
ck.id,
ck.name,
py.type,
ck.updateTime as updateTime1,
py.updateTime as updateTime2
from
test1 ck
left join
test2 py
on ck.p_id = py.id ) tt;
insert
into
testall_es
select
id,
name,
type,
updateTime1,
updateTime2
from
( SELECT
ck.id,
ck.name,
py.type,
ck.updateTime as updateTime1,
py.updateTime as updateTime2
from
test1 ck
left join
test2 py
on ck.p_id = py.id ) tt;
./flink-1.12.7111/bin/start-cluster.sh
sh bin/chunjun-standalone.sh -job test.sql
Anything else
No response
Version
1.12_release
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
The Flink classpath is as follows:
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/chunjun-core.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/binlog/chunjun-connector-binlog.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/cassandra/chunjun-connector-cassandra.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/clickhouse/chunjun-connector-clickhouse.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/db2/chunjun-connector-db2.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/dm/chunjun-connector-dm.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/doris/chunjun-connector-doris.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/elasticsearch7/chunjun-connector-elasticsearch7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/emqx/chunjun-connector-emqx.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/file/chunjun-connector-file.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/filesystem/chunjun-connector-filesystem.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/ftp/chunjun-connector-ftp.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/greenplum/chunjun-connector-greenplum.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hbase14/chunjun-connector-hbase-1.4.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hbase2/chunjun-connector-hbase2.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hdfs/chunjun-connector-hdfs.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hive3/chunjun-connector-hive3.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hive/chunjun-connector-hive.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/http/chunjun-connector-http.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/iceberg/chunjun-connector-iceberg.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/influxdb/chunjun-connector-influxdb.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/kafka/chunjun-connector-kafka.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/kudu/chunjun-connector-kudu.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/mongodb/chunjun-connector-mongodb.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/mysqlcdc/chunjun-connector-mysqlcdc.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/mysql/chunjun-connector-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/mysqld/chunjun-connector-mysqld.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/oceanbasecdc/chunjun-connector-oceanbasecdc.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/oceanbase/chunjun-connector-oceanbase.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/oracle/chunjun-connector-oracle.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/oraclelogminer/chunjun-connector-oraclelogminer.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/pgwal/chunjun-connector-pgwal.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/postgresql/chunjun-connector-postgresql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/rabbitmq/chunjun-connector-rabbitmq.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/redis/chunjun-connector-redis.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/rocketmq/chunjun-connector-rocketmq.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/s3/chunjun-connector-s3.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/saphana/chunjun-connector-saphana.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/socket/chunjun-connector-socket.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/solr/chunjun-connector-solr.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/sqlservercdc/chunjun-connector-sqlservercdc.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/sqlserver/chunjun-connector-sqlserver.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/starrocks/chunjun-connector-starrocks.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/stream/chunjun-connector-stream.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/sybase/chunjun-connector-sybase.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/upsert-kafka/chunjun-connector-kafka.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/vertica11/chunjun-connector-vertica11.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/ddl-plugins/mysql/chunjun-ddl-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/ddl-plugins/oracle/chunjun-ddl-oracle.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/dirty-data-collector/log/chunjun-dirty-log.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/dirty-data-collector/mysql/chunjun-dirty-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/formats/pbformat/chunjun-protobuf.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/metrics/mysql/chunjun-metrics-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/metrics/prometheus/chunjun-metrics-prometheus.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/restore-plugins/mysql/chunjun-restore-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-connector-jdbc_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-connector-mysql-cdc-1.3.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-csv-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-json-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-metrics-prometheus_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-parquet_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-shaded-zookeeper-3.4.14.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-sql-avro-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-statebackend-rocksdb_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-table_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-table-blink_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/iceberg-flink-runtime-0.12.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/log4j-1.2-api-2.16.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/log4j-api-2.16.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/log4j-core-2.16.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/log4j-slf4j-impl-2.16.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/logback-classic-1.2.11.jar
/data/insight_plugin1.12/flink-1.12.7/lib/logback-core-1.2.11.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-dist_2.12-1.12.7.jar
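Flink looks up table factories through Java's ServiceLoader, which reads the file META-INF/services/org.apache.flink.table.factories.Factory from each jar visible to the class loader. One way to narrow down an error like the one above is to check which jars in this list actually declare a factory. The following is only a minimal sketch and is not part of ChunJun or Flink; the class name FactoryServiceScanner and the method declaredFactories are made up for illustration. It prints the declared factory class names, not the connector identifiers such as 'mysql-cdc', since those are only known after the classes are loaded.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: lists the table factory classes that a jar declares.
class FactoryServiceScanner {
    // Service file that ServiceLoader reads for Flink's table Factory interface.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    /** Returns the declared factory class names, or an empty list if the jar declares none. */
    static List<String> declaredFactories(String jarPath) throws IOException {
        List<String> classes = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_FILE);
            if (entry == null) {
                return classes; // this jar does not register any table factory
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(jar.getInputStream(entry), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Drop '#' comments and surrounding whitespace, like ServiceLoader does.
                    int hash = line.indexOf('#');
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        classes.add(line);
                    }
                }
            }
        }
        return classes;
    }

    public static void main(String[] args) throws IOException {
        // Pass one or more jar paths, e.g. the mysql-cdc jars from the list above.
        for (String jarPath : args) {
            System.out.println(jarPath + " -> " + declaredFactories(jarPath));
        }
    }
}
```

Running it against both flink-connector-mysql-cdc-1.3.0.jar and chunjun-connector-mysqlcdc.jar would show whether each of them registers a factory at all. If a jar does register one and Flink still does not list it, the jar is probably not visible to the class loader used during discovery.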
After trying further, I found the following behavior.
CREATE TABLE test1(
id INT,
name varchar,
p_id int,
updateTime timestamp,
primary key(id) not enforced
)WITH(
'connector'='mysql-cdc',
'hostname'='192.168.1.121',
'port'='3306',
'database-name'='dgy',
'table-name'='test1',
'username'='root',
'password'='Daxxx3'
);
The first direct run fails with the following error:
Caused by: java.lang.RuntimeException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
filesystem
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:303)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:420)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:173)
... 63 more
If I change the connector to mysqlcdc-x, the error becomes:
Caused by: java.lang.RuntimeException: Could not find any factory for identifier 'mysqlcdc-x' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
filesystem
jdbc
mysql-cdc
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:303)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:420)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:173)
... 63 more
At this point, changing the connector back to mysql-cdc makes the job run normally.
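The two "Available factory identifiers" lists above differ, which suggests that the class loader used for discovery saw a different set of jars in each run. The thread does not confirm why. As a rough illustration of how the discovered set depends on the jars handed to the class loader, here is a minimal sketch using only the JDK; the class name VisibleFactoryFiles and the method visibleServiceFiles are hypothetical and do not come from Flink or ChunJun.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: shows which factory service files a class loader can see.
class VisibleFactoryFiles {
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    /** Returns the URL of every factory service file visible from the given jars. */
    static List<URL> visibleServiceFiles(List<String> jarPaths) throws IOException {
        URL[] urls = new URL[jarPaths.size()];
        for (int i = 0; i < urls.length; i++) {
            urls[i] = Paths.get(jarPaths.get(i)).toUri().toURL();
        }
        // Parent is null, so only the listed jars (plus JDK bootstrap classes) are searched.
        try (URLClassLoader loader = new URLClassLoader(urls, null)) {
            // Collect eagerly, before the loader is closed.
            return Collections.list(loader.getResources(SERVICE_FILE));
        }
    }

    public static void main(String[] args) throws IOException {
        // Compare the output for different jar sets, e.g. with and without the mysql-cdc jar.
        for (URL url : visibleServiceFiles(Arrays.asList(args))) {
            System.out.println(url);
        }
    }
}
```

A factory whose jar is missing from the list passed in never shows up in the result, which is the same symptom as 'mysql-cdc' being absent from the first error message.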
Your launch command seems to submit the job in JSON script mode rather than SQL mode. I suggest taking a closer look at the examples on the official site; mysqlcdc is probably not very stable.
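Explicitly specifying SQL mode gives the same result. I see that submit.sh also adapts to the job type, so a file with a .sql suffix is treated as a SQL job; the cause is probably something else.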
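I modified org.apache.flink.table.factories.FactoryUtil and recompiled, and that fixed it.
I'm not sure whether this is a bug or a problem with my environment, so I'm leaving it at this for now as a reference for others. If anyone has a better solution, please let me know. Thanks.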