Poseidon not fetching messages from within RedStorm
I'm using RedStorm to run a Storm topology in JRuby (jruby 1.7.19 (1.9.3p551)) and need to consume messages from Kafka topics. I want to build my own Kafka spout (the input stream for Storm), but it fails to fetch any messages, even though a similar JRuby consumer running outside of RedStorm fetches messages successfully.
The RedStorm spout is written as follows. It is adapted from the Poseidon example consumer, but split into an init part and a consume part:
```ruby
require 'red_storm'
require 'poseidon'

class KafkaSpout < RedStorm::DSL::Spout
  on_init do
    # Poseidon consumer to consume Kafka messages
    @consumer = Poseidon::PartitionConsumer.new "storm_consumer", "localhost", 9092, "cashless_transactions", 0, :earliest_offset
    # Message buffer with an initial set of messages; we do a fetch to make sure that all attributes of the consumer are set
    @messages = @consumer.fetch
    print_debug
  end

  on_send do
    # Try to consume messages from Kafka if the @messages buffer is empty
    if @messages.empty?
      @messages = @consumer.fetch
      print_debug
    end
    # If we received new messages, output them from the spout, one by one
    @messages.shift if @messages.size > 0
  end

  def print_debug
    # Print some debug info to verify that all behaves as expected
    puts "Highwater mark = #{@consumer.highwater_mark}"
    puts "Offset = #{@consumer.offset}"
    puts "Next offset = #{@consumer.next_offset}"
    puts "Host = #{@consumer.host}"
    puts "Port = #{@consumer.port}"
    puts "Topic = #{@consumer.topic}"
    puts "Message buffer = #{@messages.size}"
  end
end
```
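One way to separate the spout's buffering logic from the consumer is to pull the buffering into a plain Ruby class and exercise it with a stub that stands in for Kafka. This is a minimal sketch: `MessageBuffer` and `StubConsumer` are hypothetical names, not part of RedStorm or Poseidon, and the stub returns strings rather than Poseidon's message objects. If the buffer behaves correctly with the stub, the empty fetches point at the Poseidon consumer or its runtime environment rather than at the `on_send` logic.

```ruby
# Hypothetical helper: buffers batches from any object that responds to
# #fetch (e.g. a Poseidon::PartitionConsumer) and hands them out one at a
# time, mirroring the on_init/on_send split of the spout above.
class MessageBuffer
  def initialize(consumer)
    @consumer = consumer
    @messages = []
  end

  # Returns the next buffered message, refilling from the consumer only
  # when the buffer is empty. Returns nil if the refill came back empty.
  def next_message
    @messages = @consumer.fetch if @messages.empty?
    @messages.shift
  end
end

# Hypothetical stub standing in for Kafka: returns pre-defined batches,
# then empty arrays once they are exhausted.
class StubConsumer
  def initialize(batches)
    @batches = batches
  end

  def fetch
    @batches.shift || []
  end
end

buffer = MessageBuffer.new(StubConsumer.new([["a", "b"], ["c"]]))
p 4.times.map { buffer.next_message } # => ["a", "b", "c", nil]
```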
After every fetch, I print some debug information to check Poseidon's behavior. When the RedStorm topology is executed (with JRuby), no messages are fetched. The debug output repeats itself and looks as follows:
```
[...]
Highwater mark = 261037
Offset = 0
Next offset = 0
Host = localhost
Port = 9092
Topic = cashless_transactions
Message buffer = 0
Highwater mark = 261037
Offset = 0
Next offset = 0
Host = localhost
Port = 9092
Topic = cashless_transactions
Message buffer = 0
[...]
```
Poseidon correctly detects that my topic holds 261037 messages (the highwater mark), but it does not advance: no messages are fetched, and the offset stays at 0 after each fetch. Note that all messages are small (~100 bytes). I have tried a few different fetch option settings, but none of them helped.
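The debug output amounts to a lag of 261037 − 0 = 261037 messages that the consumer never drains. A check like the following, fed with the same values that `print_debug` prints, could turn that symptom into an explicit signal inside the spout. It is a sketch only: `StallDetector` and its threshold are hypothetical, and it detects the symptom without saying anything about the cause.

```ruby
# Hypothetical diagnostic: flags a consumer that keeps returning empty
# batches even though the highwater mark says data is available.
class StallDetector
  def initialize(threshold = 3)
    @threshold = threshold # consecutive empty fetches before flagging
    @empty_fetches = 0
  end

  # fetched:        number of messages returned by the last fetch
  # offset:         consumer offset after that fetch
  # highwater_mark: highwater mark reported by the broker
  # Returns true once the consumer looks stuck.
  def record(fetched, offset, highwater_mark)
    lag = highwater_mark - offset
    if fetched.zero? && lag > 0
      @empty_fetches += 1
    else
      @empty_fetches = 0 # progress was made, or there is nothing to fetch
    end
    @empty_fetches >= @threshold
  end
end

detector = StallDetector.new(2)
p detector.record(0, 0, 261037) # => false
p detector.record(0, 0, 261037) # => true
```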
Poseidon's example consumer works fine against the same Kafka instance and topic (without RedStorm, but with JRuby):
```ruby
require 'poseidon'

consumer = Poseidon::PartitionConsumer.new("my_test_consumer", "localhost", 9092,
                                           "cashless_transactions", 0, :earliest_offset)

loop do
  messages = consumer.fetch
  puts "#{messages.size} messages fetched"
end
```
with the output:
```
3016 messages fetched
2955 messages fetched
2938 messages fetched
2930 messages fetched
2923 messages fetched
2912 messages fetched
[...]
```
I have no idea how to debug or resolve this further. Everything seems fine except for fetching messages. Any input on getting Poseidon to fetch the messages, or help debugging an issue that only seems to occur when run from within RedStorm (JRuby works fine with the example consumer), would be much appreciated!
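Because the same consumer code behaves differently inside and outside RedStorm, one generic way to narrow things down is to dump runtime details in both environments and compare them. This is a sketch: `runtime_report` is a hypothetical helper, and which (if any) of these values actually differs in this case is not known.

```ruby
# Hypothetical helper: collect runtime details that may differ between a
# plain `jruby script.rb` run and a RedStorm/Storm worker. Print it in both
# contexts (e.g. from on_init) and diff the output.
def runtime_report
  report = {
    "RUBY_VERSION"     => RUBY_VERSION,
    "RUBY_PLATFORM"    => RUBY_PLATFORM,
    "JRUBY_VERSION"    => (defined?(JRUBY_VERSION) ? JRUBY_VERSION : "n/a"),
    "default_external" => Encoding.default_external.to_s,
    "default_internal" => Encoding.default_internal.to_s,
  }
  # Which poseidon copy was actually required (RedStorm may package gems differently)
  report["poseidon_file"] = $LOADED_FEATURES.grep(/poseidon/).first || "not loaded"
  if defined?(Gem) && Gem.loaded_specs["poseidon"]
    report["poseidon_gem"] = Gem.loaded_specs["poseidon"].version.to_s
  else
    report["poseidon_gem"] = "not activated via RubyGems"
  end
  if defined?(JRUBY_VERSION)
    require 'java'
    report["file.encoding"] = java.lang.System.getProperty("file.encoding")
  end
  report
end

runtime_report.each { |key, value| puts "#{key} = #{value}" }
```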
It's very odd that it would work by itself but not in the context of RedStorm. I don't see any issues in your code.
If you're running on JRuby, you may want to look at the jruby-kafka gem; Poseidon is fairly immature compared to the official Java client.