clickhouse-java icon indicating copy to clipboard operation
clickhouse-java copied to clipboard

Writing data to ClickHouse(20.6.3.28,22.8.4.7) using Flink(1.14.5) causes data duplication

Open 2309859381 opened this issue 3 years ago • 2 comments

JDBC 0.3.2+ can cause such problems. this is my code ` /**
 * Opens the ClickHouse JDBC connection and prepares the batch INSERT
 * statement used by {@code insertData}.
 *
 * @throws Exception if the driver cannot connect or the statement
 *                   cannot be prepared
 */
public void createConnection() throws Exception {

    // Build the parameterized INSERT once from the configured column list.
    String insertStr = insertStr(this.tableColums);
    // socket_timeout raised to 5 minutes so large batches are not cut off mid-flush.
    String url = "jdbc:clickhouse://" + host + "?socket_timeout=300000";
    Connection connection = DriverManager.getConnection(url, this.username, this.password);
    // insertData() ends with connection.commit(); the JDBC spec says commit()
    // on an auto-commit connection throws SQLException, so auto-commit must be
    // off for the explicit batch transaction to be valid.
    // NOTE(review): leaving auto-commit on may also let the driver resend a
    // batch on retry — possibly related to the reported duplication; confirm.
    connection.setAutoCommit(false);
    this.statement = connection.createStatement();
    PreparedStatement preparedStatement = connection.prepareStatement(insertStr);
    this.preparedStatement = preparedStatement;
    this.connection = connection;
}

@Override
public void open(Configuration parameters) throws Exception {

    // Register the legacy ClickHouse JDBC driver explicitly (pre-JDBC-4 style).
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
    // Start the flush-interval clock before any rows arrive.
    lastInsertTime = System.currentTimeMillis();

    createConnection();
}

/**
 * Buffers the incoming row and flushes the buffer to ClickHouse once it
 * reaches the configured batch size or the flush interval has elapsed.
 *
 * @param row     the record to write
 * @param context Flink sink context (unused)
 */
@Override
public void invoke(Row row, Context context) {
    try {
        // Buffer first, then test the threshold, so the flush fires at exactly
        // insertCkBatchSize rows (the old add-after-check flushed one row late).
        list.add(row);
        if (list.size() >= this.insertCkBatchSize || this.isTimeToDoInsert()) {
            insertData(list, preparedStatement, connection);
            list.clear();
            this.lastInsertTime = System.currentTimeMillis();
        }
    } catch (Exception e) {
        // Never swallow write failures: surfacing the error lets Flink fail
        // the task and restart from the last checkpoint instead of silently
        // dropping (or, on driver-level retry, duplicating) whole batches.
        throw new RuntimeException("Failed to write batch to ClickHouse", e);
    }
}

/**
 * Releases all JDBC resources. The closes are chained with try/finally so
 * that a failure closing one resource no longer leaks the remaining ones
 * (the old sequential version skipped the connection if an earlier close threw).
 *
 * @throws Exception the first close failure, after all closes were attempted
 */
@Override
public void close() throws Exception {
    try {
        if (null != statement) {
            statement.close();
        }
    } finally {
        try {
            if (null != preparedStatement) {
                preparedStatement.close();
            }
        } finally {
            if (null != connection) {
                connection.close();
            }
        }
    }
}

/**
 * Binds each buffered row into the prepared INSERT, executes the whole
 * batch, and commits it as one transaction.
 *
 * @param rows              buffered rows to write
 * @param preparedStatement the pre-built parameterized INSERT
 * @param connection        connection owning the statement (auto-commit assumed off)
 * @throws SQLException if binding, execution, or commit fails
 */
public void insertData(List<Row> rows, PreparedStatement preparedStatement, Connection connection) throws SQLException {
    for (Row row : rows) {
        for (int j = 0; j < this.tableColums.length; ++j) {
            Object value = row.getField(j);
            if (null != value) {
                preparedStatement.setObject(j + 1, value);
            } else {
                // Bind a real SQL NULL. The old code bound the string "null",
                // which stores the 4-character text "null" in String columns
                // and fails type conversion for numeric ones.
                preparedStatement.setObject(j + 1, null);
            }
        }
        preparedStatement.addBatch();
    }
    preparedStatement.executeBatch();
    connection.commit();
    preparedStatement.clearBatch();
} ` I don't have time to delve into this right now, so I just downgraded the JDBC version(0.3.1) to fix the problem. When I finished my work I went back and tried to find out why.

2309859381 avatar Sep 08 '22 09:09 2309859381

Thanks for the report @2309859381. Do you mind to share the batch size and table structure?

zhicwu avatar Sep 08 '22 11:09 zhicwu

Thanks for the report @2309859381. Do you mind to share the batch size and table structure?

batchsize=5000

`create table if not exists database.tbname on cluster clickhouse_7shards_2replicas ( fields String , fields String , fields String , fields Int64 , fields Int64 , fields String , fields String , fields String , fields String , fields int , fields int , fields String , fields String ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/database.tbname', '{replica}') PARTITION BY (fields, fields) primary key (fields) ORDER BY (fields,fields,fields) SETTINGS index_granularity = 10240;

CREATE TABLE database.tbname ON CLUSTER clickhouse_7shards_2replicas AS database.tbname ENGINE = Distributed(clickhouse_7shards_2replicas,database,tbname,rand()); ` Excuse me, I need to desensitize the data, so the table structure looks strange

2309859381 avatar Sep 09 '22 06:09 2309859381