lib/deimos/utils/db_producer.rb in deimos-kafka-1.0.0.pre.beta19 vs lib/deimos/utils/db_producer.rb in deimos-kafka-1.0.0.pre.beta20
- old
+ new
@@ -79,17 +79,32 @@
KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
end
# @param batch [Array<Hash>]
def produce_messages(batch)
- @logger.debug("Publishing #{batch.size} messages to #{@current_topic}")
- producer.publish_list(batch)
- Deimos.config.metrics&.increment(
- 'publish',
- tags: %W(status:success topic:#{@current_topic}),
- by: batch.size
- )
- @logger.info("Sent #{batch.size} messages to #{@current_topic}")
+ batch_size = batch.size
+ begin
+ batch.in_groups_of(batch_size, false).each do |group|
+ @logger.debug("Publishing #{group.size} messages to #{@current_topic}")
+ producer.publish_list(group)
+ Deimos.config.metrics&.increment(
+ 'publish',
+ tags: %W(status:success topic:#{@current_topic}),
+ by: group.size
+ )
+ @logger.info("Sent #{group.size} messages to #{@current_topic}")
+ end
+ rescue Kafka::BufferOverflow
+ raise if batch_size == 1
+
+ @logger.error("Buffer overflow when publishing #{batch.size} in groups of #{batch_size}, retrying...")
+ if batch_size < 10
+ batch_size = 1
+ else
+ batch_size /= 10
+ end
+ retry
+ end
end
end
end
end