chunjun icon indicating copy to clipboard operation
chunjun copied to clipboard

[Question][chunjun-oracle] oracle的clob类型,使用transformer报错

Open J1aHe opened this issue 1 year ago • 0 comments

Search before asking

  • [X] I had searched in the issues and found no similar question.

  • [X] I had googled my question but i didn't get any help.

  • [X] I had read the documentation: ChunJun doc but it didn't help me.

Description

这是报错

 Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. com.dtstack.chunjun.connector.oracle.converter.ClobType cannot be cast to org.apache.flink.table.types.logical.VarCharType
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:640)
	at com.dtstack.chunjun.Main.syncStreamToTable(Main.java:288)
	at com.dtstack.chunjun.Main.exeSyncJob(Main.java:246)
	at com.dtstack.chunjun.Main.main(Main.java:137)
	at com.dtstack.chunjun.local.test.LocalTest.main(LocalTest.java:136)
Caused by: java.lang.ClassCastException: com.dtstack.chunjun.connector.oracle.converter.ClobType cannot be cast to org.apache.flink.table.types.logical.VarCharType
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.newRelDataType$1(FlinkTypeFactory.scala:76)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.createFieldTypeFromLogicalType(FlinkTypeFactory.scala:167)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.$anonfun$buildStructType$1(FlinkTypeFactory.scala:254)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.buildStructType(FlinkTypeFactory.scala:252)
	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.buildRelNodeRowType(FlinkTypeFactory.scala:224)
	at org.apache.flink.table.planner.sources.TableSourceUtil$.getSourceRowType(TableSourceUtil.scala:191)
	at org.apache.flink.table.planner.sources.TableSourceUtil.getSourceRowType(TableSourceUtil.scala)
	at org.apache.flink.table.planner.catalog.CatalogSchemaTable.getRowType(CatalogSchemaTable.java:171)
	at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
	at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
	at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
	at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112)
	at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3205)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3187)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3461)
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
	... 8 more

这是json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "allowCreateSlot": false,
            "column": [
              {
                "index": 10,
                "name": "SHIPPING_ADDRESS",
                "type": "CLOB"
              },
              {
                "index": 11,
                "name": "BILLING_ADDRESS",
                "type": "CLOB"
              }
            ],
            "connection": [
              {
                "jdbcUrl": [
                  ""
                ],
                "table": [
                  "ORDERS1"
                ]
              }
            ],
            "password": "",
            "username": "",
            "where": ""
          },
          "table": {
            "tableName": "sourceTable"
          }
        },
        "transformer": {
          "transformSql": "select SHIPPING_ADDRESS,BILLING_ADDRESS from sourceTable "
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "column": [
              {
                "index": 10,
                "name": "SHIPPING_ADDRESS",
                "type": "longtext"
              },
              {
                "index": 11,
                "name": "BILLING_ADDRESS",
                "type": "longtext"
              }
            ],
            "connection": [
              {
                "jdbcUrl": "",
                "table": [
                  "ORDERS1"
                ]
              }
            ],
            "password": "",
            "preSql": [
              "truncate table ORDERS1"
            ],
            "username": ""
          },
          "table": {
            "tableName": "sinkTable"
          }
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": 0
      },
      "speed": {
        "bytes": 10485760,
        "writerChannel": 2
      }
    }
  }
}

我本地调试后发现好像是flink的报错,但是具体的还是不太清楚,想问下在使用transformer的情况下clob类型怎么进行同步呢

Code of Conduct

J1aHe avatar Apr 12 '24 06:04 J1aHe