chunjun icon indicating copy to clipboard operation
chunjun copied to clipboard

[Bug] pgwalreader => postgresqlwriter:local 模式下无法写入数据

Open yqwoe opened this issue 3 years ago • 1 comment

Search before asking

  • [X] I have searched the issues and found no similar issues.

What happened

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "temporary": false,
            "password": "yqwoe900316",
            "databaseName": "businesslogic_dev",
            "cat": "insert,update,delete",
            "tableList": [
              "public.actions"
            ],
            "allowCreateSlot": true,
            "pavingData": true,
            "url": "jdbc:postgresql://localhost:5432/businesslogic_dev?useUnicode=true&characterEncoding=utf8&currentSchema=public",
            "username": "yqwoe"
          },
          "name": "pgwalreader"
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "username": "yqwoe",
            "password": "yqwoe900316",
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://localhost:5432/businesslogic_dev_bak?useSSL=false",
                "table": [
                  "actions"
                ]
              }
            ],
            "writeMode": "insert",
            "column": [
              {
                "name": "id",
                "type": "bigint",
                "index":5
              },
              {
                "name": "action_type",
                "type": "varchar",
                "index":6
              },
              {
                "name": "target_type",
                "type": "varchar",
                "index":7
              },
              {
                "name": "target_id",
                "type": "bigint",
                "index":8
              },
              {
                "name": "source_type",
                "type": "varchar",
                "index":9
              },
              {
                "name": "source_id",
                "type": "bigint",
                "index":10
              },
              {
                "name": "created_at",
                "type": "timestamp",
                "index":11
              },
              {
                "name": "updated_at",
                "type": "timestamp",
                "index":12
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "isRestore": true,
        "isStream": true
      },
      "errorLimit": {},
      "speed": {
        "readerChannel": 1,
        "writerChannel": 1,
        "bytes": -1048576,
        "channel": 1
      }
    }
  }
}

What you expected to happen

DirtyDataEntry[jobId='cc8dfc599cfc490bb11a7ece9605cf04', jobName='Flink_Job', operatorName='Sink: postgresqlsinkfactory', dirtyContent='{"extHeader":[],"byteSize":1829,"arity":13,"headerInfo":{"schema":0,"table":1,"ts":2,"opTime":3,"type":4,"before_id":5,"before_action_type":6,"before_target_type":7,"before_target_id":8,"before_source_type":9,"before_source_id":10,"before_created_at":11,"before_updated_at":12},"rowKind":"UPDATE_BEFORE","headers":["schema","table","ts","opTime","type","before_id","before_action_type","before_target_type","before_target_id","before_source_type","before_source_id","before_created_at","before_updated_at"],"string":"(public,actions,6965234474644803584,2022-08-16 16:55:09.343000,UPDATE_BEFORE,1,join1,User,16,Event,1,2021-12-15 02:36:25.061662,2021-12-15 02:36:25.061662)"}', errorMessage='com.dtstack.chunjun.throwable.WriteRecordException:
JdbcOutputFormat [Flink_Job] writeRecord error: when converting field[0] in Row(-U(public,actions,6965234474644803584,2022-08-16 16:55:09.343000,UPDATE_BEFORE,1,join1,User,16,Event,1,2021-12-15 02:36:25.061662,2021-12-15 02:36:25.061662))
com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IndexOutOfBoundsException: Index: -1, Size: 9
	at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.processWriteException(JdbcOutputFormat.java:384)
	at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:199)
	at com.dtstack.chunjun.connector.postgresql.sink.PostgresOutputFormat.writeSingleRecordInternal(PostgresOutputFormat.java:103)
	at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:466)
	at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:487)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:487)
	at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$initTimingSubmitTask$0(BaseRichOutputFormat.java:443)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IndexOutOfBoundsException: Index: -1, Size: 9
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
	at com.google.common.cache.LocalCache.get(LocalCache.java:3952)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4871)
	at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.getOrCreateFieldNamedPstmt(PreparedStmtProxy.java:154)
	at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.writeSingleRecordInternal(PreparedStmtProxy.java:201)
	at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:196)
	... 13 more
Caused by: java.lang.IndexOutOfBoundsException: Index: -1, Size: 9
	at java.util.LinkedList.checkElementIndex(LinkedList.java:555)
	at java.util.LinkedList.get(LinkedList.java:476)
	at com.dtstack.chunjun.connector.jdbc.sink.DynamicPreparedStmt.getColumnMeta(DynamicPreparedStmt.java:181)
	at com.dtstack.chunjun.connector.jdbc.sink.DynamicPreparedStmt.buildStmt(DynamicPreparedStmt.java:81)
	at com.dtstack.chunjun.connector.jdbc.sink.PreparedStmtProxy.lambda$getOrCreateFieldNamedPstmt$0(PreparedStmtProxy.java:158)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4876)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
	... 18 more
', fieldName='null', createTime=2022-08-16 17:15:15.37]

How to reproduce

image

Anything else

No response

Version

1.12_release

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

yqwoe avatar Aug 16 '22 09:08 yqwoe

试试在 writer 配置里的表名前加上 schema 名,例如把 "table" 中的 "actions" 改为 "public.actions"。

ll076110 avatar Aug 26 '22 02:08 ll076110