WeDataSphere icon indicating copy to clipboard operation
WeDataSphere copied to clipboard

【有奖征文】linkis与SQL中间件(跨数据源混查)结合实践分享

Open libailin opened this issue 5 years ago • 1 comments

标题:linkis与SQL中间件(跨数据源混查)结合实践分享

背景需求

-业务需求:

在公司有很多运营、数据分析的童鞋,虽然自有的BI产品功能丰富,有各式各样的定制化分析报表、各种维度的图表。 但是有时需要临时查询数据,发现数据是分布在不同数据源里的,也有可能数据来自不同业务的不同集群。 比如查询hive表的数据,但是维度映射数据在mysql里。还有要两个mysql数据库(不同服务器)关联查询等情况, 平时这些需求都需要程序猿大神们写程序实现。

现在可以通过Linkis与SQL中间件完美结合满足上述需求。难道不香吗?

-技术迭代:

之前使用的Apache Livy作为SQL执行入口,把查询的请求都提交给Livy,但是体验上不尽人意。 一直想找个替代组件, 后来发现功能强大的linkis。至于Linkis跟Apache Livy的对比,可以查看官方相关文档。

SQL中间件介绍:

SQL中间件是基于公司目前开源的XSQL和Quicksql两款SQL中间件,两种都支持跨数据源混查,两个都很优秀, 至于大家选择集成哪个可以根据自身情况决定。因为之前使用过XSQL,所以是在linkis增加了xsql查询引擎。

以下分别简单介绍下两款开源组件:

XSQL:

XSQL是一款低门槛、更稳定的分布式查询引擎。它允许你快速、近实时地查询大量数据。对于一些数据源(例如:Elasticsearch、MongoDB、Druid等),他可以大幅地降低学习曲线,并节省人力成本。除Hive外,每种数据源都支持除子查询外的下推执行优化。用户有时希望将位于不同数据源上的数据关联起来进行查询,但是由于各种数据源是异构的且一些数据源不支持SQL或者支持的SQL语法非常有限,因此传统互联网公司的做法是,将不同的数据同步到统一的存储介质中,再进行OLAP的查询。数据同步的过程中可能面临数据迁移、主从同步、网络带宽等诸多困难和挑战,而且需要浪费大量的人力、物力及时间,无法满足大数据产品当前阶段对于近实时甚至准实时的场景。通过XSQL你将可以避免数据迁移和时间浪费,更加专注于业务本身。XSQL可以通过下推、并行计算、迭代计算等底层支撑技术,对各种数据源的查询加速。

功能特性:

  • 内置8种数据源,包括:Hive、Mysql、EleasticSearch、Mongo、Kafka、Hbase、Redis、Druid等。
  • XSQL采用数据源(DataSource)、数据库(Database)、数据表(Table)的三层元信息,为异构数据源提供了统一视图,进而实现了跨数据源的数据关联
  • SQL Everything,将程序与数据源具体版本解耦,程序迁移能力得到加强
  • 对DDL、DML、可下推查询,延迟与Yarn的交互及资源申请,进而提升效率并节省资源。
  • 相比很多开源分布式查询引擎,XSQL替换了Spark SQL,因而只需要一次SQL解析,避免多次解析带来的时延。
  • XSQL允许用户将聚合、过滤、投影等操作下推至数据源计算引擎,相比DataSet API更容易实现毫秒级响应。
  • XSQL借鉴了业内优秀的开源项目,放弃元数据的中心化,因此避免了数据同步、数据不一致,数据延迟等不利因素。XSQL也因此在部署上更加轻量、简便。
  • XSQL对元数据的缓存有两种级别,既能减少对底层数据源的压力,也提升了XSQL的执行效率。
  • XSQL可以按照用户需要,设置元数据白名单来避免缓存多余的元信息,进一步提升执行效率。
  • 可适配到Spark 2.x任意版本,解压即可运行,不需要引入额外依赖。且与原生SparkSQL隔离运行,不影响现有程序运行

111

Quicksql:

Quicksql是一款跨计算引擎的统一联邦查询中间件,用户可以使用标准SQL语法对各类数据源进行联合分析查询。其目标是构建实时\离线全数据源统一的数据处理范式,屏蔽底层物理存储和计算层,最大化业务处理数据的效率。同时能够提供给开发人员可插拔接口,由开发人员自行对接新数据源。

功能特性:

  • 支持8种数据源查询:Hive, MySQL, Kylin, Elasticsearch, Oracle, MongoDB, PostgreSQL, GBase-8s;
  • 支持Spark、Flink双计算引擎;
  • 支持基础CLI命令行查询和JDBC远程连接查询;
  • JDBC类型数据源可通过YAML配置快速接入,无需修改代码;
  • 提供方言/语法对接模板,支持用户对新数据源的语法自定义;
  • 提供元数据采集功能,批量拉取预置元数据;
  • 支持落地HDFS,支持可配置的异步响应机制

p1

执行流程图

333

实践过程

参考linkis官方文档《如何快速实现新的底层引擎》、《Spark引擎介绍》,然后在uejs/definedEngines下创建xsql模块进行相关开发。

功能点:

  • 1、支持按照不同集群加载相关配置
  • 2、支持自定义结果存储路径
  • 3、支持是否开启默认limit 5000限制保护
  • 4、linkis网关上socket支持token user认证。
  • 5、适配公司内部hadoop版本
  • 6、增加XSQL执行引擎

实现过程简述:

由于公司大数据是有多集群的,为了节省客户端资源,可以复用客户端提交任务到不同集群,这时就需要能够灵活指定不同集群的配置文件。 目前是将用到的集群配置文件放到linkis-hadoop-conf文件夹下,用于在启动引擎时以及己启动的执行引擎里进入动态加载。


├── client-viewfs.xml
├── core-site-cluster1.xml
├── hbase-site-cluster1.xml
├── hdfs-site.xml
├── hive-default.xml
├── hive-exec-log4j.properties
├── hive-log4j.properties
├── hive-site-cluster1.xml
├── ivysettings.xml
├── mapred-site-cluster1.xml
├── spark-defaults-cluster1.conf
├── xsql-spark-defaults-cluster1.conf
└── yarn-site-cluster1.xml

在linkis-gateway网关模块,改造让socket支持token user认证。比如在创建socket连接时可以通过传入token相关参数来完成用户认证。

ws://gateway.linkis.net:9001/ws/api/entrance/connect?Token-User=xxxx&Token-Code=BML-AUTH

{
	//这个地址也需要增加token参数
 	"method":"/api/rest_j/v1/entrance/execute?Token-User=xxx&Token-Code=BML-AUTH",
 	"data":{
		"params": {
			"variable":{
			},
			"configuration":{
				"special":{

				},
				"runtime":{

				},
				"startup":{
				}
			}
		},
		"executeApplicationName":"xsql",
		"executionCode":"SELECT * FROM abc limit 5;",
		"runType":"sql"
	}
}

由于业务实际查询时是需要全量数据,不需要进行limit限制。 而且是想根据每次请求中参数动态来设置是否需要Limit,而不是通过全局配置统一禁用还是开启。 业务需要自定义存储结果路径,比如跨集群跨账号存储查询结果。

以上是Linkis\ujes\entrance入口模块里进行参数接受处理。

XSQL执行引擎实现:

  • 目录结构

222

由于xsql是基于spark实现的。所以xsql执行引擎基本是复用了linkis spark引擎代码。

重点是修改如下:

主要涉及到linkis-ujes-xsql-engine 模块相关改动

  • pom.xml

<!--<spark.version>2.4.3</spark.version> -->
<!--把2.4.3修改为2.4.3.xsql-0.6.0 -->
<spark.version>2.4.3.xsql-0.6.0</spark.version>,

2.4.3.xsql-0.6.0这个版本请根据从开源xsql编译时获取,由于适配了公司内部,所以版本号可能略有不同。

SparkEngineExecutorFactory 类

override def createExecutor(options: JMap[String, String]): SparkEngineExecutor = {

    val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath
    SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      conf.set(k, v)
      sys.props.getOrElseUpdate(k, v)
    }

}

def createSparkSession(outputDir: File, conf: SparkConf, addPythonSupport: Boolean = false): SparkSession = {


	//val builder = SparkSession.builder.config(conf)
    //builder.enableHiveSupport().getOrCreate()

	//划重点:将enableHiveSupport改成enableXSQLSupport()
	val builder = SparkSession.builder.config(conf)
    builder.enableXSQLSupport().getOrCreate()
}

SparkEngineExecutor 类

override protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = Utils.tryFinally {

	//同样要增加加载配置代码段
	val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath
    SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      sc.getConf.set(k, v)
      sys.props.getOrElseUpdate(k, v)
    }

}

如何使用

提交参数如下:

{
    "params":{
        "variable":{
        },
        "configuration":{
            "special":{
            },
            "runtime":{
                "clusterName":"cluster1",
                "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",
                "userName":"hadoop",
                "wds.linkis.yarnqueue":"hadoop",
				//可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限
                "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"
                //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true,
                //否则不需要传这个参数,linkis则默认会进行limit 5000限制
                //"allowNoLimit" : true
            },
            "startup":{
                "clusterName":"cluster1",
                "configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",
                "userName":"hadoop",
                "wds.linkis.yarnqueue":"hadoop",
				//可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限
                "resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"
                //如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true,
                //否则不需要传这个参数,linkis则默认会进行limit 5000限制
                //"allowNoLimit" : true
            }
        }
    },
    "executeApplicationName":"xsql",
    "executionCode":"
		REMOVE DATASOURCE IF EXISTS mysql_connect_name;
		ADD DATASOURCE mysql_connect_name(type='mysql',url='jdbc:mysql://10.22.22.22:3306',user='root',password='123456',pushdown='false',useSSL='false',version='5.7.28');
		REMOVE DATASOURCE IF EXISTS hive_cluster1;
		ADD DATASOURCE hive_cluster1(type='hive',metastore.url='thrift://10.222.222.222:9083',user='test',password='test',version='1.2.1');
		SELECT t1.id,t1.name,t1.title,t2.time,t2.url,t2.partner,t2.m2 FROM (SELECT id,name,title,ip FROM mysql_connect_name.database_name.mysql_tables) t1 JOIN 
		(SELECT m,time,url,partner,ip FROM hive_cluster1.database_name.hive_tablse WHERE day = 20200903) t2 
		ON t1.ip=t2.ip order by t2.time;",
    "runType":"sql"
}

XSQL语法说明:

删除数据源时请使用这种语法 REMOVE DATASOURCE IF EXISTS 数据源名称; 避免直接REMOVE DATASOURCE 数据源名称, 因为上来就执行删除数据源,会因为找不到数据源来报异常。

查询的表名需要增加数据源以及数据库进行限定,要符合三段式表名。比如:hive_cluster1.database_name.table_name

第一段数据源名称,就是添加数据源语法时自定义的名称hive_cluster1,比如ADD DATASOURCE hive_cluster1(... ...)

第二段数据库名称,这个需要是真实的数据库,比如database_name

第三段表名,表要是第二段数据库下真实的表名。

更多XSQL使用语法,可以查看官方相关文档。https://qihoo360.github.io/XSQL/tutorial/syntax/

这样就可以实现mysql与hive数据进行关联查询了。

相关版本

hive 1.2.1

spark 2.4.3

linkis 0.9.3

xsql 0.6.0

java 1.8+

hadoop 2.7.2

相关资源

https://github.com/WeBankFinTech/Linkis

https://github.com/Qihoo360/XSQL

https://github.com/Qihoo360/Quicksql

libailin avatar Nov 04 '20 10:11 libailin