lib/karafka/connection/config_adapter.rb in karafka-1.1.2 vs lib/karafka/connection/config_adapter.rb in karafka-1.2.0.beta1

- old
+ new

@@ -12,11 +12,14 @@ # into the client and breaking stuff module ConfigAdapter class << self # Builds all the configuration settings for Kafka.new method # @param _consumer_group [Karafka::Routing::ConsumerGroup] consumer group details - # @return [Hash] hash with all the settings required by Kafka.new method + # @return [Array<Hash>] Array with all the client arguments including hash with all + # the settings required by Kafka.new method + # @note We return array, so we can inject any arguments we want, in case of changes in the + # raw driver def client(_consumer_group) # This one is a default that takes all the settings except special # cases defined in the map settings = { logger: ::Karafka.logger, @@ -31,31 +34,36 @@ next if AttributesMap.config_adapter.values.flatten.include?(setting_name) settings[setting_name] = setting_value end - sanitize(settings) + settings_hash = sanitize(settings) + + # Normalization for the way Kafka::Client accepts arguments from 0.5.3 + [settings_hash.delete(:seed_brokers), settings_hash] end # Builds all the configuration settings for kafka#consumer method # @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details - # @return [Hash] hash with all the settings required by Kafka#consumer method + # @return [Array<Hash>] array with all the consumer arguments including hash with all + # the settings required by Kafka#consumer def consumer(consumer_group) settings = { group_id: consumer_group.id } settings = fetch_for(:consumer, consumer_group, settings) - sanitize(settings) + [sanitize(settings)] end # Builds all the configuration settings for kafka consumer consume_each_batch and # consume_each_message methods # @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details - # @return [Hash] hash with all the settings required by + # @return [Array<Hash>] Array with all the arguments required by consuming method + # including hash with all the settings required by # Kafka::Consumer#consume_each_message and Kafka::Consumer#consume_each_batch method def consuming(consumer_group) settings = { automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed } - sanitize(fetch_for(:consuming, consumer_group, settings)) + [sanitize(fetch_for(:consuming, consumer_group, settings))] end # Builds all the configuration settings for kafka consumer#subscribe method # @param topic [Karafka::Routing::Topic] topic that holds details for a given subscription # @return [Hash] hash with all the settings required by kafka consumer#subscribe method