[Bug] [connector-file] HdfsFile sink lost data
### Search before asking
- [X] I had searched in the issues and found no similar issues.
### What happened
We sync data from TiDB to HDFS using the Flink engine in STREAMING mode. After the task finished, we found that about half of the data was missing, because the previously written HDFS file had been removed.
### SeaTunnel Version
2.3.3
### SeaTunnel Config
{
  "env": {
    "execution.parallelism": 1,
    "job.mode": "STREAMING",
    "checkpoint.interval": 10000,
    "execution.checkpoint.interval": 10000,
    "execution.checkpoint.data-uri": "hdfs://seatunnel/checkpoint"
  },
  "source": {
    "Jdbc": {
      "result_table_name": "kafka_topic_usage",
      "url": "jdbc:mysql://xxxx24000/bigdata_component",
      "driver": "com.mysql.cj.jdbc.Driver",
      "user": "xxxx",
      "password": "xxxxxx",
      "query": "select * from kafka_topic_usage",
      "partition_column": "id",
      "partition_num": 4
    }
  },
  "sink": {
    "HdfsFile": {
      "source_table_name": "kafka_topic_usage1",
      "fs.defaultFS": "hdfs://qccnn1",
      "path": "/seatinnel/hive/kafka_topic_usage",
      "file_format_type": "orc"
    }
  }
}
### Running Command
/bin/start-seatunnel-flink-15-connector-v2.sh --config /usr/local/seatunnel-2.3.3/conf --target yarn-per-job
### Error Exception
2023-12-01 16:16:42,814 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://[email protected]:2245/user/rpc/jobmanager_1 with leader id 54f91177-d28c-4905-b79f-d890bd8bd77a.
2023-12-01 16:16:42,841 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2023-12-01 16:16:42,871 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://[email protected]:2245/user/rpc/jobmanager_1 for job 1ec94935106d4b54972ef75dc58ffc53.
2023-12-01 16:16:42,872 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job 1ec94935106d4b54972ef75dc58ffc53.
2023-12-01 16:16:42,877 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job 1ec94935106d4b54972ef75dc58ffc53.
2023-12-01 16:16:42,934 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:42,961 INFO org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - Creating a changelog storage with name 'memory'.
2023-12-01 16:16:42,988 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06), deploy into slot with allocation id 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:42,990 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) switched from CREATED to DEPLOYING.
2023-12-01 16:16:42,993 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) [DEPLOYING].
2023-12-01 16:16:42,994 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,019 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc), deploy into slot with allocation id 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,020 INFO org.apache.flink.runtime.taskmanager.Task [] - HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) switched from CREATED to DEPLOYING.
2023-12-01 16:16:43,020 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) [DEPLOYING].
2023-12-01 16:16:43,021 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,029 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b), deploy into slot with allocation id 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,029 INFO org.apache.flink.runtime.taskmanager.Task [] - HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) switched from CREATED to DEPLOYING.
2023-12-01 16:16:43,029 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) [DEPLOYING].
2023-12-01 16:16:43,030 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 518fb36db8369eb5982aee49191d309c.
2023-12-01 16:16:43,071 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'null, fileStateThreshold: -1)
2023-12-01 16:16:43,071 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'null, fileStateThreshold: -1)
2023-12-01 16:16:43,071 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using job/cluster config to configure application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'null, fileStateThreshold: -1)
2023-12-01 16:16:43,073 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480)
2023-12-01 16:16:43,073 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as FsStateBackend
2023-12-01 16:16:43,073 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480)
2023-12-01 16:16:43,077 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as FsStateBackend
2023-12-01 16:16:43,077 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using application-defined state backend: File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480)
2023-12-01 16:16:43,077 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as FsStateBackend
2023-12-01 16:16:43,077 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using legacy state backend File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480) as Job checkpoint storage
2023-12-01 16:16:43,077 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using legacy state backend File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480) as Job checkpoint storage
2023-12-01 16:16:43,077 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Using legacy state backend File State Backend (checkpoints: 'hdfs://qccnn1/seatunnel/checkpoint', savepoints: 'hdfs://qccnn1/data/flink/flink-savepoints, fileStateThreshold: 20480) as Job checkpoint storage
2023-12-01 16:16:43,077 WARN org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend 'org.apache.flink.runtime.state.filesystem.FsStateBackend' is used. Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons.
2023-12-01 16:16:43,078 WARN org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend 'org.apache.flink.runtime.state.filesystem.FsStateBackend' is used. Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons.
2023-12-01 16:16:43,078 WARN org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage passed via StreamExecutionEnvironment is ignored because legacy state backend 'org.apache.flink.runtime.state.filesystem.FsStateBackend' is used. Legacy state backends can also be used as checkpoint storage and take precedence for backward-compatibility reasons.
2023-12-01 16:16:43,096 INFO org.apache.flink.runtime.taskmanager.Task [] - HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) switched from DEPLOYING to INITIALIZING.
2023-12-01 16:16:43,096 INFO org.apache.flink.runtime.taskmanager.Task [] - HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) switched from DEPLOYING to INITIALIZING.
2023-12-01 16:16:43,097 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) switched from DEPLOYING to INITIALIZING.
2023-12-01 16:16:43,370 INFO org.apache.flink.runtime.taskmanager.Task [] - HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) switched from INITIALIZING to RUNNING.
2023-12-01 16:16:43,370 INFO org.apache.flink.runtime.taskmanager.Task [] - HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) switched from INITIALIZING to RUNNING.
2023-12-01 16:16:43,578 INFO org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction [] - Consumer subtask 0 has no restore state.
2023-12-01 16:16:44,137 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) switched from INITIALIZING to RUNNING.
2023-12-01 16:16:44,143 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Starting to calculate splits.
2023-12-01 16:16:44,144 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assigning JdbcSourceSplit(parameterValues=[1, 316202], splitId=0) to 0 reader.
2023-12-01 16:16:44,145 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assigning JdbcSourceSplit(parameterValues=[316203, 632404], splitId=1) to 0 reader.
2023-12-01 16:16:44,145 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assigning JdbcSourceSplit(parameterValues=[632405, 948606], splitId=2) to 0 reader.
2023-12-01 16:16:44,145 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assigning JdbcSourceSplit(parameterValues=[948607, 1264808], splitId=3) to 0 reader.
2023-12-01 16:16:44,145 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplitEnumerator [] - Assign splits [JdbcSourceSplit(parameterValues=[1, 316202], splitId=0), JdbcSourceSplit(parameterValues=[316203, 632404], splitId=1), JdbcSourceSplit(parameterValues=[632405, 948606], splitId=2), JdbcSourceSplit(parameterValues=[948607, 1264808], splitId=3)] to reader 0
2023-12-01 16:16:45,150 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat [] - Executing SQL: 'com.mysql.cj.jdbc.ClientPreparedStatement: /** */ SELECT * FROM (select * from kafka_topic_usage) tt where id >= 1 AND id <= 316202'
2023-12-01 16:16:45,370 WARN org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - Principal [null] or keytabPath [null] is empty, it will skip kerberos authentication
2023-12-01 16:16:45,384 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.MemoryManagerImpl [] - orc.rows.between.memory.checks=5000
2023-12-01 16:16:45,427 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.PhysicalFsWriter [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:16:45,537 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.WriterImpl [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:01,121 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat [] - Executing SQL: 'com.mysql.cj.jdbc.ClientPreparedStatement: /** */ SELECT * FROM (select * from kafka_topic_usage) tt where id >= 316203 AND id <= 632404'
2023-12-01 16:17:01,723 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.PhysicalFsWriter [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:01,728 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.WriterImpl [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:10,968 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat [] - Executing SQL: 'com.mysql.cj.jdbc.ClientPreparedStatement: /** */ SELECT * FROM (select * from kafka_topic_usage) tt where id >= 632405 AND id <= 948606'
2023-12-01 16:17:11,274 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.PhysicalFsWriter [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:11,278 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.WriterImpl [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:11,411 WARN org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - Principal [null] or keytabPath [null] is empty, it will skip kerberos authentication
2023-12-01 16:17:11,450 INFO org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:11,468 INFO org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] finish
2023-12-01 16:17:18,216 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcInputFormat [] - Executing SQL: 'com.mysql.cj.jdbc.ClientPreparedStatement: /** */ SELECT * FROM (select * from kafka_topic_usage) tt where id >= 948607 AND id <= 1264808'
2023-12-01 16:17:18,480 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.PhysicalFsWriter [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:18,483 INFO org.apache.seatunnel.shade.connector.file.org.apache.orc.impl.WriterImpl [] - ORC writer created for path: /tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc with stripeSize: 67108864 blockSize: 268435456 compression: NONE bufferSize: 262144
2023-12-01 16:17:18,531 INFO org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:18,536 INFO org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - Delete already file: /seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc
2023-12-01 16:17:18,538 INFO org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] finish
2023-12-01 16:17:23,087 INFO org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader [] - Closed the bounded jdbc source
2023-12-01 16:17:23,395 INFO org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:23,396 WARN org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] already finished in the last commit, skip
2023-12-01 16:17:29,109 INFO org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:29,110 WARN org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] already finished in the last commit, skip
2023-12-01 16:17:39,118 INFO org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - begin rename file oldName :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to newName :[/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc]
2023-12-01 16:17:39,129 WARN org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils [] - rename file :[/tmp/seatunnel/seatunnel/9e6c860bdcaa4eed8668ee20a94b3a0f/304b7e01ff/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1/NON_PARTITION/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] to [/seatinnel/hive/kafka_topic_usage/T_9e6c860bdcaa4eed8668ee20a94b3a0f_304b7e01ff_0_1_0.orc] already finished in the last commit, skip
2023-12-01 16:17:39,133 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06) switched from RUNNING to FINISHED.
2023-12-01 16:17:39,133 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 (0199df6a286aa880dd7ffdd7b3ceed06).
2023-12-01 16:17:39,134 INFO org.apache.flink.runtime.taskmanager.Task [] - HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc) switched from RUNNING to FINISHED.
2023-12-01 16:17:39,134 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 (2168727360fe943a1150ed5493b671cc).
2023-12-01 16:17:39,134 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: SeaTunnel JdbcSource -> kafka_topic_usage[1] -> Calc[2] -> SinkConversion[3] -> Flat Map -> kafka_topic_usage1[4] -> SinkConversion[5] (1/1)#0 0199df6a286aa880dd7ffdd7b3ceed06.
2023-12-01 16:17:39,139 INFO org.apache.flink.runtime.taskmanager.Task [] - HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b) switched from RUNNING to FINISHED.
2023-12-01 16:17:39,140 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for HdfsFile: Global Committer (1/1)#0 (38611013dac2ea15e318f9075140cc5b).
2023-12-01 16:17:39,141 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task HdfsFile: Writer -> HdfsFile: Committer (1/1)#0 2168727360fe943a1150ed5493b671cc.
2023-12-01 16:17:39,145 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task HdfsFile: Global Committer (1/1)#0 38611013dac2ea15e318f9075140cc5b.
2023-12-01 16:17:39,428 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: 518fb36db8369eb5982aee49191d309c, jobId: 1ec94935106d4b54972ef75dc58ffc53).
2023-12-01 16:17:39,431 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 1ec94935106d4b54972ef75dc58ffc53 from job leader monitoring.
2023-12-01 16:17:39,431 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2023-12-01 16:17:39,431 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{connectionInformationPath='/1ec94935106d4b54972ef75dc58ffc53/connection_info'}.
2023-12-01 16:17:39,440 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 1ec94935106d4b54972ef75dc58ffc53.
2023-12-01 16:17:39,749 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close ResourceManager connection 02e09c839313dde08c0491114a5e467a.
2023-12-01 16:17:39,770 INFO org.apache.flink.yarn.YarnTaskExecutorRunner [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2023-12-01 16:17:39,771 INFO org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager [] - Shutting down TaskExecutorStateChangelogStoragesManager.
2023-12-01 16:17:39,771 INFO org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting down BLOB cache
2023-12-01 16:17:39,771 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Shutting down TaskExecutorLocalStateStoresManager.
2023-12-01 16:17:39,772 INFO org.apache.flink.runtime.filecache.FileCache [] - removed file cache directory /tmp/flink/taskmanager/flink-dist-cache-027fd2a2-1b5f-4212-992b-f3f02d133769
2023-12-01 16:17:39,772 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Stopping TaskExecutor akka.tcp://[email protected]:3380/user/rpc/taskmanager_0.
2023-12-01 16:17:39,772 INFO org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting down BLOB cache
2023-12-01 16:17:39,774 INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager removed spill file directory /tmp/flink/taskmanager/flink-netty-shuffle-0d3ef331-6cce-4767-99af-b5679e334b33
2023-12-01 16:17:39,774 INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager removed spill file directory /tmp/flink/taskmanager/flink-io-ef24d118-a9c7-427b-bc9a-f24cfffc6e52
2023-12-01 16:17:39,776 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Stop job leader service.
2023-12-01 16:17:39,776 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2023-12-01 16:17:39,776 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{connectionInformationPath='/resource_manager/connection_info'}.
2023-12-01 16:17:39,776 INFO org.apache.flink.runtime.io.network.NettyShuffleEnvironment [] - Shutting down the network environment and its components.
2023-12-01 16:17:39,777 INFO org.apache.flink.runtime.io.network.netty.NettyClient [] - Successful shutdown (took 0 ms).
2023-12-01 16:17:39,779 INFO org.apache.flink.runtime.io.network.netty.NettyServer [] - Successful shutdown (took 1 ms).
2023-12-01 16:17:39,780 INFO org.apache.flink.runtime.taskexecutor.KvStateService [] - Shutting down the kvState service and its components.
2023-12-01 16:17:39,780 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Stop job leader service.
2023-12-01 16:17:39,780 INFO org.apache.flink.runtime.filecache.FileCache [] - removed file cache directory /tmp/flink/taskmanager/flink-dist-cache-027fd2a2-1b5f-4212-992b-f3f02d133769
2023-12-01 16:17:39,781 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Stopped TaskExecutor akka.tcp://[email protected]:3380/user/rpc/taskmanager_0.
2023-12-01 16:17:39,786 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopping Akka RPC service.
2023-12-01 16:17:39,790 INFO org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - backgroundOperationsLoop exiting
2023-12-01 16:17:39,811 WARN akka.actor.CoordinatedShutdown [] - Could not addJvmShutdownHook, due to: Shutdown in progress
2023-12-01 16:17:39,828 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-12-01 16:17:39,830 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-12-01 16:17:39,854 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2023-12-01 16:17:39,871 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopped Akka RPC service.
2023-12-01 16:17:39,895 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Session: 0x105fcdefdb232c0 closed
2023-12-01 16:17:39,895 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - EventThread shut down for session: 0x105fcdefdb232c0
2023-12-01 16:17:39,895 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopping Akka RPC service.
2023-12-01 16:17:39,899 WARN akka.actor.CoordinatedShutdown [] - Could not addJvmShutdownHook, due to: Shutdown in progress
2023-12-01 16:17:39,902 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-12-01 16:17:39,902 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-12-01 16:17:39,908 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2023-12-01 16:17:39,929 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Stopped Akka RPC service.
2023-12-01 16:17:39,929 INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Terminating TaskManagerRunner with exit code 1
### Zeta or Flink or Spark Version
1.15.4
### Java or Scala Version
1.8
### Screenshots
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
mysql table DDL and data volume?
@zhilinli123 I found it is the same problem as the one described in https://github.com/apache/seatunnel/issues/5299. Because `FlinkSinkWriter.snapshotState()` is never called, the `checkpointId` never increments, so the generated file name is always the same and the previous file gets overwritten by the next commit.
I hit this problem with SeaTunnel + Flink 1.15; after switching to SeaTunnel + Flink 1.14, everything worked.
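The overwrite mechanism can be sketched as follows. This is a hypothetical Python illustration, not SeaTunnel's actual implementation: the file-name layout in `target_file_name` and the `commit` helper are assumptions modeled on the log lines above, which show a committed name embedding a fixed checkpoint id (`..._0_1_0.orc`) and a "Delete already file" step before each rename.

```python
# Hypothetical sketch of the collision; names and layout are illustrative,
# not SeaTunnel's real code.
import os
import tempfile


def target_file_name(tx_id: str, checkpoint_id: int, index: int) -> str:
    # Assumed name shape: T_<txId>_<subtask>_<checkpointId>_<index>.orc
    return f"T_{tx_id}_0_{checkpoint_id}_{index}.orc"


def commit(sink_dir: str, tmp_file: str, name: str) -> None:
    dst = os.path.join(sink_dir, name)
    if os.path.exists(dst):
        # Mirrors the "Delete already file" log line: the previous
        # commit's output is removed before the rename.
        os.remove(dst)
    os.replace(tmp_file, dst)


sink = tempfile.mkdtemp()
checkpoint_id = 1  # never incremented because snapshotState() is never called

for split in range(3):  # three splits, three commits
    fd, tmp = tempfile.mkstemp()
    with os.fdopen(fd, "w") as f:
        f.write(f"data of split {split}\n")
    # Same checkpoint id every time -> same target name every time.
    commit(sink, tmp, target_file_name("304b7e01ff", checkpoint_id, 0))

# Only one file survives, holding only the last split's data.
print(sorted(os.listdir(sink)))
```

If `checkpoint_id` advanced on each snapshot as intended, each commit would produce a distinct file name and no data would be lost.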
Let me look at the problem
Configuring JDBC as above is itself batch mode, so there is no need to run it in streaming mode @zhaoli2333
env {
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2b8"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "12345678"
    table_path = "test.ogg"
    #split.size = 8096
    #split.even-distribution.factor.upper-bound = 100
    #split.even-distribution.factor.lower-bound = 0.05
    #split.sample-sharding.threshold = 1000
    #split.inverse-sampling.rate = 1000
  }
}
sink {
  HdfsFile {
    fs.defaultFS = "hdfs://localhost:9000"
    path = "/user/mac/seatunnel"
  }
}
@zhilinli123 BATCH mode is fine because it doesn't trigger any checkpoint, but there is a bug in STREAMING mode. With the Flink engine, BATCH mode is inefficient and unrecoverable, so we have to use STREAMING mode. For now, we have switched to Flink 1.14 to bypass this issue.
I ran into the same file delete-and-recreate problem, but my source is Kafka, so I can only use STREAMING mode, not batch. What do I need to do to resolve this?
@czlh Use Flink 1.14.