lib/rdkafka/config.rb in rdkafka-0.1.11 vs lib/rdkafka/config.rb in rdkafka-0.2.0

- old
+ new

@@ -3,60 +3,106 @@ module Rdkafka # Configuration for a Kafka consumer or producer. You can create an instance and use # the consumer and producer methods to create a client. Documentation of the available # configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. class Config + # @private @@logger = Logger.new(STDOUT) + # Returns the current logger, by default this is a logger to stdout. + # + # @return [Logger] def self.logger @@logger end + # Set the logger that will be used for all logging output by this library. + # + # @param logger [Logger] The logger to be used + # + # @return [nil] def self.logger=(logger) + raise NoLoggerError if logger.nil? @@logger=logger end + # Default config that can be overwritten. DEFAULT_CONFIG = { # Request api version so advanced features work :"api.version.request" => true }.freeze + # Required config that cannot be overwritten. REQUIRED_CONFIG = { - # Enable log queues so we get callbacks in our own threads + # Enable log queues so we get callbacks in our own Ruby threads :"log.queue" => true }.freeze + # Returns a new config with the provided options which are merged with {DEFAULT_CONFIG}. + # + # @param config_hash [Hash<String,Symbol => String>] The config options for rdkafka + # + # @return [Config] def initialize(config_hash = {}) @config_hash = DEFAULT_CONFIG.merge(config_hash) end + # Set a config option. + # + # @param key [String] The config option's key + # @param value [String] The config option's value + # + # @return [nil] def []=(key, value) @config_hash[key] = value end + # Get a config option with the specified key + # + # @param key [String] The config option's key + # + # @return [String, nil] The config option or `nil` if it is not present def [](key) @config_hash[key] end + # Create a consumer with this configuration. + # + # @raise [ConfigError] When the configuration contains invalid options + # @raise [ClientCreationError] When the native client cannot be created + # + # @return [Consumer] The created consumer def consumer kafka = native_kafka(native_config, :rd_kafka_consumer) # Redirect the main queue to the consumer Rdkafka::FFI.rd_kafka_poll_set_consumer(kafka) # Return consumer with Kafka client Rdkafka::Consumer.new(kafka) end + # Create a producer with this configuration. + # + # @raise [ConfigError] When the configuration contains invalid options + # @raise [ClientCreationError] When the native client cannot be created + # + # @return [Producer] The created producer def producer # Create Kafka config config = native_config # Set callback to receive delivery reports on config Rdkafka::FFI.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::FFI::DeliveryCallback) # Return producer with Kafka client Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)) end + # Error that is returned by the underlying rdkafka error if an invalid configuration option is present. class ConfigError < RuntimeError; end + + # Error that is returned by the underlying rdkafka library if the client cannot be created. class ClientCreationError < RuntimeError; end + + # Error that is raised when trying to set a nil logger + class NoLoggerError < RuntimeError; end private # This method is only intented to be used to create a client, # using it in another way will leak memory.