lib/rdkafka/config.rb in rdkafka-0.4.2 vs lib/rdkafka/config.rb in rdkafka-0.5.0

- old
+ new

@@ -70,10 +70,11 @@ # @param config_hash [Hash<String,Symbol => String>] The config options for rdkafka # # @return [Config] def initialize(config_hash = {}) @config_hash = DEFAULT_CONFIG.merge(config_hash) + @consumer_rebalance_listener = nil end # Set a config option. # # @param key [String] The config option's key @@ -91,20 +92,37 @@ # @return [String, nil] The config option or `nil` if it is not present def [](key) @config_hash[key] end + # Get notifications on partition assignment/revocation for the subscribed topics + # + # @param listener [Object, #on_partitions_assigned, #on_partitions_revoked] listener instance + def consumer_rebalance_listener=(listener) + @consumer_rebalance_listener = listener + end + # Create a consumer with this configuration. # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created # # @return [Consumer] The created consumer def consumer - kafka = native_kafka(native_config, :rd_kafka_consumer) + opaque = Opaque.new + config = native_config(opaque) + + if @consumer_rebalance_listener + opaque.consumer_rebalance_listener = @consumer_rebalance_listener + Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback) + end + + kafka = native_kafka(config, :rd_kafka_consumer) + # Redirect the main queue to the consumer Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka) + # Return consumer with Kafka client Rdkafka::Consumer.new(kafka) end # Create a producer with this configuration. @@ -202,11 +220,26 @@ end # @private class Opaque attr_accessor :producer + attr_accessor :consumer_rebalance_listener def call_delivery_callback(delivery_handle) producer.call_delivery_callback(delivery_handle) if producer + end + + def call_on_partitions_assigned(consumer, list) + return unless consumer_rebalance_listener + return unless consumer_rebalance_listener.respond_to?(:on_partitions_assigned) + + consumer_rebalance_listener.on_partitions_assigned(consumer, list) + end + + def call_on_partitions_revoked(consumer, list) + return unless consumer_rebalance_listener + return unless consumer_rebalance_listener.respond_to?(:on_partitions_revoked) + + consumer_rebalance_listener.on_partitions_revoked(consumer, list) end end end