rdkafka-ruby

Reference to Opaque is not released when Admin, Consumer or Producer is closed

Open wanabe opened this issue 3 years ago • 4 comments

Hello,

I have observed that when a long-lived process such as Sidekiq repeatedly creates and destroys Producers, the memory usage of the process increases. Apparently, the references held in Rdkafka::Config.opaques remain even after closing. For Consumer and Admin, the objects themselves seem to be reclaimed by GC, but the Opaque created along with each of them remains.

How about releasing the reference to Opaque when the Admin, Consumer or Producer is closed?

Here is a test script:

# test.rb
require "rdkafka"

def count_kafka_objects
  [Rdkafka::Opaque, Rdkafka::Producer, Rdkafka::Consumer, Rdkafka::Admin].to_h do |k|
    [k, ObjectSpace.each_object(k).count]
  end
end
  
config_prop = {
  :"bootstrap.servers" => "localhost:9092",
}

puts " at first: %s" % count_kafka_objects.inspect

10.times do
  config = Rdkafka::Config.new(config_prop)
  producer = config.producer
  producer.close
  consumer = config.consumer
  consumer.close
  admin = config.admin
  admin.close
end

puts "before GC: %s" % count_kafka_objects.inspect
GC.start
puts " after GC: %s" % count_kafka_objects.inspect

And the execution result:

$ bundle exec ruby test.rb 
 at first: {Rdkafka::Opaque=>0, Rdkafka::Producer=>0, Rdkafka::Consumer=>0, Rdkafka::Admin=>0}
before GC: {Rdkafka::Opaque=>30, Rdkafka::Producer=>10, Rdkafka::Consumer=>10, Rdkafka::Admin=>10}
 after GC: {Rdkafka::Opaque=>30, Rdkafka::Producer=>10, Rdkafka::Consumer=>0, Rdkafka::Admin=>0}
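The counts above are consistent with a class-level registry that holds strong references. The plain-Ruby model below is a sketch of that pattern, not rdkafka's actual code: `FakeOpaque`, `FakeProducer`, `REGISTRY` and `build_producer` are hypothetical stand-ins for `Rdkafka::Opaque`, `Rdkafka::Producer` and `Rdkafka::Config.opaques` (assumed here to be a Hash keyed by `object_id`, as both patches below suggest). Because the registry is reachable from a constant, every opaque stays alive after `close`, and the opaque's back-reference (`opaque.producer = producer` in `Config#producer`) keeps each producer alive too, which would explain why only `Producer` stays at 10 after GC.

```ruby
# Hypothetical model of the leak; these are stand-ins, not rdkafka's classes.
class FakeOpaque
  attr_accessor :producer # back-reference, like `opaque.producer = producer`
end

class FakeProducer
  def close
    # The real client releases native resources here, but nothing in
    # close touches REGISTRY, so the opaque stays registered.
  end
end

# Registry reachable from a constant: it acts as a GC root for its values.
REGISTRY = {}

def build_producer
  opaque = FakeOpaque.new
  REGISTRY[opaque.object_id] = opaque # strong reference, never removed
  FakeProducer.new.tap { |producer| opaque.producer = producer }
end

10.times { build_producer.close }
GC.start

# Every opaque (and, through it, every producer) is still reachable.
puts "registered opaques: #{REGISTRY.size}"                    # prints "registered opaques: 10"
puts "reachable producers: #{REGISTRY.values.count(&:producer)}" # prints "reachable producers: 10"
```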

wanabe, May 21 '22 05:05

I have written a few patches, but I haven't turned any of them into pull requests because I don't think they are very good approaches.

First patch. Drawback: it adds otherwise unnecessary methods (an opaque accessor on Admin, Consumer and Producer).
diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb
index f3002c3..565d022 100644
--- a/lib/rdkafka/admin.rb
+++ b/lib/rdkafka/admin.rb
@@ -1,5 +1,11 @@
 module Rdkafka
   class Admin
+    # @private
+    # Opaque object to receive callbacks.
+    #
+    # @return [Rdkafka::Opaque, nil]
+    attr_accessor :opaque
+
     # @private
     def initialize(native_kafka)
       @native_kafka = native_kafka
@@ -27,6 +33,7 @@ module Rdkafka
       # Wait for the polling thread to finish up
       @polling_thread.join
       Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
+      Rdkafka::Config.remove_opaque(@opaque)
       @native_kafka = nil
     end
 
diff --git a/lib/rdkafka/config.rb b/lib/rdkafka/config.rb
index 2fe3726..0ba9b37 100644
--- a/lib/rdkafka/config.rb
+++ b/lib/rdkafka/config.rb
@@ -93,6 +93,12 @@ module Rdkafka
       @@opaques
     end
 
+    # @private
+    def self.remove_opaque(opaque)
+      return unless opaque
+      @@opaques.delete(opaque.object_id)
+    end
+
     # Default config that can be overwritten.
     DEFAULT_CONFIG = {
       # Request api version so advanced features work
@@ -162,7 +168,9 @@ module Rdkafka
       Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
 
       # Return consumer with Kafka client
-      Rdkafka::Consumer.new(kafka)
+      Rdkafka::Consumer.new(kafka).tap do |consumer|
+        consumer.opaque = opaque
+      end
     end
 
     # Create a producer with this configuration.
@@ -181,6 +189,7 @@ module Rdkafka
       # Return producer with Kafka client
       Rdkafka::Producer.new(Rdkafka::Producer::Client.new(native_kafka(config, :rd_kafka_producer)), self[:partitioner]).tap do |producer|
         opaque.producer = producer
+        producer.opaque = opaque
       end
     end
 
@@ -194,7 +203,9 @@ module Rdkafka
       opaque = Opaque.new
       config = native_config(opaque)
       Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
-      Rdkafka::Admin.new(native_kafka(config, :rd_kafka_producer))
+      Rdkafka::Admin.new(native_kafka(config, :rd_kafka_producer)).tap do |admin|
+        admin.opaque = opaque
+      end
     end
 
     # Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb
index 48c4753..7816509 100644
--- a/lib/rdkafka/consumer.rb
+++ b/lib/rdkafka/consumer.rb
@@ -11,6 +11,12 @@ module Rdkafka
   class Consumer
     include Enumerable
 
+    # @private
+    # Opaque object to receive callbacks.
+    #
+    # @return [Rdkafka::Opaque, nil]
+    attr_accessor :opaque
+
     # @private
     def initialize(native_kafka)
       @native_kafka = native_kafka
@@ -25,6 +31,7 @@ module Rdkafka
       @closing = true
       Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
       Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
+      Rdkafka::Config.remove_opaque(@opaque)
       @native_kafka = nil
     end
 
diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb
index 85b4d7d..7453fb8 100644
--- a/lib/rdkafka/producer.rb
+++ b/lib/rdkafka/producer.rb
@@ -9,6 +9,12 @@ module Rdkafka
     # @return [Proc, nil]
     attr_reader :delivery_callback
 
+    # @private
+    # Opaque object to receive callbacks.
+    #
+    # @return [Rdkafka::Opaque, nil]
+    attr_accessor :opaque
+
     # @private
     def initialize(client, partitioner_name)
       @client = client
@@ -34,6 +40,8 @@ module Rdkafka
       ObjectSpace.undefine_finalizer(self)
 
       @client.close
+      Rdkafka::Config.remove_opaque(@opaque)
+      nil
     end
 
     # Partition count for a given topic.
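The core of the first patch, reduced to plain Ruby: each client keeps a reference to its own opaque and deletes it from the registry in `close`. `Registry`, `Client` and `build_client` are hypothetical names for illustration; they correspond roughly to `Rdkafka::Config.remove_opaque`, the new `opaque` accessor and the `Config#producer`/`#consumer`/`#admin` factories in the diff.

```ruby
# Hypothetical registry mirroring `@@opaques`, keyed by object_id.
module Registry
  @opaques = {}

  class << self
    attr_reader :opaques

    def add(opaque)
      @opaques[opaque.object_id] = opaque
    end

    # Mirrors `Rdkafka::Config.remove_opaque` from the patch.
    def remove(opaque)
      return unless opaque
      @opaques.delete(opaque.object_id)
    end
  end
end

class Client
  # The extra accessor the patch author considered unnecessary.
  attr_accessor :opaque

  def close
    # ...destroy the native handle here...
    Registry.remove(@opaque)
    nil
  end
end

# Factory wiring, like `.tap { |producer| producer.opaque = opaque }`.
def build_client
  opaque = Object.new
  Registry.add(opaque)
  Client.new.tap { |client| client.opaque = opaque }
end

clients = Array.new(3) { build_client }
puts Registry.opaques.size # prints 3
clients.each(&:close)
puts Registry.opaques.size # prints 0
```

The cleanup is explicit: a client that is dropped without `close` still leaves its opaque in the registry.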

Second patch. Drawback: the separation of concerns is unclear.
diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb
index f3002c3..f58d2de 100644
--- a/lib/rdkafka/admin.rb
+++ b/lib/rdkafka/admin.rb
@@ -27,6 +27,7 @@ module Rdkafka
       # Wait for the polling thread to finish up
       @polling_thread.join
       Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
+      Rdkafka::Config.notify_closing(self)
       @native_kafka = nil
     end
 
diff --git a/lib/rdkafka/config.rb b/lib/rdkafka/config.rb
index 2fe3726..4079edd 100644
--- a/lib/rdkafka/config.rb
+++ b/lib/rdkafka/config.rb
@@ -14,6 +14,8 @@ module Rdkafka
     # @private
     @@opaques = {}
     # @private
+    @@opaque_parents = {}
+    # @private
     @@log_queue = Queue.new
 
     Thread.start do
@@ -93,6 +95,17 @@ module Rdkafka
       @@opaques
     end
 
+    # @private
+    def self.opaque_parents
+      @@opaque_parents
+    end
+
+    # @private
+    def self.notify_closing(opaque_parent)
+      opaque = opaque_parents.delete(opaque_parent)
+      opaques.delete(opaque.object_id) if opaque
+    end
+
     # Default config that can be overwritten.
     DEFAULT_CONFIG = {
       # Request api version so advanced features work
@@ -162,7 +175,9 @@ module Rdkafka
       Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
 
       # Return consumer with Kafka client
-      Rdkafka::Consumer.new(kafka)
+      Rdkafka::Consumer.new(kafka).tap do |consumer|
+        Rdkafka::Config.opaque_parents[consumer] = opaque
+      end
     end
 
     # Create a producer with this configuration.
@@ -181,6 +196,7 @@ module Rdkafka
       # Return producer with Kafka client
       Rdkafka::Producer.new(Rdkafka::Producer::Client.new(native_kafka(config, :rd_kafka_producer)), self[:partitioner]).tap do |producer|
         opaque.producer = producer
+        Rdkafka::Config.opaque_parents[producer] = opaque
       end
     end
 
@@ -194,7 +210,9 @@ module Rdkafka
       opaque = Opaque.new
       config = native_config(opaque)
       Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
-      Rdkafka::Admin.new(native_kafka(config, :rd_kafka_producer))
+      Rdkafka::Admin.new(native_kafka(config, :rd_kafka_producer)).tap do |admin|
+        Rdkafka::Config.opaque_parents[admin] = opaque
+      end
     end
 
     # Error that is returned by the underlying rdkafka error if an invalid configuration option is present.
diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb
index 48c4753..461b1cc 100644
--- a/lib/rdkafka/consumer.rb
+++ b/lib/rdkafka/consumer.rb
@@ -25,6 +25,7 @@ module Rdkafka
       @closing = true
       Rdkafka::Bindings.rd_kafka_consumer_close(@native_kafka)
       Rdkafka::Bindings.rd_kafka_destroy(@native_kafka)
+      Rdkafka::Config.notify_closing(self)
       @native_kafka = nil
     end
 
diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb
index 85b4d7d..c2b1b7a 100644
--- a/lib/rdkafka/producer.rb
+++ b/lib/rdkafka/producer.rb
@@ -34,6 +34,8 @@ module Rdkafka
       ObjectSpace.undefine_finalizer(self)
 
       @client.close
+      Rdkafka::Config.notify_closing(self)
+      nil
     end
 
     # Partition count for a given topic.
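The second patch moves the bookkeeping into the config side: a map from each client to its opaque, and a `notify_closing` hook that clients call from `close`. The plain-Ruby sketch below mirrors that shape; `OPAQUES`, `OPAQUE_PARENTS`, `ParentClient` and `build_parent_client` are hypothetical stand-ins for `@@opaques`, `@@opaque_parents` and the rdkafka clients.

```ruby
# Hypothetical stand-ins for `@@opaques` and `@@opaque_parents`.
OPAQUES = {}        # object_id => opaque
OPAQUE_PARENTS = {} # client    => opaque

# Mirrors `Rdkafka::Config.notify_closing` from the second patch.
def notify_closing(opaque_parent)
  opaque = OPAQUE_PARENTS.delete(opaque_parent)
  OPAQUES.delete(opaque.object_id) if opaque
end

class ParentClient
  def close
    # ...destroy the native handle here...
    notify_closing(self) # the client only reports that it is closing
    nil
  end
end

def build_parent_client
  opaque = Object.new
  OPAQUES[opaque.object_id] = opaque
  ParentClient.new.tap { |client| OPAQUE_PARENTS[client] = opaque }
end

clients = Array.new(3) { build_parent_client }
puts OPAQUES.size # prints 3
clients.first.close
puts OPAQUES.size # prints 2
```

Note that until `close` is called, `OPAQUE_PARENTS` holds strong references to the clients themselves, so in this approach an unclosed client is retained as well as its opaque.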

wanabe, May 21 '22 05:05

> I have observed that when a long-lived process such as sidekiq repeatedly creates and destroys Producers

Just to be clear for anyone else: rdkafka producers should not be initialized per message and they should be persistent.

mensfeld, May 21 '22 07:05

Thank you for your comment.

> Just to be clear for anyone else: rdkafka producers should not be initialized per message and they should be persistent.

Yes, a producer shouldn't be created for each message; that would indeed be too inefficient. In my application, I was creating and closing one per Sidekiq job.

If that is still inefficient, I could introduce some sort of connection pool shared between jobs. Should I do that?

wanabe, May 21 '22 08:05

@wanabe in general, librdkafka (which runs underneath rdkafka) is thread safe, so there is no real reason not to use one instance from multiple threads at the same time. I certainly do this at scale and have had no issues for a really long time now.
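Sharing one long-lived producer across Sidekiq jobs and threads, instead of creating one per job, could look roughly like the sketch below: a lazily built, process-wide instance guarded by a `Mutex`. `SharedClient` is a hypothetical helper, not part of rdkafka; the commented-out wiring assumes rdkafka's `Rdkafka::Config#producer`, `Producer#produce(topic:, payload:)`, `DeliveryHandle#wait` and `Producer#close`.

```ruby
# Hypothetical helper: builds one client on first use and reuses it.
class SharedClient
  def initialize(&factory)
    @factory = factory
    @mutex = Mutex.new
    @client = nil
  end

  # Returns the single shared instance, creating it at most once.
  def get
    @mutex.synchronize { @client ||= @factory.call }
  end

  # Closes the shared instance, e.g. from an at_exit hook.
  def close
    @mutex.synchronize do
      @client&.close
      @client = nil
    end
  end
end

# Possible wiring with rdkafka (not executed here):
#   PRODUCER = SharedClient.new do
#     Rdkafka::Config.new("bootstrap.servers": "localhost:9092").producer
#   end
#   at_exit { PRODUCER.close }
#   # Inside each job:
#   PRODUCER.get.produce(topic: "events", payload: "hello").wait
```

The mutex only guards creation and teardown; producing happens outside the lock and relies on the thread safety described above.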

If you are looking for something more abstract, you can take a look at https://github.com/karafka/waterdrop - it's built on top of rdkafka with some additional concurrency guarantees.

Nonetheless, I think it may still be worth either patching rdkafka or at least updating the docs to indicate that this is not the best use case for it.

mensfeld, May 21 '22 08:05

Note to self: examples here should be moved to tests so we ensure nothing leaks.

mensfeld, Mar 31 '23 20:03

I totally forgot about this one. :pray:

mensfeld, Oct 27 '23 14:10

@wanabe here is the fix: https://github.com/karafka/karafka-rdkafka/pull/29

Can you check it? For now it lives in karafka-rdkafka, mostly because that way I can run my Karafka integration suite against it, but once it has proved stable I will immediately backport it here (in the long run, the plan is to merge the two).

The way it works is as follows:

  1. The opaque is passed to the admin, consumer and producer, and as long as that reference exists, the WeakMap will not remove it.
  2. The moment any of those is closed AND the variable referencing it (admin, consumer, producer) no longer exists, the WeakMap cleans itself up and GC removes the opaque.
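A minimal plain-Ruby model of the two steps above, assuming (this thread does not show the PR's code) that the registry becomes an `ObjectSpace::WeakMap` keyed by `object_id` while each client holds the strong reference to its opaque. `WEAK_OPAQUES`, `WeakClient` and `lookup_opaque` are hypothetical. Using an Integer `object_id` as a WeakMap key relies on WeakMap accepting immediate values as keys, which I believe arrived in Ruby 2.7; that would be consistent with the Ruby 2.6 limitation mentioned in this thread.

```ruby
# Hypothetical model of a WeakMap-based registry; not the PR's actual code.
WEAK_OPAQUES = ObjectSpace::WeakMap.new # object_id => opaque (held weakly)

class WeakClient
  attr_reader :opaque

  def initialize
    @opaque = Object.new
    # Step 1: the registry holds the opaque weakly; this client holds it strongly.
    WEAK_OPAQUES[@opaque.object_id] = @opaque
  end

  def close
    # ...destroy the native handle here...; no registry cleanup is needed.
    nil
  end
end

# Callbacks can look the opaque up by id for as long as its owner is alive.
def lookup_opaque(id)
  WEAK_OPAQUES[id]
end

client = WeakClient.new
id = client.opaque.object_id
puts lookup_opaque(id).equal?(client.opaque) # prints true

# Step 2: once the client is closed and unreferenced, its opaque becomes
# collectable and the entry disappears (timing depends on the GC).
client.close
client = nil
GC.start
```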

Please ignore the failing specs for now.

mensfeld, Oct 27 '23 15:10

The above will not work with Ruby 2.6, but according to the rdkafka-ruby README that version should not be supported anyway, since it is well past its EOL, so it may be worth considering dropping support for it.

mensfeld, Oct 27 '23 15:10

This solution was made possible thanks to @peterzhu2118's help :pray:

mensfeld, Oct 27 '23 15:10

My solution seems to work:

irb(main):025:0> puts "before GC: %s" % count_kafka_objects.inspect
before GC: {Rdkafka::Opaque=>17, Rdkafka::Producer=>6, Rdkafka::Consumer=>6, Rdkafka::Admin=>6}
=> nil
irb(main):026:0> GC.start
irb(main):027:0> puts " after GC: %s" % count_kafka_objects.inspect
 after GC: {Rdkafka::Opaque=>0, Rdkafka::Producer=>0, Rdkafka::Consumer=>0, Rdkafka::Admin=>0}

mensfeld, Oct 28 '23 15:10

Also, @wanabe, your spec is missing one thing (the GC.disable line marked # HERE):

# test.rb
require "rdkafka"

def count_kafka_objects
  [Rdkafka::Opaque, Rdkafka::Producer, Rdkafka::Consumer, Rdkafka::Admin].to_h do |k|
    [k, ObjectSpace.each_object(k).count]
  end
end
  
config_prop = {
  :"bootstrap.servers" => "localhost:9092",
}

puts " at first: %s" % count_kafka_objects.inspect

GC.disable # HERE

10.times do
  config = Rdkafka::Config.new(config_prop)
  producer = config.producer
  producer.close
  consumer = config.consumer
  consumer.close
  admin = config.admin
  admin.close
end

puts "before GC: %s" % count_kafka_objects.inspect
GC.start
puts " after GC: %s" % count_kafka_objects.inspect
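The `GC.disable` line presumably keeps automatic GC from running during the loop, so the "before GC" snapshot includes every object the loop allocated and any drop can be attributed to the explicit `GC.start`. The same idea could be wrapped in a small helper, for example for the leak tests suggested earlier in this thread. `count_objects` and `measure_retention` are hypothetical names; also note that CRuby's conservative stack scanning can occasionally keep an object alive, so exact "after" counts are best treated as upper bounds.

```ruby
# Counts live instances of each class via ObjectSpace.
def count_objects(*classes)
  classes.to_h { |klass| [klass, ObjectSpace.each_object(klass).count] }
end

# Hypothetical helper: runs the block with automatic GC disabled, then
# snapshots counts before and after an explicit full GC.
def measure_retention(*classes)
  GC.disable
  yield
  before = count_objects(*classes)
  GC.enable
  GC.start
  [before, count_objects(*classes)]
ensure
  GC.enable # re-enable GC even if the block raises
end

# Usage with the script above (requires rdkafka and config_prop):
#   before, after = measure_retention(Rdkafka::Opaque, Rdkafka::Producer) do
#     10.times { Rdkafka::Config.new(config_prop).producer.close }
#   end
```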


mensfeld, Oct 28 '23 19:10