lib/kafka/producer.rb in ruby-kafka-0.7.5 vs lib/kafka/producer.rb in ruby-kafka-0.7.6.beta1

- old
+ new

@@ -128,11 +128,11 @@ class AbortTransaction < StandardError; end def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) @cluster = cluster @transaction_manager = transaction_manager - @logger = logger + @logger = TaggedLogger.new(logger) @instrumenter = instrumenter @required_acks = required_acks == :all ? -1 : required_acks @ack_timeout = ack_timeout @max_retries = max_retries @retry_backoff = retry_backoff @@ -148,10 +148,14 @@ # Messages added by `#produce` but not yet assigned a partition. @pending_message_queue = PendingMessageQueue.new end + def to_s + "Producer #{@target_topics.to_a.join(', ')}" + end + # Produces a message to the specified topic. Note that messages are buffered in # the producer until {#deliver_messages} is called. # # ## Partitioning # @@ -203,11 +207,11 @@ end # If the producer is in transactional mode, all the message production # must be used when the producer is currently in transaction if @transaction_manager.transactional? && !@transaction_manager.in_transaction? - raise 'You must trigger begin_transaction before producing messages' + raise "Cannot produce to #{topic}: You must trigger begin_transaction before producing messages" end @target_topics.add(topic) @pending_message_queue.write(message) @@ -389,15 +393,15 @@ end if buffer_size.zero? break elsif attempt <= @max_retries - @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s" + @logger.warn "Failed to send all messages to #{pretty_partitions}; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s" sleep @retry_backoff else - @logger.error "Failed to send all messages; keeping remaining messages in buffer" + @logger.error "Failed to send all messages to #{pretty_partitions}; keeping remaining messages in buffer" break end end unless @pending_message_queue.empty? @@ -405,13 +409,15 @@ @cluster.mark_as_stale! raise DeliveryFailed.new("Failed to assign partitions to #{@pending_message_queue.size} messages", buffer_messages) end unless @buffer.empty? - partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ") - - raise DeliveryFailed.new("Failed to send messages to #{partitions}", buffer_messages) + raise DeliveryFailed.new("Failed to send messages to #{pretty_partitions}", buffer_messages) end + end + + def pretty_partitions + @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ") end def assign_partitions! failed_messages = [] topics_with_failures = Set.new