chunjun
[Bug][chunjun-connection] DateTime type conversion fails in some Flink run modes but not others
Search before asking
-
[X] I have searched the issues and found no similar question.
-
[X] I have googled my question but didn't find any help.
-
[X] I have read the ChunJun documentation, but it didn't help me.
Description
When I run the task on Flink in YARN mode, it throws the exception shown below, but when I run the same task on Flink in per-job mode, it works fine.
Case
CREATE TABLE statements and test data
-- Source table (MySQL), configured as the mysqlreader table; etl_time is a DATETIME column.
CREATE TABLE `source` (
    `business_date` DATE       NOT NULL,
    `tenant_id`     BIGINT(20) NOT NULL,
    `etl_time`      DATETIME   DEFAULT NULL
);

-- Sink table (MySQL), configured as the mysqlwriter table. Unlike `source`,
-- etl_time here is VARCHAR(255), so the DATETIME value must be converted to text.
CREATE TABLE `sink` (
    `business_date` DATE         DEFAULT NULL,
    `tenant_id`     BIGINT(20)   DEFAULT NULL,
    `etl_time`      VARCHAR(255) DEFAULT NULL
);

-- Sample data: 11 rows for tenant 177 spread over four business dates
-- (3 + 3 + 3 + 2), all with the same etl_time.
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`)
VALUES
    ('2020-01-01', 177, '2022-05-24 17:47:36'),
    ('2020-01-01', 177, '2022-05-24 17:47:36'),
    ('2020-01-01', 177, '2022-05-24 17:47:36'),
    ('2020-01-02', 177, '2022-05-24 17:47:36'),
    ('2020-01-02', 177, '2022-05-24 17:47:36'),
    ('2020-01-02', 177, '2022-05-24 17:47:36'),
    ('2020-01-03', 177, '2022-05-24 17:47:36'),
    ('2020-01-03', 177, '2022-05-24 17:47:36'),
    ('2020-01-03', 177, '2022-05-24 17:47:36'),
    ('2020-01-04', 177, '2022-05-24 17:47:36'),
    ('2020-01-04', 177, '2022-05-24 17:47:36');
ChunJun job JSON
{
"job" : {
"content" : [ {
"reader" : {
"parameter" : {
"password" : "000000",
"startLocation" : "",
"increColumn" : "",
"column" : [ {
"format" : "",
"name" : "business_date",
"type" : "DATE",
"key" : "business_date"
}, {
"format" : "",
"name" : "tenant_id",
"type" : "BIGINT",
"key" : "tenant_id"
}, {
"format" : "",
"name" : "etl_time",
"type" : "DATETIME",
"key" : "etl_time"
} ],
"connection" : [ {
"schema" : "test",
"jdbcUrl" : [ "jdbc:mysql://localhost:3308/test?characterEncoding=UTF-8&useSSL=false&allowMultiQueries=true" ],
"type" : 1,
"table" : [ "source" ]
} ],
"username" : "root"
},
"name" : "mysqlreader"
},
"writer" : {
"parameter" : {
"postSql" : [ ],
"mode" : "insert",
"password" : "000000",
"column" : [ {
"name" : "business_date",
"format" : "",
"isPart" : false,
"type" : "DATE",
"key" : "business_date"
}, {
"name" : "tenant_id",
"format" : "",
"isPart" : false,
"type" : "BIGINT",
"key" : "tenant_id"
}, {
"name" : "etl_time",
"format" : "",
"isPart" : false,
"type" : "VARCHAR",
"key" : "etl_time"
} ],
"connection" : [ {
"jdbcUrl" : "jdbc:mysql://localhost:3308/test1?characterEncoding=UTF-8&useSSL=false&allowMultiQueries=true",
"table" : [ "sink" ]
} ],
"writeMode" : "insert",
"preSql" : [ ],
"username" : "root"
},
"name" : "mysqlwriter"
}
} ],
"setting" : {
"restore" : {
"isRestore" : false,
"isStream" : false
},
"speed" : {
"readerChannel" : 1,
"writerChannel" : 1,
"bytes" : -20971520,
"channel" : 1
}
}
}
}
exception information
The exception information is as follows:
2022-09-05 16:13:48.175 [Legacy Source Thread - Source: mysqlsourcefactory -> Sink: mysqlsinkfactory (1/1)#0] ERROR com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter - value [2022-05-24T17:47:36] convent failed
2022-09-05 16:13:48.225 [dirty-consumer-pool-2-thread-2] WARN com.dtstack.chunjun.dirty.log.LogDirtyDataCollector -
====================Dirty Data=====================
DirtyDataEntry[jobId='ddc99f057f9d827ee95b3a8ca1ad861e', jobName='Flink_Job', operatorName='Source: mysqlsourcefactory', dirtyContent='{"extHeader":[],"byteSize":1,"arity":0,"rowKind":"INSERT","headerInfo":null,"string":"()","headers":null}', errorMessage='com.dtstack.chunjun.throwable.ReadRecordException:
java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
at com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat.nextRecordInternal(JdbcInputFormat.java:309)
at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:197)
at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:67)
at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
at com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter.lambda$createInternalConverter$e77f182$2(JdbcColumnConverter.java:176)
at com.dtstack.chunjun.converter.AbstractRowConverter.lambda$wrapIntoNullableInternalConverter$66e1293c$1(AbstractRowConverter.java:97)
at com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter.toInternal(JdbcColumnConverter.java:114)
at com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter.toInternal(JdbcColumnConverter.java:56)
at com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat.nextRecordInternal(JdbcInputFormat.java:287)
... 6 more
', fieldName='null', createTime=2022-09-05 16:13:48.179]
===================================================
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Currently, after changing line 177 of com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter.java on the latest master branch as shown below, the task runs normally in session mode. What is the difference between running in per-job mode and running in session mode that causes this?
(Timestamp) val, ((TimestampType) (type)).getPrecision()); -> Timestamp.valueOf((LocalDateTime) val), ((TimestampType) (type)).getPrecision());