chunjun
chunjun copied to clipboard
[Bug] pgwalreader => pgwriter local模式,无法写入数据
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
{
"job": {
"content": [
{
"reader": {
"parameter": {
"temporary": false,
"password": "yqwoe900316",
"databaseName": "businesslogic_dev",
"cat": "insert,update,delete",
"tableList": [
"public.actions"
],
"allowCreateSlot": true,
"pavingData": true,
"url": "jdbc:postgresql://localhost:5432/businesslogic_dev?useUnicode=true&characterEncoding=utf8&currentSchema=public",
"username": "yqwoe"
},
"name": "pgwalreader"
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "yqwoe",
"password": "yqwoe900316",
"connection": [
{
"jdbcUrl": "jdbc:postgresql://localhost:5432/businesslogic_dev_bak?useSSL=false",
"table": [
"actions"
]
}
],
"writeMode": "insert",
"column": [
{
"name": "id",
"type": "bigint",
"index":5
},
{
"name": "action_type",
"type": "varchar",
"index":6
},
{
"name": "target_type",
"type": "varchar",
"index":7
},
{
"name": "target_id",
"type": "bigint",
"index":8
},
{
"name": "source_type",
"type": "varchar",
"index":9
},
{
"name": "source_id",
"type": "bigint",
"index":10
},
{
"name": "created_at",
"type": "timestamp",
"index":11
},
{
"name": "updated_at",
"type": "timestamp",
"index":12
}
]
}
}
}
],
"setting": {
"restore": {
"isRestore": true,
"isStream": true
},
"errorLimit": {},
"speed": {
"readerChannel": 1,
"writerChannel": 1,
"bytes": -1048576,
"channel": 1
}
}
}
}
What you expected to happen
DirtyDataEntry[jobId='cc8dfc599cfc490bb11a7ece9605cf04', jobName='Flink_Job', operatorName='Sink: postgresqlsinkfactory', dirtyContent='{"extHeader":[],"byteSize":1829,"arity":13,"headerInfo":{"schema":0,"table":1,"ts":2,"opTime":3,"type":4,"before_id":5,"before_action_type":6,"before_target_type":7,"before_target_id":8,"before_source_type":9,"before_source_id":10,"before_created_at":11,"before_updated_at":12},"rowKind":"UPDATE_BEFORE","headers":["schema","table","ts","opTime","type","before_id","before_action_type","before_target_type","before_target_id","before_source_type","before_source_id","before_created_at","before_updated_at"],"string":"(public,actions,6965234474644803584,2022-08-16 16:55:09.343000,UPDATE_BEFORE,1,join1,User,16,Event,1,2021-12-15 02:36:25.061662,2021-12-15 02:36:25.061662)"}', errorMessage='com.dtstack.chunjun.throwable.WriteRecordException:
JdbcOutputFormat [Flink_Job] writeRecord error: when converting field[0] in Row(-U(public,actions,6965234474644803584,2022-08-16 16:55:09.343000,UPDATE_BEFORE,1,join1,User,16,Event,1,2021-12-15 02:36:25.061662,2021-12-15 02:36:25.061662))
com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IndexOutOfBoundsException: Index: -1, Size: 9
at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.processWriteException(JdbcOutputFormat.java:384)
at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:199)
at com.dtstack.chunjun.connector.postgresql.sink.PostgresOutputFormat.writeSingleRecordInternal(PostgresOutputFormat.java:103)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:466)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487)
at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IndexOutOfBoundsException: Index: -1, Size: 9
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871)
at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.getOrCreateFieldNamedPstmt(PreparedStmtProxy.java:154)
at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.writeSingleRecordInternal(PreparedStmtProxy.java:201)
at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:196)
... 13 more
Caused by: java.lang.IndexOutOfBoundsException: Index: -1, Size: 9
at java.util.LinkedList.checkElementIndex(LinkedList.java:555)
at java.util.LinkedList.get(LinkedList.java:476)
at com.dtstack.chunjun.connector.jdbc.sink.DynamicPreparedStmt.getColumnMeta(DynamicPreparedStmt.java:181)
at com.dtstack.chunjun.connector.jdbc.sink.DynamicPreparedStmt.buildStmt(DynamicPreparedStmt.java:81)
at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.lambda$getOrCreateFieldNamedPstmt$0(PreparedStmtProxy.java:158)
at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4876)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 18 more
', fieldName='null', createTime=2022-08-16 17:15:15.37]
How to reproduce

Anything else
No response
Version
1.12_release
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
writer配置里的表名前加上schema名试试