Flink Doris Integration (not able to create Doris catalog)
Hi team,
I am seeking advice on the issue below.
We are running a PoC to evaluate Flink and Doris integration with the following versions and dependencies:
- Flink-1.18.1
- Doris-2.0.4
- All required dependencies have been added to the POM file
Doris is up and running (set up by following the Quick Start in the official Doris documentation), and the FE is reachable at http://10.0.2.15:8030.
Below is the PoC code snippet (Java Maven project). It reads a CSV file through a Flink filesystem source and inserts the rows into a Doris table used as a sink:
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkDorisIntegration {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///home/user/Documents/app_data/checkpoint/");
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Flink filesystem CSV source table in the default in-memory catalog
        tableEnv.executeSql("create table IF NOT EXISTS `default_catalog`.`default_database`.`fs_src`(station string, txdate string, txtime string, txseq string)" +
                " with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path' = 'file:///home/user/Documents/app_data/source/',\n" +
                "'format' = 'csv',\n" +
                "'csv.ignore-parse-errors' = 'true',\n" +
                "'csv.allow-comments' = 'true',\n" +
                "'source.monitor-interval' = '1s'\n" +
                ");");
        // Create the Doris catalog (this statement fails with an IllegalArgumentException)
        tableEnv.executeSql("CREATE CATALOG demo_catalog WITH('type' = 'jdbc', 'default-database' = 'db_test', 'username' = 'root', 'password' = '', 'base-url' = 'jdbc:mysql://10.0.2.15:9030')");
        // Switch to the Doris catalog
        tableEnv.executeSql("USE CATALOG demo_catalog;");
        // Create the Doris sink table; STRING is used because Flink SQL reads a bare VARCHAR as VARCHAR(1)
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test.flink_doris_sink (station string, txdate string, txtime string, txseq string) " +
                " with (\n" +
                "'connector' = 'doris',\n" +
                "'fenodes' = '10.0.2.15:8030',\n" +
                "'table.identifier' = 'db_test.flink_doris_sink',\n" +
                "'username' = 'root',\n" +
                "'password' = '',\n" +
                "'sink.label-prefix' = 'doris_label'\n" +
                ");");
        // Insert into the Doris table. executeSql() submits the INSERT job itself, so block on its result;
        // a separate env.execute() would fail because no DataStream operators are defined.
        tableEnv.executeSql("INSERT INTO demo_catalog.db_test.flink_doris_sink " +
                " (station, txdate, txtime, txseq)" +
                " SELECT " +
                " station, txdate, txtime, txseq " +
                " FROM `default_catalog`.`default_database`.fs_src; ").await();
    }
}
To access the Doris table I need to switch to the Doris catalog, but creating that catalog from Flink fails with an IllegalArgumentException. The code is run from IntelliJ IDEA, not submitted as a JAR to a Flink cluster.
Can someone please help with the following:
- Is the approach in the code snippet the right way to access Doris (create a catalog and tables, then insert)?
- How do I create a Doris catalog/table from Flink and access it? (Any sample code or documentation would be helpful.)
- Have any prerequisites been missed?
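For the first two questions, a catalog may not be needed at all: the Doris Flink connector can be used as an ordinary table connector, with the sink registered in Flink's default in-memory catalog next to the CSV source. The sketch below only builds the Flink SQL statements. It assumes that a flink-doris-connector jar matching Flink 1.18 is on the classpath when the statements are executed, and that `db_test.flink_doris_sink` already exists in Doris (the Flink DDL only maps a Flink table onto it). The per-run label prefix is an assumption for quick IDE tests, since the connector documentation asks for a unique label prefix when two-phase commit is enabled.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Builds the Flink SQL statements for a catalog-free CSV-to-Doris pipeline.
 * Each statement is meant to be passed to StreamTableEnvironment#executeSql in order.
 * Paths, hosts and credentials are the placeholders from the question.
 */
public class FlinkDorisStatements {

    // Filesystem CSV source, registered in Flink's default in-memory catalog.
    static String sourceDdl(String path) {
        return "CREATE TABLE IF NOT EXISTS fs_src ("
                + " station STRING, txdate STRING, txtime STRING, txseq STRING"
                + ") WITH ("
                + " 'connector' = 'filesystem',"
                + " 'path' = '" + path + "',"
                + " 'format' = 'csv',"
                + " 'csv.ignore-parse-errors' = 'true',"
                + " 'csv.allow-comments' = 'true',"
                + " 'source.monitor-interval' = '1s'"
                + ")";
    }

    // Doris sink, also in the default catalog; the Doris table itself must already exist.
    // STRING is used because a bare VARCHAR in Flink SQL means VARCHAR(1).
    static String sinkDdl(String feNodes, String dorisTable, String labelPrefix) {
        return "CREATE TABLE IF NOT EXISTS flink_doris_sink ("
                + " station STRING, txdate STRING, txtime STRING, txseq STRING"
                + ") WITH ("
                + " 'connector' = 'doris',"
                + " 'fenodes' = '" + feNodes + "',"
                + " 'table.identifier' = '" + dorisTable + "',"
                + " 'username' = 'root',"
                + " 'password' = '',"
                + " 'sink.label-prefix' = '" + labelPrefix + "'"
                + ")";
    }

    // Plain INSERT ... SELECT between the two default-catalog tables.
    static String insertSql() {
        return "INSERT INTO flink_doris_sink (station, txdate, txtime, txseq)"
                + " SELECT station, txdate, txtime, txseq FROM fs_src";
    }

    static List<String> all(String path, String feNodes, String dorisTable, String labelPrefix) {
        return Arrays.asList(sourceDdl(path), sinkDdl(feNodes, dorisTable, labelPrefix), insertSql());
    }

    public static void main(String[] args) {
        // Hypothetical per-run label prefix so repeated IDE runs do not reuse old labels.
        String labelPrefix = "doris_label_" + System.currentTimeMillis();
        for (String stmt : all("file:///home/user/Documents/app_data/source/",
                "10.0.2.15:8030", "db_test.flink_doris_sink", labelPrefix)) {
            System.out.println(stmt + ";");
        }
    }
}
```

In the Flink program each statement goes to `tableEnv.executeSql(...)`. For the INSERT, `tableEnv.executeSql(FlinkDorisStatements.insertSql()).await()` keeps the IDE process alive until the job ends; no `env.execute()` call is needed, and on an environment without DataStream operators it fails with "No operators defined in streaming topology".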
The CREATE CATALOG syntax is incorrect: WITH should be PROPERTIES, and you need to specify jdbc_url and driver_url. Something like this:
CREATE CATALOG demo_catalog PROPERTIES('type' = 'jdbc', 'default-database' = 'db_test', 'username' = 'root', 'password' = '', 'jdbc_url' = 'jdbc:mysql://10.0.2.15:9030', 'driver_url' = 'mysql-connector-java-8.0.25.jar');
Refer to the following for details:
https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG/
https://doris.apache.org/docs/dev/lakehouse/multi-catalog/jdbc/
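The `PROPERTIES` form is Doris SQL rather than Flink SQL. Doris's multi-catalog `CREATE CATALOG` registers an external data source (here a MySQL-protocol database) for Doris itself to query; it is executed by the Doris FE and does not expose Doris to Flink. Doris-side DDL, such as creating the target database and table that a Flink sink writes into, can instead be sent to the FE's MySQL-protocol port (9030) over plain JDBC. A minimal sketch, assuming MySQL Connector/J is on the classpath at runtime; the column lengths, key column, bucket count and `replication_num` are illustrative values for a single-BE quick-start cluster, not taken from the question.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

/**
 * Runs Doris-side DDL (Doris SQL, not Flink SQL) against the FE's MySQL-protocol port.
 * Assumption: MySQL Connector/J is available at runtime so DriverManager can open jdbc:mysql URLs.
 */
public class DorisDdl {

    static List<String> statements() {
        return Arrays.asList(
                "CREATE DATABASE IF NOT EXISTS db_test",
                "CREATE TABLE IF NOT EXISTS db_test.flink_doris_sink ("
                        + " station VARCHAR(64), txdate VARCHAR(16),"
                        + " txtime VARCHAR(16), txseq VARCHAR(64))"
                        + " DUPLICATE KEY(station)"
                        + " DISTRIBUTED BY HASH(station) BUCKETS 1"
                        // replication_num = 1 suits a cluster with a single BE
                        + " PROPERTIES (\"replication_num\" = \"1\")");
    }

    // Executes each statement in order on one connection.
    static void apply(String jdbcUrl, String user, String password) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement stmt = conn.createStatement()) {
            for (String sql : statements()) {
                stmt.execute(sql);
            }
        }
    }

    public static void main(String[] args) throws SQLException {
        // Always prints the DDL; only executes it when run with "--apply".
        statements().forEach(s -> System.out.println(s + ";"));
        if (args.length > 0 && args[0].equals("--apply")) {
            apply("jdbc:mysql://10.0.2.15:9030", "root", "");
        }
    }
}
```

The same statements can also be pasted into a MySQL client connected to the FE; JDBC is used here only to keep the PoC in one Java project.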
@rohitrs1983 - thanks for the suggestion. I tried it but got the following SQL parser exception:
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "PROPERTIES" at line 1, column 21.
Was expecting one of:
<EOF>
"WITH" ...
";" ...
I suspect the problem is that the Flink SQL parser cannot be expected to understand Doris-specific DDL.
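The parser error is expected: Flink's `TableEnvironment` only accepts Flink SQL, so any catalog created there must be a Flink catalog declared with `WITH`. The `'type' = 'jdbc'` catalog in the original code is Flink's generic JDBC catalog, which does not support `CREATE TABLE` and, depending on the connector version, may reject an empty password with an `IllegalArgumentException` (a plausible but unverified cause of the original error). Recent flink-doris-connector releases ship their own Flink catalog, `'type' = 'doris'`. The sketch below builds the statements for it; the property names are assumptions based on the connector documentation and should be checked against the connector version in use, and sink options such as the label prefix are not shown.

```java
/**
 * Builds Flink SQL for the catalog provided by flink-doris-connector ('type' = 'doris').
 * Assumption: the connector version on the classpath includes this catalog and accepts
 * these property names; verify them against the documentation for that version.
 */
public class DorisCatalogSql {

    // 'jdbc-url' points at the FE's MySQL-protocol port, 'fenodes' at its HTTP port.
    static String createCatalog(String feHost) {
        return "CREATE CATALOG demo_catalog WITH ("
                + " 'type' = 'doris',"
                + " 'jdbc-url' = 'jdbc:mysql://" + feHost + ":9030',"
                + " 'fenodes' = '" + feHost + ":8030',"
                + " 'default-database' = 'db_test',"
                + " 'username' = 'root',"
                + " 'password' = ''"
                + ")";
    }

    // The sink table is looked up through the catalog, so no CREATE TABLE with 'connector' = 'doris'.
    static String insertSql() {
        return "INSERT INTO demo_catalog.db_test.flink_doris_sink"
                + " SELECT station, txdate, txtime, txseq"
                + " FROM default_catalog.default_database.fs_src";
    }

    public static void main(String[] args) {
        System.out.println(createCatalog("10.0.2.15") + ";");
        System.out.println(insertSql() + ";");
    }
}
```

With such a catalog the Doris tables should be resolved from Doris metadata, so the separate `CREATE TABLE ... 'connector' = 'doris'` step is dropped and the INSERT targets `demo_catalog.db_test.flink_doris_sink` directly; the target table still has to exist in Doris.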