lib/kafka/producer.rb in ruby-kafka-0.2.0 vs lib/kafka/producer.rb in ruby-kafka-0.3.0
- old
+ new
@@ -128,57 +128,19 @@
# producer.shutdown
# end
#
class Producer
- # Initializes a new Producer.
- #
- # @param cluster [Cluster] the cluster client. Typically passed in for you.
- #
- # @param logger [Logger] the logger that should be used. Typically passed
- # in for you.
- #
- # @param ack_timeout [Integer] The number of seconds a broker can wait for
- # replicas to acknowledge a write before responding with a timeout.
- #
- # @param required_acks [Integer] The number of replicas that must acknowledge
- # a write.
- #
- # @param max_retries [Integer] the number of retries that should be attempted
- # before giving up sending messages to the cluster. Does not include the
- # original attempt.
- #
- # @param retry_backoff [Integer] the number of seconds to wait between retries.
- #
- # @param max_buffer_size [Integer] the number of messages allowed in the buffer
- # before new writes will raise {BufferOverflow} exceptions.
- #
- # @param max_buffer_bytesize [Integer] the maximum size of the buffer in bytes.
- # attempting to produce messages when the buffer reaches this size will
- # result in {BufferOverflow} being raised.
- #
- # @param compression_codec [Symbol, nil] the name of the compression codec to
- # use, or nil if no compression should be performed. Valid codecs: `:snappy`
- # and `:gzip`.
- #
- # @param compression_threshold [Integer] the number of messages that needs to
- # be in a message set before it should be compressed. Note that message sets
- # are per-partition rather than per-topic or per-producer.
- #
- def initialize(cluster:, logger:, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000)
+ def initialize(cluster:, logger:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
@cluster = cluster
@logger = logger
@required_acks = required_acks
@ack_timeout = ack_timeout
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
@max_buffer_bytesize = max_buffer_bytesize
-
- @compressor = Compressor.new(
- codec_name: @compression_codec,
- threshold: @compression_threshold,
- )
+ @compressor = compressor
# The set of topics that are produced to.
@target_topics = Set.new
# A buffer organized by topic/partition.