dinofei

Results 6 comments of dinofei

> 在kafkaQueue.startProducers中开Consumers个goroutine去拉取消息再向一个channel写,感觉没什么意义啊。FetchMessage本身也是从kafka-go的缓存channel里面获取消息,真实的拉取消息是kafka-go异步批量拉取的。从一个缓冲channel并发读再写入另一个缓冲channel,为什么不省掉这一步呢 感觉是为了做异步,因为fetch是个很轻量的动作,但是consume中有业务会很重,所以中间加一个channel解耦

> 还是没什么意义,fetch本身已经是从一个带缓冲的channel中获取即已经是异步,多增加的这个channel看不出有什么解藕。因为consume消费慢了会导致channel背压,自然会导致调用FetchMessage的生产者goroutine阻塞,再反向传递到kafka-go的自有缓冲channel。 目前这种实现,并发从一个channel拿再并发写入另一个channel,再并发从最后这个channel拿数据consume,相当于凭白多了一轮并发写和并发读,损耗了性能 加的这个channel就是为了在业务层可以并发消费,利用kafka-go的merge commit进行批量提交(底层会对msg的offset进行排序),应该是可以提升性能的,相当于框架帮我们实现了本应该在业务层实现的东西。

> > ( > > ??,加不加channel跟业务层并发有啥关系,在go-queue这个框架的封装里取channel里面的数据跟调用fetch有啥区别,你再看看kafka-go源码,kafka-go这个库已经偏高层api了,很多细节已经实现好了 你说的有道理,看了下别的库的处理方式都是直接fetchMessage后直接consume,然后提交,感觉只保留 startProducer 应该就够了

> 感谢对`kafka-go`进行服务化封装,的确用起来更简单了! > > 我对消费代码有个疑惑,望解答: > > ``` > for i := 0; i < q.c.Processors; i++ { > q.consumerRoutines.Run(func() { > for msg := range q.channel { > if...

> > > 感谢对`kafka-go`进行服务化封装,的确用起来更简单了! > > > 我对消费代码有个疑惑,望解答: > > > ``` > > > for i := 0; i < q.c.Processors; i++ { > > > q.consumerRoutines.Run(func() { >...

> ![image](https://private-user-images.githubusercontent.com/38692228/292849001-21fbee40-47ef-4a1b-8626-f37cb2ba9075.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MTY4OTA0NDUsIm5iZiI6MTcxNjg5MDE0NSwicGF0aCI6Ii8zODY5MjIyOC8yOTI4NDkwMDEtMjFmYmVlNDAtNDdlZi00YTFiLTg2MjYtZjM3Y2IyYmE5MDc1LnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNDA1MjglMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjQwNTI4VDA5NTU0NVomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWQ1OGMwMmZlMzM4YzU0NDIxZDY2MTkyOThkZTY3NDI1NTVlZTUzYmY3OTUzMDg0N2UwMmY2NTk4YThkZjliMGYmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0JmFjdG9yX2lkPTAma2V5X2lkPTAmcmVwb19pZD0wIn0.wiuunvjF8HCieXGpz4TpkL9pYBH-bBaTyC502vFOu_A) 看看你的 protoimportpaths 是否包含所有的import path