bronzels
bronzels
package main import ( "flag" "path/filepath" "strings" "os" "github.com/chrislusf/gleam/flow" "github.com/chrislusf/gleam/gio" "github.com/chrislusf/gleam/plugins/kafka" ) var ( brokers = flag.String("brokers", "beta-hbase02:9092,beta-hbase03:9092,beta-hbase04:9092", "a list of comma separated broker:port") topic = flag.String("topic", "test", "the topic...
k is like this:
k = {*github.com/chrislusf/gleam/plugins/kafka.KafkaSource | 0xc0000dc7d0} Brokers = {[]string} len:3, cap:3 0 = {string} "beta-hbase02:9092" 1 = {string} "beta-hbase03:9092" 2 = {string} "beta-hbase04:9092" Group = {string} "___go_build_gocrawler_gleamtest_standalone_kafkareader" Topic = {string} "test"...
added a Map between read and println, still no print, debug with break point set in the 1st line of capitalize, no stop after new msgs sent into the topic.
Capitalize = gio.RegisterMapper(capitalize) ) func main() { gio.Init() flag.Parse() brokerList := strings.Split(*brokers, ",") k := kafka.New(brokerList, *topic, *group) k.TimeoutSeconds = *timeout f := flow.New("kafka " + *topic).Read(k).Map("capitalize", Capitalize).Printlnf("%x") f.Run() }...
i tried connect to a not existing topic, the topic appear after gleam app is started, but nothing active detected in the gleam app, here is the summary after app...
step:testgleam.list0 output : d0 shard:0 time:306.505µs completed 285 step:testgleam.Read.Map1 input : d0 shard:0 time:306.505µs completed 285 output : d1 shard:0 time:1m16.258485573s processed 0 step:capitalize.Map2 input : d1 shard:0 time:1m16.258495764s processed...
BTW, default partition num in the remote kafka cluster is 12
or if there is a QQ group for gleam discussion, pls. share the id
great! after connect to a real topic in production, it is working now. how to control the buffer, i would like each new msg is dealt by the Map ASAP...