lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.2 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.3

- old
+ new

@@ -188,21 +188,25 @@ if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3 self.class.producer.sync_producer_shutdown end end + # Produce messages in batches, reducing the size 1/10 if the batch is too + # large. Does not retry batches of messages that have already been sent. # @param batch [Array<Hash>] def produce_messages(batch) batch_size = batch.size + current_index = 0 begin - batch.in_groups_of(batch_size, false).each do |group| + batch[current_index..-1].in_groups_of(batch_size, false).each do |group| @logger.debug("Publishing #{group.size} messages to #{@current_topic}") producer.publish_list(group) Deimos.config.metrics&.increment( 'publish', tags: %W(status:success topic:#{@current_topic}), by: group.size ) + current_index += group.size @logger.info("Sent #{group.size} messages to #{@current_topic}") end rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge => e if batch_size == 1