Flink Doris Integration (not able to create Doris catalog)

Open vshinde-medacist opened this issue 1 year ago • 2 comments

Hi Team,

Seeking advice on the issue below.

We are conducting a PoC and are in the process of evaluating Flink and Doris integration using the below versions and dependencies.

  • Flink-1.18.1
  • Doris-2.0.4
  • All the dependencies have been added to the POM file

Doris is up and running (set up by following the Quick Start in the official Doris documentation). I can access the FE at http://10.0.2.15:8030

Below is the PoC code (a Java Maven project). It reads CSV files through a Flink FileSystem source table and inserts the rows into a Doris table used as the sink.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlinkDorisIntegration {
    public static void main(String[] args) throws Exception {

        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///home/user/Documents/app_data/checkpoint/");
	StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
		
		// Flink FileSystem CSV format source table in default in-memory catalog
		tableEnv.executeSql("create table IF NOT EXISTS `default_catalog`.`default_database`.`fs_src`(station string, txdate string, txtime string, txseq string)" +
				" with (\n" +
				"'connector' = 'filesystem',\n" +
				"'path'      = 'file:///home/user/Documents/app_data/source/',\n" +
				"'format'    = 'csv',\n" +
				"'csv.ignore-parse-errors' = 'true',\n" +
				"'csv.allow-comments' = 'true',\n" +
				"'source.monitor-interval' = '1s'\n" +
				");");
				
		// Create Doris catalog (this statement fails with an IllegalArgumentException)
		tableEnv.executeSql("CREATE CATALOG demo_catalog WITH('type' = 'jdbc', 'default-database' = 'db_test', 'username' = 'root', 'password' = '', 'base-url' = 'jdbc:mysql://10.0.2.15:9030')");
		 
	        // Switch to Doris Catalog
		tableEnv.executeSql("USE CATALOG demo_catalog;");
		
		//Create Doris Sink table
		tableEnv.executeSql("CREATE TABLE IF NOT EXISTS db_test.flink_doris_sink (station varchar, txdate varchar, txtime varchar, txseq varchar) " +
                " with (\n" +
                "'connector' = 'doris',\n" +
                "'fenodes'      = '10.0.2.15:8030',\n" +
                "'table.identifier'    = 'db_test.flink_doris_sink',\n" +
                "'username' = 'root',\n" +
                "'password' = '',\n" +
                "'sink.label-prefix' = 'doris_label'\n" +
                ");");
				
		// Insert into Doris table
		tableEnv.executeSql("INSERT INTO demo_catalog.db_test.flink_doris_sink " +
                " (station, txdate, txtime, txseq)" +
                " SELECT " +
                " station, txdate, txtime, txseq " +
                " FROM `default_catalog`.`default_database`.fs_src; ");

		env.execute();
	}
}

To access the Doris table, I need to switch to the Doris catalog, but I get an IllegalArgumentException when trying to create the Doris catalog from Flink. I am running the code from IntelliJ IDEA, not submitting it as a jar to a Flink cluster.

Can someone please help with the following:

  • Is the approach used in the code snippet the right way to access Doris (create catalog, create tables and insert)?
  • How do I create a Doris catalog/table from Flink and access it? (Any sample code or documentation would be helpful.)
  • Have I missed any prerequisites?

vshinde-medacist avatar Feb 22 '24 07:02 vshinde-medacist

The CREATE CATALOG syntax is incorrect. WITH should be PROPERTIES, and you need to specify jdbc_url and driver_url.

Something like this:

CREATE CATALOG demo_catalog PROPERTIES('type' = 'jdbc', 'default-database' = 'db_test', 'username' = 'root', 'password' = '', 'jdbc_url' = 'jdbc:mysql://10.0.2.15:9030', 'driver_url' = 'mysql-connector-java-8.0.25.jar');

Refer to these for details:
https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-CATALOG/
https://doris.apache.org/docs/dev/lakehouse/multi-catalog/jdbc/

rohitrs1983 avatar Feb 22 '24 09:02 rohitrs1983

@rohitrs1983 - thanks for the suggestion. I tried it, but now I get the SQL parser exception below:

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "PROPERTIES" at line 1, column 21.
Was expecting one of:
    <EOF> 
    "WITH" ...
    ";" ...

Which makes sense, I think: how is the Flink SQL parser supposed to know Doris-specific DDL?

vshinde-medacist avatar Feb 22 '24 11:02 vshinde-medacist
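
Since the statement is parsed by Flink rather than by Doris, one possible route is to skip the catalog entirely and register the sink table in Flink's default catalog, using the same 'connector' = 'doris' options already shown in the original post. The sketch below only builds the two SQL strings, so it runs without Flink on the classpath. The class DorisSinkSql and its methods are hypothetical helpers made up for illustration, not part of Flink or the Doris connector. It also assumes that db_test.flink_doris_sink already exists in Doris, because a Flink CREATE TABLE only registers metadata on the Flink side.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only: it just assembles SQL strings.
class DorisSinkSql {

    // Renders a Flink-style WITH ('k' = 'v', ...) clause, keeping insertion order.
    public static String withClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", ", "WITH (", ")"));
    }

    // Flink DDL for a sink table registered in Flink's current (default) catalog.
    // The option keys are the ones used in the original post; the physical table
    // named by table.identifier is assumed to exist in Doris already.
    public static String sinkTableDdl(String feNodes, String dorisTable) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "doris");
        options.put("fenodes", feNodes);
        options.put("table.identifier", dorisTable);
        options.put("username", "root");
        options.put("password", "");
        options.put("sink.label-prefix", "doris_label");
        return "CREATE TABLE IF NOT EXISTS flink_doris_sink ("
                + "station STRING, txdate STRING, txtime STRING, txseq STRING) "
                + withClause(options);
    }

    // INSERT that stays in default_catalog, so no USE CATALOG is required.
    public static String insertSql() {
        return "INSERT INTO flink_doris_sink "
                + "SELECT station, txdate, txtime, txseq FROM fs_src";
    }

    public static void main(String[] args) {
        // With Flink and the Doris connector on the classpath, each string would be
        // passed to tableEnv.executeSql(...) instead of being printed.
        System.out.println(sinkTableDdl("10.0.2.15:8030", "db_test.flink_doris_sink"));
        System.out.println(insertSql());
    }
}
```

In the original program these strings would replace the CREATE CATALOG, USE CATALOG and catalog-qualified INSERT statements. Note also that executeSql on an INSERT submits the job by itself, so a trailing env.execute() is not needed when no DataStream operators have been defined.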