lib/kafka/producer.rb in ruby-kafka-0.2.0 vs lib/kafka/producer.rb in ruby-kafka-0.3.0

- old
+ new

@@ -128,57 +128,19 @@ # producer.shutdown # end # class Producer - # Initializes a new Producer. - # - # @param cluster [Cluster] the cluster client. Typically passed in for you. - # - # @param logger [Logger] the logger that should be used. Typically passed - # in for you. - # - # @param ack_timeout [Integer] The number of seconds a broker can wait for - # replicas to acknowledge a write before responding with a timeout. - # - # @param required_acks [Integer] The number of replicas that must acknowledge - # a write. - # - # @param max_retries [Integer] the number of retries that should be attempted - # before giving up sending messages to the cluster. Does not include the - # original attempt. - # - # @param retry_backoff [Integer] the number of seconds to wait between retries. - # - # @param max_buffer_size [Integer] the number of messages allowed in the buffer - # before new writes will raise {BufferOverflow} exceptions. - # - # @param max_buffer_bytesize [Integer] the maximum size of the buffer in bytes. - # attempting to produce messages when the buffer reaches this size will - # result in {BufferOverflow} being raised. - # - # @param compression_codec [Symbol, nil] the name of the compression codec to - # use, or nil if no compression should be performed. Valid codecs: `:snappy` - # and `:gzip`. - # - # @param compression_threshold [Integer] the number of messages that needs to - # be in a message set before it should be compressed. Note that message sets - # are per-partition rather than per-topic or per-producer. - # - def initialize(cluster:, logger:, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000) + def initialize(cluster:, logger:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) @cluster = cluster @logger = logger @required_acks = required_acks @ack_timeout = ack_timeout @max_retries = max_retries @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @max_buffer_bytesize = max_buffer_bytesize - - @compressor = Compressor.new( - codec_name: @compression_codec, - threshold: @compression_threshold, - ) + @compressor = compressor # The set of topics that are produced to. @target_topics = Set.new # A buffer organized by topic/partition.