【有奖征文】linkis与SQL中间件(跨数据源混查)结合实践分享
标题:linkis与SQL中间件(跨数据源混查)结合实践分享
背景需求
-业务需求:
在公司有很多运营、数据分析的童鞋,虽然自有的BI产品功能丰富,有各式各样的定制化分析报表、各种维度的图表。 但是有时需要临时查询数据,发现数据是分布在不同数据源里的,也有可能数据来自不同业务的不同集群。 比如查询hive表的数据,但是维度映射数据在mysql里。还有要两个mysql数据库(不同服务器)关联查询等情况, 平时这些需求都需要程序猿大神们写程序实现。
现在可以通过Linkis与SQL中间件完美结合满足上述需求。难道不香吗?
-技术迭代:
之前使用的Apache Livy作为SQL执行入口,把查询的请求都提交给Livy,但是体验上不尽人意。 一直想找个替代组件, 后来发现功能强大的linkis。至于Linkis跟Apache Livy的对比,可以查看官方相关文档。
SQL中间件介绍:
SQL中间件是基于公司目前开源的XSQL和Quicksql两款SQL中间件,两种都支持跨数据源混查,两个都很优秀, 至于大家选择集成哪个可以根据自身情况决定。因为之前使用过XSQL,所以是在linkis增加了xsql查询引擎。
以下分别简单介绍下两款开源组件:
XSQL:
XSQL是一款低门槛、更稳定的分布式查询引擎。它允许你快速、近实时地查询大量数据。对于一些数据源(例如:Elasticsearch、MongoDB、Druid等),他可以大幅地降低学习曲线,并节省人力成本。除Hive外,每种数据源都支持除子查询外的下推执行优化。用户有时希望将位于不同数据源上的数据关联起来进行查询,但是由于各种数据源是异构的且一些数据源不支持SQL或者支持的SQL语法非常有限,因此传统互联网公司的做法是,将不同的数据同步到统一的存储介质中,再进行OLAP的查询。数据同步的过程中可能面临数据迁移、主从同步、网络带宽等诸多困难和挑战,而且需要浪费大量的人力、物力及时间,无法满足大数据产品当前阶段对于近实时甚至准实时的场景。通过XSQL你将可以避免数据迁移和时间浪费,更加专注于业务本身。XSQL可以通过下推、并行计算、迭代计算等底层支撑技术,对各种数据源的查询加速。
功能特性:
- 内置8种数据源,包括:Hive、Mysql、EleasticSearch、Mongo、Kafka、Hbase、Redis、Druid等。
- XSQL采用数据源(DataSource)、数据库(Database)、数据表(Table)的三层元信息,为异构数据源提供了统一视图,进而实现了跨数据源的数据关联
- SQL Everything,将程序与数据源具体版本解耦,程序迁移能力得到加强
- 对DDL、DML、可下推查询,延迟与Yarn的交互及资源申请,进而提升效率并节省资源。
- 相比很多开源分布式查询引擎,XSQL替换了Spark SQL,因而只需要一次SQL解析,避免多次解析带来的时延。
- XSQL允许用户将聚合、过滤、投影等操作下推至数据源计算引擎,相比DataSet API更容易实现毫秒级响应。
- XSQL借鉴了业内优秀的开源项目,放弃元数据的中心化,因此避免了数据同步、数据不一致,数据延迟等不利因素。XSQL也因此在部署上更加轻量、简便。
- XSQL对元数据的缓存有两种级别,既能减少对底层数据源的压力,也提升了XSQL的执行效率。
- XSQL可以按照用户需要,设置元数据白名单来避免缓存多余的元信息,进一步提升执行效率。
- 可适配到Spark 2.x任意版本,解压即可运行,不需要引入额外依赖。且与原生SparkSQL隔离运行,不影响现有程序运行

Quicksql:
Quicksql是一款跨计算引擎的统一联邦查询中间件,用户可以使用标准SQL语法对各类数据源进行联合分析查询。其目标是构建实时\离线全数据源统一的数据处理范式,屏蔽底层物理存储和计算层,最大化业务处理数据的效率。同时能够提供给开发人员可插拔接口,由开发人员自行对接新数据源。
功能特性:
- 支持8种数据源查询:Hive, MySQL, Kylin, Elasticsearch, Oracle, MongoDB, PostgreSQL, GBase-8s;
- 支持Spark、Flink双计算引擎;
- 支持基础CLI命令行查询和JDBC远程连接查询;
- JDBC类型数据源可通过YAML配置快速接入,无需修改代码;
- 提供方言/语法对接模板,支持用户对新数据源的语法自定义;
- 提供元数据采集功能,批量拉取预置元数据;
- 支持落地HDFS,支持可配置的异步响应机制

执行流程图

实践过程
参考linkis官方文档《如何快速实现新的底层引擎》、《Spark引擎介绍》,然后在uejs/definedEngines下创建xsql模块进行相关开发。
功能点:
- 1、支持按照不同集群加载相关配置
- 2、支持自定义结果存储路径
- 3、支持是否开启默认limit 5000限制保护
- 4、linkis网关上socket支持token user认证。
- 5、适配公司内部hadoop版本
- 6、增加XSQL执行引擎
实现过程简述:
由于公司大数据是有多集群的,为了节省客户端资源,可以复用客户端提交任务到不同集群,这时就需要能够灵活指定不同集群的配置文件。 目前是将用到的集群配置文件放到linkis-hadoop-conf文件夹下,用于在启动引擎时以及己启动的执行引擎里进入动态加载。
├── client-viewfs.xml
├── core-site-cluster1.xml
├── hbase-site-cluster1.xml
├── hdfs-site.xml
├── hive-default.xml
├── hive-exec-log4j.properties
├── hive-log4j.properties
├── hive-site-cluster1.xml
├── ivysettings.xml
├── mapred-site-cluster1.xml
├── spark-defaults-cluster1.conf
├── xsql-spark-defaults-cluster1.conf
└── yarn-site-cluster1.xml
在linkis-gateway网关模块,改造让socket支持token user认证。比如在创建socket连接时可以通过传入token相关参数来完成用户认证。
ws://gateway.linkis.net:9001/ws/api/entrance/connect?Token-User=xxxx&Token-Code=BML-AUTH
{
//这个地址也需要增加token参数
"method":"/api/rest_j/v1/entrance/execute?Token-User=xxx&Token-Code=BML-AUTH",
"data":{
"params": {
"variable":{
},
"configuration":{
"special":{
},
"runtime":{
},
"startup":{
}
}
},
"executeApplicationName":"xsql",
"executionCode":"SELECT * FROM abc limit 5;",
"runType":"sql"
}
}
由于业务实际查询时是需要全量数据,不需要进行limit限制。 而且是想根据每次请求中参数动态来设置是否需要Limit,而不是通过全局配置统一禁用还是开启。 业务需要自定义存储结果路径,比如跨集群跨账号存储查询结果。
以上是Linkis\ujes\entrance入口模块里进行参数接受处理。
XSQL执行引擎实现:
- 目录结构

由于xsql是基于spark实现的。所以xsql执行引擎基本是复用了linkis spark引擎代码。
重点是修改如下:
主要涉及到linkis-ujes-xsql-engine 模块相关改动
- pom.xml
<!--<spark.version>2.4.3</spark.version> -->
<!--把2.4.3修改为2.4.3.xsql-0.6.0 -->
<spark.version>2.4.3.xsql-0.6.0</spark.version>,
2.4.3.xsql-0.6.0这个版本请根据从开源xsql编译时获取,由于适配了公司内部,所以版本号可能略有不同。
SparkEngineExecutorFactory 类
override def createExecutor(options: JMap[String, String]): SparkEngineExecutor = {
val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath
SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) =>
k.startsWith("spark.")
}.foreach { case (k, v) =>
conf.set(k, v)
sys.props.getOrElseUpdate(k, v)
}
}
def createSparkSession(outputDir: File, conf: SparkConf, addPythonSupport: Boolean = false): SparkSession = {
//val builder = SparkSession.builder.config(conf)
//builder.enableHiveSupport().getOrCreate()
//划重点:将enableHiveSupport改成enableXSQLSupport()
val builder = SparkSession.builder.config(conf)
builder.enableXSQLSupport().getOrCreate()
}
SparkEngineExecutor 类
override protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = Utils.tryFinally {
//同样要增加加载配置代码段
val confFile = Paths.get(configPath, "xsql-spark-defaults-" + clusterName + ".conf").toAbsolutePath.toFile.getAbsolutePath
SparkUtils.getPropertiesFromFile(confFile).filter { case (k, v) =>
k.startsWith("spark.")
}.foreach { case (k, v) =>
sc.getConf.set(k, v)
sys.props.getOrElseUpdate(k, v)
}
}
如何使用
提交参数如下:
{
"params":{
"variable":{
},
"configuration":{
"special":{
},
"runtime":{
"clusterName":"cluster1",
"configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",
"userName":"hadoop",
"wds.linkis.yarnqueue":"hadoop",
//可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限
"resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"
//如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true,
//否则不需要传这个参数,linkis则默认会进行limit 5000限制
//"allowNoLimit" : true
},
"startup":{
"clusterName":"cluster1",
"configPath":"/usr/local/dss/linkis/linkis-hadoop-conf",
"userName":"hadoop",
"wds.linkis.yarnqueue":"hadoop",
//可以传入绝对路径,跨集群写,前提执行账号是对目的路径有写权限
"resultPath":"hdfs://namenode.hadoop.net:9000/home/hadoop/dwc/lihongwei"
//如果不想linkis进行limit限制,则需要传入"allowNoLimit" : true,
//否则不需要传这个参数,linkis则默认会进行limit 5000限制
//"allowNoLimit" : true
}
}
},
"executeApplicationName":"xsql",
"executionCode":"
REMOVE DATASOURCE IF EXISTS mysql_connect_name;
ADD DATASOURCE mysql_connect_name(type='mysql',url='jdbc:mysql://10.22.22.22:3306',user='root',password='123456',pushdown='false',useSSL='false',version='5.7.28');
REMOVE DATASOURCE IF EXISTS hive_cluster1;
ADD DATASOURCE hive_cluster1(type='hive',metastore.url='thrift://10.222.222.222:9083',user='test',password='test',version='1.2.1');
SELECT t1.id,t1.name,t1.title,t2.time,t2.url,t2.partner,t2.m2 FROM (SELECT id,name,title,ip FROM mysql_connect_name.database_name.mysql_tables) t1 JOIN
(SELECT m,time,url,partner,ip FROM hive_cluster1.database_name.hive_tablse WHERE day = 20200903) t2
ON t1.ip=t2.ip order by t2.time;",
"runType":"sql"
}
XSQL语法说明:
删除数据源时请使用这种语法 REMOVE DATASOURCE IF EXISTS 数据源名称; 避免直接REMOVE DATASOURCE 数据源名称, 因为上来就执行删除数据源,会因为找不到数据源来报异常。
查询的表名需要增加数据源以及数据库进行限定,要符合三段式表名。比如:hive_cluster1.database_name.table_name
第一段数据源名称,就是添加数据源语法时自定义的名称hive_cluster1,比如ADD DATASOURCE hive_cluster1(... ...)
第二段数据库名称,这个需要是真实的数据库,比如database_name
第三段表名,表要是第二段数据库下真实的表名。
更多XSQL使用语法,可以查看官方相关文档。https://qihoo360.github.io/XSQL/tutorial/syntax/
这样就可以实现mysql与hive数据进行关联查询了。
相关版本
hive 1.2.1
spark 2.4.3
linkis 0.9.3
xsql 0.6.0
java 1.8+
hadoop 2.7.2
相关资源
https://github.com/WeBankFinTech/Linkis
https://github.com/Qihoo360/XSQL
https://github.com/Qihoo360/Quicksql