
[Bug] [connector-file] HdfsFile sink lost data

Open zhaoli2333 opened this issue 2 years ago • 7 comments

### Search before asking

- [X] I have searched the issues and found no similar issues.

### What happened

We sync data from TiDB to HDFS using the Flink engine in STREAMING mode. After the job finished, we found that about half of the data was missing, because the previously written HDFS file had been deleted.

### SeaTunnel Version

2.3.3

### SeaTunnel Config

```json
{
    "env":{
        "execution.parallelism":1,
        "job.mode":"STREAMING",
        "checkpoint.interval":10000,
        "execution.checkpoint.interval":10000,
        "execution.checkpoint.data-uri":"hdfs://seatunnel/checkpoint"
    },
    "source":{
        "Jdbc":{
            "result_table_name": "kafka_topic_usage",
            "url":"jdbc:mysql://xxxx24000/bigdata_component",
            "driver":"com.mysql.cj.jdbc.Driver",
            "user":"xxxx",
            "password":"xxxxxx",
            "query":"select * from kafka_topic_usage",
            "partition_column":"id",
            "partition_num":4
        }
    },
    "sink":{
        "HdfsFile": {
            "source_table_name": "kafka_topic_usage1",
            "fs.defaultFS": "hdfs://qccnn1",
            "path": "/seatinnel/hive/kafka_topic_usage",
            "file_format_type": "orc"
        }
    }
}
```

### Running Command

```shell
/bin/start-seatunnel-flink-15-connector-v2.sh --config /usr/local/seatunnel-2.3.3/conf --target yarn-per-job
```

### Error Exception

2023-12-01 16:16:42,814 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://[email protected]:2245/user/rpc/jobmanager_1 with leader id 54f91177-d28c-4905-b79f-d890bd8bd77a.
2023-12-01 16:16:42,841 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2023-12-01 16:16:42,871 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://[email protected]:2245/user/rpc/jobmanager_1 for job 1ec94935106d4b54972ef75dc58ffc53.
2023-12-01 16:16:42,872 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish JobManager connection for job 1ec94935106d4b54972ef75dc58ffc53.
2023-12-01 16:16:42,877 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer reserved slots to the leader of job 1ec94935106d4b54972ef75dc58ffc53.
2023-12-01 16:16:42,934 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:42,961 INFO  org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - Creating a changelog storage with name 'memory'.
2023-12-01 16:16:42,988 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06), deploy into slot with allocation id 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:42,990 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) switched from CREATED to DEPLOYING.
2023-12-01 16:16:42,993 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) [DEPLOYING].
2023-12-01 16:16:42,994 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,019 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc), deploy into slot with allocation id 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,020 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) switched from CREATED to DEPLOYING.
2023-12-01 16:16:43,020 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) [DEPLOYING].
2023-12-01 16:16:43,021 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,029 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b), deploy into slot with allocation id 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,029 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) switched from CREATED to DEPLOYING.
2023-12-01 16:16:43,029 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) [DEPLOYING].
2023-12-01 16:16:43,030 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,071 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'null, fileStateThreshold: -1)
2023-12-01 16:16:43,071 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'null, fileStateThreshold: -1)
2023-12-01 16:16:43,071 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'null, fileStateThreshold: -1)
2023-12-01 16:16:43,073 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480)
2023-12-01 16:16:43,073 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as FsStateBackend
2023-12-01 16:16:43,073 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480)
2023-12-01 16:16:43,077 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as FsStateBackend
2023-12-01 16:16:43,077 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480)
2023-12-01 16:16:43,077 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as FsStateBackend
2023-12-01 16:16:43,077 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using legacy state backend File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480) as Job checkpoint storage
2023-12-01 16:16:43,077 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using legacy state backend File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480) as Job checkpoint storage
2023-12-01 16:16:43,077 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using legacy state backend File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480) as Job checkpoint storage
2023-12-01 16:16:43,077 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend 'org.apache.flink.runtime.state.filesystem.FsStateBackend' is used. Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons.
2023-12-01 16:16:43,078 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend 'org.apache.flink.runtime.state.filesystem.FsStateBackend' is used. Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons.
2023-12-01 16:16:43,078 WARN  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend 'org.apache.flink.runtime.state.filesystem.FsStateBackend' is used. Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons.
2023-12-01 16:16:43,096 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) switched from DEPLOYING to INITIALIZING.
2023-12-01 16:16:43,096 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) switched from DEPLOYING to INITIALIZING.
2023-12-01 16:16:43,097 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) switched from DEPLOYING to INITIALIZING.
2023-12-01 16:16:43,370 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) switched from INITIALIZING to RUNNING.
2023-12-01 16:16:43,370 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) switched from INITIALIZING to RUNNING.
2023-12-01 16:16:43,578 INFO  org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Consumer subtask 0 has no restore state.
2023-12-01 16:16:44,137 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) switched from INITIALIZING to RUNNING.
2023-12-01 16:16:44,143 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Starting to calculate splits.
2023-12-01 16:16:44,144 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assigning JdbcSourceSplit(parameterValues=[1, 316202], splitId=0) to 0 reader.
2023-12-01 16:16:44,145 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assigning JdbcSourceSplit(parameterValues=[316203, 632404], splitId=1) to 0 reader.
2023-12-01 16:16:44,145 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assigning JdbcSourceSplit(parameterValues=[632405, 948606], splitId=2) to 0 reader.
2023-12-01 16:16:44,145 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assigning JdbcSourceSplit(parameterValues=[948607, 1264808], splitId=3) to 0 reader.
2023-12-01 16:16:44,145 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assign splits [JdbcSourceSplit(parameterValues=[1, 316202], splitId=0), JdbcSourceSplit(parameterValues=[316203, 632404], splitId=1), JdbcSourceSplit(parameterValues=[632405, 948606], splitId=2), JdbcSourceSplit(parameterValues=[948607, 1264808], splitId=3)] to reader 0
2023-12-01 16:16:45,150 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat [] - Executing SQL: 'com.mysql.cj.jdbc.ClientPreparedStatement: /**  */ SELECT * FROM (select * from kafka_topic_usage) tt where id >= 1 AND id <= 316202'
2023-12-01 16:16:45,370 WARN  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - Principal [null] or keytabPath [null] is empty, it will skip kerberos authentication
2023-12-01 16:16:45,384 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.MemoryManagerImpl [] - orc.rows.between.memory.checks=5000
2023-12-01 16:16:45,427 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.PhysicalFsWriter [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:16:45,537 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.WriterImpl [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:01,121 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat [] - Executing SQL: 'com.mysql.cj.jdbc.ClientPreparedStatement: /**  */ SELECT * FROM (select * from kafka_topic_usage) tt where id >= 316203 AND id <= 632404'
2023-12-01 16:17:01,723 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.PhysicalFsWriter [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:01,728 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.WriterImpl [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:10,968 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat [] - Executing SQL: 'com.mysql.cj.jdbc.ClientPreparedStatement: /**  */ SELECT * FROM (select * from kafka_topic_usage) tt where id >= 632405 AND id <= 948606'
2023-12-01 16:17:11,274 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.PhysicalFsWriter [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:11,278 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.WriterImpl [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:11,411 WARN  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - Principal [null] or keytabPath [null] is empty, it will skip kerberos authentication
2023-12-01 16:17:11,450 INFO  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:11,468 INFO  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] finish
2023-12-01 16:17:18,216 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat [] - Executing SQL: 'com.mysql.cj.jdbc.ClientPreparedStatement: /**  */ SELECT * FROM (select * from kafka_topic_usage) tt where id >= 948607 AND id <= 1264808'
2023-12-01 16:17:18,480 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.PhysicalFsWriter [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:18,483 INFO  org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.WriterImpl [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:18,531 INFO  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:18,536 INFO  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - Delete already file: /seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc
2023-12-01 16:17:18,538 INFO  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] finish
2023-12-01 16:17:23,087 INFO  org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader [] - Closed the bounded jdbc source
2023-12-01 16:17:23,395 INFO  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:23,396 WARN  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] already finished in the last commit, skip
2023-12-01 16:17:29,109 INFO  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:29,110 WARN  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] already finished in the last commit, skip
2023-12-01 16:17:39,118 INFO  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:39,129 WARN  org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] already finished in the last commit, skip
2023-12-01 16:17:39,133 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) switched from RUNNING to FINISHED.
2023-12-01 16:17:39,133 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06).
2023-12-01 16:17:39,134 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) switched from RUNNING to FINISHED.
2023-12-01 16:17:39,134 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc).
2023-12-01 16:17:39,134 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 0199df6a286aa880dd7ffdd7b3ceed06.
2023-12-01 16:17:39,139 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) switched from RUNNING to FINISHED.
2023-12-01 16:17:39,140 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b).
2023-12-01 16:17:39,141 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 2168727360fe943a1150ed5493b671cc.
2023-12-01 16:17:39,145 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task HdfsFile: Global Committer (1/1)#0 38611013dac2ea15e318f9075140cc5b.
2023-12-01 16:17:39,428 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 518fb36db8369eb5982aee49191d309c, jobId: 1ec94935106d4b54972ef75dc58ffc53).
2023-12-01 16:17:39,431 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 1ec94935106d4b54972ef75dc58ffc53 from job leader monitoring.
2023-12-01 16:17:39,431 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2023-12-01 16:17:39,431 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{connectionInformationPath='/1ec94935106d4b54972ef75dc58ffc53/connection_info'}.
2023-12-01 16:17:39,440 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job 1ec94935106d4b54972ef75dc58ffc53.
2023-12-01 16:17:39,749 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close ResourceManager connection 02e09c839313dde08c0491114a5e467a.
2023-12-01 16:17:39,770 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                 [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2023-12-01 16:17:39,771 INFO  org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager [] - Shutting down TaskExecutorStateChangelogStoragesManager.
2023-12-01 16:17:39,771 INFO  org.apache.flink.runtime.blob.PermanentBlobCache             [] - Shutting down BLOB cache
2023-12-01 16:17:39,771 INFO  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Shutting down TaskExecutorLocalStateStoresManager.
2023-12-01 16:17:39,772 INFO  org.apache.flink.runtime.filecache.FileCache                 [] - removed file cache directory /tmp/flink/taskmanager/flink-dist-cache-027fd2a2-1b5f-4212-992b-f3f02d133769
2023-12-01 16:17:39,772 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Stopping TaskExecutor akka.tcp://[email protected]:3380/user/rpc/taskmanager_0.
2023-12-01 16:17:39,772 INFO  org.apache.flink.runtime.blob.TransientBlobCache             [] - Shutting down BLOB cache
2023-12-01 16:17:39,774 INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl      [] - FileChannelManager removed spill file directory /tmp/flink/taskmanager/flink-netty-shuffle-0d3ef331-6cce-4767-99af-b5679e334b33
2023-12-01 16:17:39,774 INFO  org.apache.flink.runtime.io.disk.FileChannelManagerImpl      [] - FileChannelManager removed spill file directory /tmp/flink/taskmanager/flink-io-ef24d118-a9c7-427b-bc9a-f24cfffc6e52
2023-12-01 16:17:39,776 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Stop job leader service.
2023-12-01 16:17:39,776 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2023-12-01 16:17:39,776 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{connectionInformationPath='/resource_manager/connection_info'}.
2023-12-01 16:17:39,776 INFO  org.apache.flink.runtime.io.network.NettyShuffleEnvironment  [] - Shutting down the network environment and its components.
2023-12-01 16:17:39,777 INFO  org.apache.flink.runtime.io.network.netty.NettyClient        [] - Successful shutdown (took 0 ms).
2023-12-01 16:17:39,779 INFO  org.apache.flink.runtime.io.network.netty.NettyServer        [] - Successful shutdown (took 1 ms).
2023-12-01 16:17:39,780 INFO  org.apache.flink.runtime.taskexecutor.KvStateService         [] - Shutting down the kvState service and its components.
2023-12-01 16:17:39,780 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Stop job leader service.
2023-12-01 16:17:39,780 INFO  org.apache.flink.runtime.filecache.FileCache                 [] - removed file cache directory /tmp/flink/taskmanager/flink-dist-cache-027fd2a2-1b5f-4212-992b-f3f02d133769
2023-12-01 16:17:39,781 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Stopped TaskExecutor akka.tcp://[email protected]:3380/user/rpc/taskmanager_0.
2023-12-01 16:17:39,786 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Stopping Akka RPC service.
2023-12-01 16:17:39,790 INFO  org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - backgroundOperationsLoop exiting
2023-12-01 16:17:39,811 WARN  akka.actor.CoordinatedShutdown                               [] - Could not addJvmShutdownHook, due to: Shutdown in progress
2023-12-01 16:17:39,828 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
2023-12-01 16:17:39,830 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-12-01 16:17:39,854 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
2023-12-01 16:17:39,871 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Stopped Akka RPC service.
2023-12-01 16:17:39,895 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Session: 0x105fcdefdb232c0 closed
2023-12-01 16:17:39,895 INFO  org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - EventThread shut down for session: 0x105fcdefdb232c0
2023-12-01 16:17:39,895 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Stopping Akka RPC service.
2023-12-01 16:17:39,899 WARN  akka.actor.CoordinatedShutdown                               [] - Could not addJvmShutdownHook, due to: Shutdown in progress
2023-12-01 16:17:39,902 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
2023-12-01 16:17:39,902 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-12-01 16:17:39,908 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
2023-12-01 16:17:39,929 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Stopped Akka RPC service.
2023-12-01 16:17:39,929 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Terminating TaskManagerRunner with exit code 1
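The commit step visible in the log above can be modeled as follows. This is a minimal, hypothetical sketch that only mirrors the three `FileSystemUtils` messages (plain rename, "Delete already file", and "already finished in the last commit, skip"); the `commit` function, the dict-based file system, and the shortened paths are assumptions for illustration, not SeaTunnel's actual code. It shows how a second commit that reuses the same temporary path deletes the file produced by the first one.

```python
from typing import Dict, List


def commit(fs: Dict[str, List[str]], tmp_path: str, target_path: str) -> str:
    """Hypothetical model of the sink's rename-on-commit, mirroring the log messages.

    Returns which branch ran: "rename", "overwrite" or "skip".
    """
    if tmp_path not in fs:
        if target_path in fs:
            # "... already finished in the last commit, skip"
            return "skip"
        raise FileNotFoundError(tmp_path)
    if target_path in fs:
        # "Delete already file: ..." - the previously committed data is dropped here
        del fs[target_path]
        outcome = "overwrite"
    else:
        outcome = "rename"
    fs[target_path] = fs.pop(tmp_path)
    return outcome


def replay() -> Dict[str, object]:
    """Replay the sequence from the log with shortened, hypothetical paths."""
    fs: Dict[str, List[str]] = {}
    tmp = "/tmp/T_0_1/T_0_1_0.orc"
    target = "/out/T_0_1_0.orc"  # same name on every commit: checkpoint id stuck at 1
    outcomes = []

    fs[tmp] = ["batch-1 rows"]
    outcomes.append(commit(fs, tmp, target))  # first checkpoint: plain rename
    fs[tmp] = ["batch-2 rows"]  # the writer reuses the identical temporary path
    outcomes.append(commit(fs, tmp, target))  # second checkpoint: batch-1 is deleted
    outcomes.append(commit(fs, tmp, target))  # later checkpoints: nothing left to move
    return {"outcomes": outcomes, "fs": fs}


if __name__ == "__main__":
    print(replay())
```

After the replay, only `batch-2 rows` remain under the target name, which matches the symptom of the earlier data disappearing.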


### Zeta or Flink or Spark Version

1.15.4

### Java or Scala Version

1.8

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

zhaoli2333, Dec 04 '23

mysql table DDL and data volume?

zhilinli123, Dec 05 '23

> mysql table DDL and data volume?

@zhilinli123 I found that this is the same problem as the one reported in https://github.com/apache/seatunnel/issues/5299. Because `FlinkSinkWriter.snapshotState()` is never called, `checkpointId` is never incremented, so the generated file name is always the same and each new file overwrites the previous one.

I ran into this problem with SeaTunnel + Flink 1.15; after switching to SeaTunnel + Flink 1.14, everything worked.
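The naming collision described above can be illustrated with a small sketch. The pattern `T_<jobId>_<uuid>_<subtask>_<checkpointId>_<fileIndex>.orc` is inferred from the paths in the log, and `NamingSketch` / `snapshot_state` are hypothetical stand-ins for the connector's internals. The only point is that, if the checkpoint id never advances, every commit targets the same file name.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class NamingSketch:
    """Hypothetical model of the sink's file naming, inferred from the log path
    T_<jobId>_<uuid>_<subtask>_<checkpointId>_<fileIndex>.orc (not SeaTunnel's code)."""

    job_id: str
    uuid_prefix: str
    subtask_index: int
    checkpoint_id: int = 1

    def transaction_id(self) -> str:
        return f"T_{self.job_id}_{self.uuid_prefix}_{self.subtask_index}_{self.checkpoint_id}"

    def file_name(self, file_index: int = 0) -> str:
        return f"{self.transaction_id()}_{file_index}.orc"

    def snapshot_state(self) -> None:
        # Stand-in for FlinkSinkWriter.snapshotState(): per the report, this is what
        # advances the checkpoint id, and it is never called on Flink 1.15.
        self.checkpoint_id += 1


def names_for_commits(commits: int, snapshot_called: bool) -> List[str]:
    """File names produced by successive commits of one writer (subtask 0)."""
    writer = NamingSketch("9e6c860bdcaa4eed8668ee20a94b3a0f", "304b7e01ff", 0)
    names = []
    for _ in range(commits):
        names.append(writer.file_name())
        if snapshot_called:
            writer.snapshot_state()
    return names


if __name__ == "__main__":
    print(names_for_commits(3, snapshot_called=False))  # one name, repeated
    print(names_for_commits(3, snapshot_called=True))   # checkpoint ids 1, 2, 3
```

With `snapshot_called=False`, all three names equal `T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc`, the file name seen throughout the log.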

zhaoli2333, Dec 05 '23

Let me look into the problem.

zhilinli123, Dec 13 '23

@zhaoli2333 The JDBC source configured above is already a bounded (batch) source, so there is no need to run it in STREAMING mode:

```hocon
env {
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "12345678"

    table_path = "test.ogg"
    #split.size = 8096
    #split.even-distribution.factor.upper-bound = 100
    #split.even-distribution.factor.lower-bound = 0.05
    #split.sample-sharding.threshold = 1000
    #split.inverse-sampling.rate = 1000
  }
}

sink {
  HdfsFile {
    fs.defaultFS = "hdfs://localhost:9000"
    path = "/user/mac/seatunnel"
  }
}
```

zhilinli123, Dec 13 '23

@zhilinli123 BATCH mode works because it doesn't trigger any checkpoints, but there is a bug in STREAMING mode. On the Flink engine, BATCH mode is inefficient and cannot recover from failures, so we have to use STREAMING mode. For now, we have switched to Flink 1.14 to work around this issue.
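A hypothetical simulation of why BATCH mode avoids the loss: with no checkpoints there is a single commit when the bounded job ends, whereas STREAMING mode commits once per checkpoint, and with a stuck checkpoint id each commit replaces the previously committed file. The `surviving_rows` function, the simplified file names, and the batch sizes are illustrative assumptions, not measurements from this job.

```python
from typing import Dict, List


def surviving_rows(batches: List[int], streaming: bool, checkpoint_id_advances: bool) -> int:
    """Hypothetical model: count rows left in the target directory after the job.

    streaming=True  -> one commit per checkpoint (here: after every batch)
    streaming=False -> no checkpoints, a single commit when the bounded job ends
    """
    target_dir: Dict[str, int] = {}  # committed file name -> row count
    checkpoint_id = 1
    pending = 0  # rows buffered in the current temporary file

    for rows in batches:
        pending += rows
        if streaming:
            # Commit at the checkpoint; a same-named file is replaced (dict overwrite).
            target_dir[f"T_0_{checkpoint_id}_0.orc"] = pending
            pending = 0
            if checkpoint_id_advances:
                checkpoint_id += 1

    if pending:
        # Final commit when the bounded job finishes.
        target_dir[f"T_0_{checkpoint_id}_0.orc"] = pending

    return sum(target_dir.values())


if __name__ == "__main__":
    batches = [500, 500]
    print("batch:", surviving_rows(batches, streaming=False, checkpoint_id_advances=False))
    print("streaming, id advances:", surviving_rows(batches, streaming=True, checkpoint_id_advances=True))
    print("streaming, id stuck:", surviving_rows(batches, streaming=True, checkpoint_id_advances=False))
```

With two equal batches, the stuck-id streaming run keeps 500 of 1000 rows, while the batch run and the streaming run with an advancing id keep all 1000.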

zhaoli2333, Dec 28 '23

I ran into the same problem of files being deleted and re-created. However, my source is Kafka, so I can only use STREAMING mode, not BATCH. What should I do to fix this?

czlh, Feb 20 '24

@czlh Use Flink 1.14.

zhaoli2333, Feb 22 '24