lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.2 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.3
- old
+ new
@@ -188,21 +188,25 @@
if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3
self.class.producer.sync_producer_shutdown
end
end
+ # Produce messages in batches, reducing the size 1/10 if the batch is too
+ # large. Does not retry batches of messages that have already been sent.
# @param batch [Array<Hash>]
def produce_messages(batch)
batch_size = batch.size
+ current_index = 0
begin
- batch.in_groups_of(batch_size, false).each do |group|
+ batch[current_index..-1].in_groups_of(batch_size, false).each do |group|
@logger.debug("Publishing #{group.size} messages to #{@current_topic}")
producer.publish_list(group)
Deimos.config.metrics&.increment(
'publish',
tags: %W(status:success topic:#{@current_topic}),
by: group.size
)
+ current_index += group.size
@logger.info("Sent #{group.size} messages to #{@current_topic}")
end
rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge,
Kafka::RecordListTooLarge => e
if batch_size == 1