lib/deimos/utils/db_producer.rb in deimos-kafka-1.0.0.pre.beta19 vs lib/deimos/utils/db_producer.rb in deimos-kafka-1.0.0.pre.beta20

- old
+ new

@@ -79,17 +79,32 @@ KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE) end # @param batch [Array<Hash>] def produce_messages(batch) - @logger.debug("Publishing #{batch.size} messages to #{@current_topic}") - producer.publish_list(batch) - Deimos.config.metrics&.increment( - 'publish', - tags: %W(status:success topic:#{@current_topic}), - by: batch.size - ) - @logger.info("Sent #{batch.size} messages to #{@current_topic}") + batch_size = batch.size + begin + batch.in_groups_of(batch_size, false).each do |group| + @logger.debug("Publishing #{group.size} messages to #{@current_topic}") + producer.publish_list(group) + Deimos.config.metrics&.increment( + 'publish', + tags: %W(status:success topic:#{@current_topic}), + by: group.size + ) + @logger.info("Sent #{group.size} messages to #{@current_topic}") + end + rescue Kafka::BufferOverflow + raise if batch_size == 1 + + @logger.error("Buffer overflow when publishing #{batch.size} in groups of #{batch_size}, retrying...") + if batch_size < 10 + batch_size = 1 + else + batch_size /= 10 + end + retry + end end end end end