bronzels

Results 18 comments of bronzels

package main import ( "flag" "path/filepath" "strings" "os" "github.com/chrislusf/gleam/flow" "github.com/chrislusf/gleam/gio" "github.com/chrislusf/gleam/plugins/kafka" ) var ( brokers = flag.String("brokers", "beta-hbase02:9092,beta-hbase03:9092,beta-hbase04:9092", "a list of comma separated broker:port") topic = flag.String("topic", "test", "the topic...

k = {*github.com/chrislusf/gleam/plugins/kafka.KafkaSource | 0xc0000dc7d0} Brokers = {[]string} len:3, cap:3 0 = {string} "beta-hbase02:9092" 1 = {string} "beta-hbase03:9092" 2 = {string} "beta-hbase04:9092" Group = {string} "___go_build_gocrawler_gleamtest_standalone_kafkareader" Topic = {string} "test"...

added a Map between read and println, still no print, debug with break point set in the 1st line of capitalize, no stop after new msgs sent into the topic.

Capitalize = gio.RegisterMapper(capitalize) ) func main() { gio.Init() flag.Parse() brokerList := strings.Split(*brokers, ",") k := kafka.New(brokerList, *topic, *group) k.TimeoutSeconds = *timeout f := flow.New("kafka " + *topic).Read(k).Map("capitalize", Capitalize).Printlnf("%x") f.Run() }...

i tried connect to a not existing topic, the topic appear after gleam app is started, but nothing active detected in the gleam app, here is the summary after app...

step:testgleam.list0 output : d0 shard:0 time:306.505µs completed 285 step:testgleam.Read.Map1 input : d0 shard:0 time:306.505µs completed 285 output : d1 shard:0 time:1m16.258485573s processed 0 step:capitalize.Map2 input : d1 shard:0 time:1m16.258495764s processed...

BTW, default partition num in the remote kafka cluster is 12

or if there is a QQ group for gleam discussion, pls. share the id

great! after connect to a real topic in production, it is working now. how to control the buffer, i would like each new msg is dealt by the Map ASAP...