lib/kafka/producer.rb in ruby-kafka-0.3.2 vs lib/kafka/producer.rb in ruby-kafka-0.3.3

- old
+ new

@@ -128,13 +128,14 @@ # producer.shutdown # end # class Producer - def initialize(cluster:, logger:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) + def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) @cluster = cluster @logger = logger + @instrumenter = instrumenter @required_acks = required_acks @ack_timeout = ack_timeout @max_retries = max_retries @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @@ -201,11 +202,11 @@ end @target_topics.add(topic) @pending_message_queue.write(message) - Instrumentation.instrument("produce_message.producer.kafka", { + @instrumenter.instrument("produce_message.producer", { value: value, key: key, topic: topic, create_time: create_time, buffer_size: buffer_size, @@ -226,11 +227,11 @@ # @return [nil] def deliver_messages # There's no need to do anything if the buffer is empty. return if buffer_size == 0 - Instrumentation.instrument("deliver_messages.producer.kafka") do |notification| + @instrumenter.instrument("deliver_messages.producer") do |notification| message_count = buffer_size notification[:message_count] = message_count notification[:attempts] = 0 @@ -251,10 +252,18 @@ def buffer_bytesize @pending_message_queue.bytesize + @buffer.bytesize end + # Deletes all buffered messages. + # + # @return [nil] + def clear_buffer + @buffer.clear + @pending_message_queue.clear + end + # Closes all connections to the brokers. # # @return [nil] def shutdown @cluster.disconnect @@ -272,10 +281,11 @@ buffer: @buffer, required_acks: @required_acks, ack_timeout: @ack_timeout, compressor: @compressor, logger: @logger, + instrumenter: @instrumenter, ) loop do attempt += 1 @@ -284,10 +294,17 @@ @cluster.refresh_metadata_if_necessary! assign_partitions! operation.execute + if @required_acks.zero? + # No response is returned by the brokers, so we can't know which messages + # have been successfully written. Our only option is to assume that they all + # have. + @buffer.clear + end + if buffer_size.zero? break elsif attempt <= @max_retries @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s" @@ -296,14 +313,11 @@ @logger.error "Failed to send all messages; keeping remaining messages in buffer" break end end - if @required_acks == 0 - # No response is returned by the brokers, so we can't know which messages - # have been successfully written. Our only option is to assume that they all - # have. - @buffer.clear + unless @pending_message_queue.empty? + raise DeliveryFailed, "Failed to assign partitions to #{@pending_message_queue.size} messages" end unless @buffer.empty? partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")