Writing data to ClickHouse (20.6.3.28, 22.8.4.7) using Flink (1.14.5) causes data duplication
JDBC 0.3.2+ can cause this problem. This is my code:

```java
public void createConnection() throws Exception {
    String insertStr = insertStr(this.tableColums);
    String url = "jdbc:clickhouse://" + host + "?socket_timeout=300000";
    Connection connection = DriverManager.getConnection(url, this.username, this.password);
    this.statement = connection.createStatement();
    PreparedStatement preparedStatement = connection.prepareStatement(insertStr);
    //connection.setAutoCommit(false);
    this.preparedStatement = preparedStatement;
    this.connection = connection;
}

@Override
public void open(Configuration parameters) throws Exception {
    String drivername = "ru.yandex.clickhouse.ClickHouseDriver";
    Class.forName(drivername);
    lastInsertTime = System.currentTimeMillis();
    createConnection();
}

@Override
public void invoke(Row row, Context context) {
    try {
        if (list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) {
            list.add(row);
            insertData(list, preparedStatement, connection);
            list.clear();
            this.lastInsertTime = System.currentTimeMillis();
        } else {
            list.add(row);
        }
    } catch (Exception e) {
        // NOTE: exceptions are silently swallowed here
    }
}

@Override
public void close() throws Exception {
    if (null != statement) {
        statement.close();
    }
    if (null != preparedStatement) {
        preparedStatement.close();
    }
    if (null != connection) {
        connection.close();
    }
}

public void insertData(List<Row> rows, PreparedStatement preparedStatement, Connection connection) throws SQLException {
    for (Row row : rows) {
        for (int j = 0; j < this.tableColums.length; ++j) {
            if (null != row.getField(j)) {
                preparedStatement.setObject(j + 1, row.getField(j));
            } else {
                preparedStatement.setObject(j + 1, "null");
            }
        }
        preparedStatement.addBatch();
    }
    preparedStatement.executeBatch();
    connection.commit();
    preparedStatement.clearBatch();
}
```

I don't have time to delve into this right now, so I just downgraded the JDBC driver to 0.3.1, which fixed the problem. Once I finished my work, I went back and tried to find out why.
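A side note on `insertData()`: the `else` branch binds the string `"null"` for a missing field, which stores the four-character text `"null"` in a `String` column rather than a real SQL NULL. This is unrelated to the duplication, but worth flagging. A minimal sketch of the difference (the helper names are hypothetical, and `Types.VARCHAR` is an assumption for a `String` column; in real code the second branch would call `preparedStatement.setNull(j + 1, Types.VARCHAR)`):

```java
import java.sql.Types;

public class NullBindingDemo {
    // What insertData() currently binds for a missing field: the literal text "null".
    static Object bindLikeOriginal(Object field) {
        return field != null ? field : "null";
    }

    // What a setNull-based branch would bind: a genuine SQL NULL.
    static Object bindWithSetNull(Object field) {
        if (field != null) {
            return field;
        }
        // In real code: preparedStatement.setNull(idx, Types.VARCHAR);
        return null;
    }

    public static void main(String[] args) {
        // Returns the String "null", not SQL NULL:
        System.out.println(bindLikeOriginal(null) instanceof String);
        // Returns a real null:
        System.out.println(bindWithSetNull(null) == null);
    }
}
```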
Thanks for the report @2309859381. Do you mind to share the batch size and table structure?
batch size = 5000

```sql
CREATE TABLE IF NOT EXISTS database.tbname ON CLUSTER clickhouse_7shards_2replicas
(
    fields String,
    fields String,
    fields String,
    fields Int64,
    fields Int64,
    fields String,
    fields String,
    fields String,
    fields String,
    fields int,
    fields int,
    fields String,
    fields String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/database.tbname', '{replica}')
PARTITION BY (fields, fields)
PRIMARY KEY (fields)
ORDER BY (fields, fields, fields)
SETTINGS index_granularity = 10240;

CREATE TABLE database.tbname ON CLUSTER clickhouse_7shards_2replicas AS database.tbname
ENGINE = Distributed(clickhouse_7shards_2replicas, database, tbname, rand());
```

Sorry, I had to desensitize the data, so the table structure looks strange.
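In case it helps anyone else hitting this before the root cause is found: the downgrade can be pinned in the build. A sketch assuming Maven and the pre-rewrite `ru.yandex.clickhouse` coordinates (which match the `ru.yandex.clickhouse.ClickHouseDriver` class used above); adjust to your own build tool:

```xml
<!-- Pin the legacy 0.3.1 driver, which does not show the duplication. -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1</version>
</dependency>
```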