
[Bug] [mysqlcdc] Running a mysqlcdc script fails with an error

Open caiyuyux opened this issue 2 years ago • 5 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

Flink version is 1.12.7; ChunJun is built from the latest code on the chunjun-1.12_release branch.


Running the mysqlcdc script in standalone mode fails with "Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath". The full exception is:

Unable to create a source for reading table 'default_catalog.default_database.test1'.

Table options are:

'connector'='mysql-cdc'
'database-name'='dgy'
'hostname'='192.168.1.121'
'password'='******'
'port'='3306'
'table-name'='test1'
'username'='root'
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:177)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:254)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertJoin(SqlToRelConverter.java:2864)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2162)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2169)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:902)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:871)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:564)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
        at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:50)
        at com.dtstack.chunjun.sql.parser.InsertStmtParser.execStmt(InsertStmtParser.java:47)
        at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:50)
        at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:52)
        at com.dtstack.chunjun.sql.parser.AbstractStmtParser.handleStmt(AbstractStmtParser.java:52)
        at com.dtstack.chunjun.sql.parser.SqlParser.lambda$parseSql$1(SqlParser.java:69)
        ... 24 more
Caused by: java.lang.RuntimeException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
filesystem
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:303)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:420)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:173)
        ... 58 more
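For context on the error above: Flink's table planner discovers connectors through Java SPI, so a connector jar must ship a META-INF/services/org.apache.flink.table.factories.Factory resource listing its factory classes. If that resource is missing, or the jar is not visible to the classloader performing the discovery, the identifier will not be found. A minimal sketch for inspecting what a jar actually registers (the jar path in the comment is only an example taken from the classpath posted below):

```python
import zipfile

# Flink 1.12's FactoryUtil discovers factories via java.util.ServiceLoader,
# which reads this resource from every jar visible to the classloader.
SERVICE = "META-INF/services/org.apache.flink.table.factories.Factory"

def spi_factories(jar_path):
    """Return the table-factory classes a connector jar registers via SPI."""
    with zipfile.ZipFile(jar_path) as jar:
        if SERVICE not in jar.namelist():
            return []
        lines = jar.read(SERVICE).decode("utf-8").splitlines()
        # SPI provider files allow blank lines and '#' comments.
        return [ln.strip() for ln in lines
                if ln.strip() and not ln.strip().startswith("#")]

# Example (path from this report's environment):
# spi_factories("/data/insight_plugin1.12/flink-1.12.7/lib/flink-connector-mysql-cdc-1.3.0.jar")
```

If this returns an empty list for the mysql-cdc jar, the jar lacks the SPI registration; if it returns factory classes but discovery still fails at runtime, the jar is most likely not on the classloader that FactoryUtil is using.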

What you expected to happen

The script runs normally.

How to reproduce

CREATE TABLE test1(
    id          INT,
    name        varchar,
    p_id        int,
    updateTime  timestamp,
    primary key(id) not enforced
 )WITH(
    'connector'='mysql-cdc',
    'hostname'='192.168.1.121',
    'port'='3306',
    'database-name'='dgy',
    'table-name'='test1',
    'username'='root',
    'password'='Datxx2023'
 );

CREATE TABLE test2(
    id          int,
    type        varchar,
    updateTime  timestamp,
    primary key(id) not enforced
 )WITH(
    'connector'='mysql-cdc',
    'hostname'='192.168.1.121',
    'port'='3306',
    'database-name'='dgy',
    'table-name'='test2',
    'username'='root',
    'password'='Dxx0x'
 );

CREATE TABLE testall(
    id          int,
    name        varchar,
    type        varchar,
    updateTime1        timestamp,
    updateTime2        timestamp,
    primary key(id) not enforced
) WITH (
      'connector' = 'mysql-x',
      'url' = 'jdbc:mysql://192.168.1.121:3306/dgy',
      'table-name' = 'testall',
      'username' = 'root',
      'password' = 'Dxxxx023',

      'sink.buffer-flush.max-rows' = '1024', -- rows per batch write, default: 1024
      'sink.buffer-flush.interval' = '3000', -- batch write interval, default: 3000 ms
      'sink.all-replace' = 'true', -- default: false; only takes effect when a PRIMARY KEY is defined (similar for other RDBMS), otherwise plain appending INSERTs are generated
                                  -- sink.all-replace = 'true' generates e.g.: INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) — every column is replaced.
                                  -- sink.all-replace = 'false' generates e.g.: INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) — if the new value is NULL and the existing value in the database is not, the old value is kept.
      'sink.parallelism' = '1'    -- sink parallelism, default: null
      );

CREATE TABLE testall_es
(
    id          int,
    name        varchar,
    type        varchar,
    updateTime1        timestamp,
    updateTime2        timestamp,
    primary key(id) not enforced
)
    WITH (
        'connector' = 'elasticsearch7-x',
        'hosts' = '192.168.1.227:9200',
        'index' = 'testall',
        'client.connect-timeout' = '10000'
        );

insert 
into
    testall
    select
        id,
        name,
        type,
        updateTime1,
        updateTime2 
    from
        (     SELECT
            ck.id,
            ck.name,
            py.type,
            ck.updateTime as updateTime1,
            py.updateTime as updateTime2     
        from
            test1 ck 
        left join
            test2 py       
                on ck.p_id = py.id ) tt;

insert 
into
    testall_es
    select
        id,
        name,
        type,
        updateTime1,
        updateTime2 
    from
        (     SELECT
            ck.id,
            ck.name,
            py.type,
            ck.updateTime as updateTime1,
            py.updateTime as updateTime2     
        from
            test1 ck 
        left join
            test2 py       
                on ck.p_id = py.id ) tt;

./flink-1.12.7/bin/start-cluster.sh

sh bin/chunjun-standalone.sh -job test.sql

Anything else

No response

Version

1.12_release

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

caiyuyux avatar Jun 29 '23 07:06 caiyuyux

The Flink classpath is as follows:

/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/chunjun-core.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/binlog/chunjun-connector-binlog.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/cassandra/chunjun-connector-cassandra.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/clickhouse/chunjun-connector-clickhouse.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/db2/chunjun-connector-db2.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/dm/chunjun-connector-dm.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/doris/chunjun-connector-doris.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/elasticsearch7/chunjun-connector-elasticsearch7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/emqx/chunjun-connector-emqx.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/file/chunjun-connector-file.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/filesystem/chunjun-connector-filesystem.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/ftp/chunjun-connector-ftp.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/greenplum/chunjun-connector-greenplum.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hbase14/chunjun-connector-hbase-1.4.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hbase2/chunjun-connector-hbase2.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hdfs/chunjun-connector-hdfs.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hive3/chunjun-connector-hive3.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/hive/chunjun-connector-hive.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/http/chunjun-connector-http.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/iceberg/chunjun-connector-iceberg.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/influxdb/chunjun-connector-influxdb.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/kafka/chunjun-connector-kafka.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/kudu/chunjun-connector-kudu.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/mongodb/chunjun-connector-mongodb.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/mysqlcdc/chunjun-connector-mysqlcdc.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/mysql/chunjun-connector-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/mysqld/chunjun-connector-mysqld.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/oceanbasecdc/chunjun-connector-oceanbasecdc.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/oceanbase/chunjun-connector-oceanbase.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/oracle/chunjun-connector-oracle.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/oraclelogminer/chunjun-connector-oraclelogminer.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/pgwal/chunjun-connector-pgwal.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/postgresql/chunjun-connector-postgresql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/rabbitmq/chunjun-connector-rabbitmq.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/redis/chunjun-connector-redis.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/rocketmq/chunjun-connector-rocketmq.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/s3/chunjun-connector-s3.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/saphana/chunjun-connector-saphana.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/socket/chunjun-connector-socket.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/solr/chunjun-connector-solr.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/sqlservercdc/chunjun-connector-sqlservercdc.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/sqlserver/chunjun-connector-sqlserver.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/starrocks/chunjun-connector-starrocks.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/stream/chunjun-connector-stream.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/sybase/chunjun-connector-sybase.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/upsert-kafka/chunjun-connector-kafka.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/connector/vertica11/chunjun-connector-vertica11.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/ddl-plugins/mysql/chunjun-ddl-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/ddl-plugins/oracle/chunjun-ddl-oracle.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/dirty-data-collector/log/chunjun-dirty-log.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/dirty-data-collector/mysql/chunjun-dirty-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/formats/pbformat/chunjun-protobuf.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/metrics/mysql/chunjun-metrics-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/metrics/prometheus/chunjun-metrics-prometheus.jar
/data/insight_plugin1.12/flink-1.12.7/lib/chunjun-dist/restore-plugins/mysql/chunjun-restore-mysql.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-connector-jdbc_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-connector-mysql-cdc-1.3.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-csv-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-json-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-metrics-prometheus_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-parquet_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-shaded-zookeeper-3.4.14.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-sql-avro-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-statebackend-rocksdb_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-table_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-table-blink_2.12-1.12.7.jar
/data/insight_plugin1.12/flink-1.12.7/lib/iceberg-flink-runtime-0.12.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/log4j-1.2-api-2.16.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/log4j-api-2.16.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/log4j-core-2.16.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/log4j-slf4j-impl-2.16.0.jar
/data/insight_plugin1.12/flink-1.12.7/lib/logback-classic-1.2.11.jar
/data/insight_plugin1.12/flink-1.12.7/lib/logback-core-1.2.11.jar
/data/insight_plugin1.12/flink-1.12.7/lib/flink-dist_2.12-1.12.7.jar

caiyuyux avatar Jun 30 '23 02:06 caiyuyux

I kept experimenting and found something interesting.

CREATE TABLE test1(
    id          INT,
    name        varchar,
    p_id        int,
    updateTime  timestamp,
    primary key(id) not enforced
 )WITH(
    'connector'='mysql-cdc',
    'hostname'='192.168.1.121',
    'port'='3306',
    'database-name'='dgy',
    'table-name'='test1',
    'username'='root',
    'password'='Daxxx3'
 );

Running it directly the first time fails with:

Caused by: java.lang.RuntimeException: Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
filesystem
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:303)
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:420)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:173)
	... 63 more

If I change the connector to mysqlcdc-x, the error becomes:

Caused by: java.lang.RuntimeException: Could not find any factory for identifier 'mysqlcdc-x' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.

Available factory identifiers are:

datagen
filesystem
jdbc
mysql-cdc
	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:303)
	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:420)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:173)
	... 63 more

At that point, changing the connector back to mysql-cdc makes it work.
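The "Available factory identifiers" list is produced by FactoryUtil.discoverFactory (visible in the stack traces above): it loads every Factory the classloader can see via ServiceLoader, then filters by identifier. So when only datagen and filesystem are listed, the connector jars were simply not visible to the discovering classloader at that moment — and once a failed attempt has forced the connector jars to load, the retry succeeds, which would explain the flip-flopping. A simplified sketch of the matching logic (not Flink's actual code; names are illustrative):

```python
class Factory:
    """Stand-in for org.apache.flink.table.factories.Factory."""
    def factory_identifier(self):
        raise NotImplementedError

def discover_factory(loaded_factories, factory_class, identifier):
    """Mimic FactoryUtil.discoverFactory: filter by base type, then by identifier."""
    candidates = [f for f in loaded_factories if isinstance(f, factory_class)]
    matches = [f for f in candidates if f.factory_identifier() == identifier]
    if not matches:
        available = "\n".join(sorted(f.factory_identifier() for f in candidates))
        raise RuntimeError(
            f"Could not find any factory for identifier '{identifier}' "
            f"in the classpath.\n\nAvailable factory identifiers are:\n\n{available}")
    return matches[0]
```

The key point: the error message only lists factories that ServiceLoader could see, so "datagen / filesystem" means discovery ran against Flink's own lib jars without the ChunJun connector jars.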

caiyuyux avatar Jun 30 '23 06:06 caiyuyux

Your launch command seems to submit in JSON script mode rather than SQL mode. I'd suggest studying the examples on the official site; mysqlcdc isn't very stable anyway.

KerYooz avatar Jun 30 '23 06:06 KerYooz

Your launch command seems to submit in JSON script mode rather than SQL mode. I'd suggest studying the examples on the official site; mysqlcdc isn't very stable anyway.

I explicitly specified SQL mode and got the same result. Looking at submit.sh, it also adapts by type — a .sql file suffix is treated as SQL — so it's probably something else.

caiyuyux avatar Jun 30 '23 07:06 caiyuyux

I modified org.apache.flink.table.factories.FactoryUtil and recompiled; that fixed it.

I'm not sure whether this is a bug or a problem with my environment, but I'll leave it at that as a reference for whoever hits this later. If there's a cleaner solution, please let me know. Thanks.

[screenshot: the modified FactoryUtil]
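The actual patch is only in the screenshot, so the following is just a guess at the kind of change involved. Since ChunJun loads connector jars at runtime, a plausible fix is to have FactoryUtil consult more than one classloader (for example the thread context classloader in addition to the one that loaded FactoryUtil itself) and merge the discovery results. A sketch of that merge logic, with hypothetical names:

```python
def discover_all(loaders, load_services):
    """Union SPI discovery results from several classloaders, de-duplicated
    by implementation class (a sketch of one possible fix, not the actual patch).
    load_services(loader) stands in for ServiceLoader.load(Factory, loader)."""
    seen, merged = set(), []
    for loader in loaders:
        for factory in load_services(loader):
            key = type(factory).__name__
            if key not in seen:
                seen.add(key)
                merged.append(factory)
    return merged
```

With something like this, factories visible only to the runtime-loaded connector jars would still be found even when the planner's own classloader cannot see them.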

caiyuyux avatar Jun 30 '23 07:06 caiyuyux