gravity icon indicating copy to clipboard operation
gravity copied to clipboard

mysql数据接收到kafka topic的时候,ddl语句都被过滤掉了,不会存储到topic中.

Open Douglas-gwj opened this issue 5 years ago • 5 comments

[output] type = "async-kafka"

目标端编码规则:输出类型和版本号

- 可选

[output.config]

默认为 json

output-format = "json" #enable-ddl = true

默认为 0.1 版本

schema-version = "0.1" [output.config.kafka-global-config] broker-addrs = ["cdh01:9092","cdh02:9092","cdh03:9092"] mode = "async"

目标端 kafka SASL 配置

# - 可选

#[output.config.kafka-global-config.net.sasl] #enable = false #user = "" #password = ""

kafka 路由的定义

[[output.config.routes]] match-schema = "test_gravity" match-table = "*" dml-topic = "test_gravity"

Douglas-gwj avatar Mar 27 '20 09:03 Douglas-gwj

目前确实是这样实现的。主要原因是消费端并不是很容易处理 DDL,除非是单 partition 的 topic。

在多 partition 的前提下,如果下发 ddl,由于不同 partition 的消费不能保证顺序,同一个组里的某个客户端消费到 ddl 时,可能有在消费 ddl 执行前数据的客户端,可能有在消费 ddl 执行后数据的客户端,无法保证 ddl 双向 barrier 的语义。

不知道你这边场景是什么样的,针对这个问题想怎么解决呢?如果有好的想法我们也很有兴趣实现。

Ryan-Git avatar Mar 27 '20 10:03 Ryan-Git

我们的需求是希望将binlog实时的写入到hdfs上面去. 目前做的是准实时级别的数仓。 但是后面为了说做实时数仓,想试着说切换到kafka上面去啊。毕竟大家都是用的kafka,所以好交流些。

我们之前是为了同步binlog到hdfs上面,然后load到hive里面去.之前的做法相对来说比较粗糙,之前的做法是用Canal Server读取binlog,Canal Client解析binlog后存储到RocketMQ中,设计的是一个库一个topic,queue size是10,然后一个表存储到一个queue里面去,当有新表产生的时候根据queue size的大小进行判定均衡负载。这样做不能保证size像kafka那种每个queue能做到相对均衡,只能做到一定程度的均衡。 存储到rocketmq后用spark streaming进行消费,对一个批次的数据先进行type=ddl类型的判定,然后先对hive表进行相应的ddl操作后(当然有些操作像删除字段,修改字段名称,特定位置添加字段等,有特殊的做法.),再把type为:update,insert,delete的数据写入到hdfs上面去.。

所以为了实时接入binlog到hive,ddl语句是必须的啊

Douglas-gwj avatar Mar 30 '20 01:03 Douglas-gwj

所以是用每个表只用一个 queue 的方式实现的对吧。跟前面说的单 partition 一个意思。

rmq 不太清楚,kafka 上这个方式吞吐量有瓶颈。我们的环境下单 partition 大概最多 1w rps,流量大一点的表就不太够用了。

不过在流量不是很大的情况下确实也是一个办法,我们增加一个按库名+表名路由 partition 的功能是不是能满足需求?

Ryan-Git avatar Mar 30 '20 02:03 Ryan-Git

嗯,能满足需求。。。只是这种方式目前好像设计上不是那么的友好吧。 topic partition必然会出现数据分布不均衡的现象。 但是相对来说有这种需求的场景不是说有那么高的性能上的要求吧。 我们用这种方式主要是将以前用sqoop或者别的方式抽mysql数据到hive中给替换掉,直接读取日志,在写到hive表文件中。 这样对mysql的压力要小很多了。 而且数据也是相对实时的进入到hive里面吧。

Douglas-gwj avatar Mar 30 '20 07:03 Douglas-gwj

这个不是我们的限制,是ddl需要全局时序而kafka无法提供导致的。 我们考虑过其他方案,比如把ddl发到所有partition,先发一个特定消息再发ddl等,但由于客户端本身也是分布式的,其实还是很难处理。

Ryan-Git avatar Mar 30 '20 13:03 Ryan-Git