lib/deimos/utils/db_producer.rb in deimos-ruby-1.16.3 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.16.4

- old
+ new

@@ -6,27 +6,31 @@ # in the database and sends Kafka messages. class DbProducer include Phobos::Producer attr_accessor :id, :current_topic + # @return [Integer] BATCH_SIZE = 1000 + # @return [Integer] DELETE_BATCH_SIZE = 10 + # @return [Integer] MAX_DELETE_ATTEMPTS = 3 # @param logger [Logger] def initialize(logger=Logger.new(STDOUT)) @id = SecureRandom.uuid @logger = logger @logger.push_tags("DbProducer #{@id}") if @logger.respond_to?(:push_tags) end - # @return [Deimos::DbProducerConfig] + # @return [FigTree] def config Deimos.config.db_producer end # Start the poll. + # @return [void] def start @logger.info('Starting...') @signal_to_stop = false ActiveRecord::Base.connection.reconnect! loop do @@ -38,16 +42,18 @@ process_next_messages end end # Stop the poll. + # @return [void] def stop @logger.info('Received signal to stop') @signal_to_stop = true end # Complete one loop of processing all messages in the DB. + # @return [void] def process_next_messages topics = retrieve_topics @logger.info("Found topics: #{topics}") topics.each(&method(:process_topic)) KafkaTopicInfo.ping_empty_topics(topics) @@ -78,10 +84,11 @@ KafkaTopicInfo.register_error(@current_topic, @id) shutdown_producer end # Process a single batch in a topic. + # @return [void] def process_topic_batch messages = retrieve_messages return false if messages.empty? batch_size = messages.size @@ -109,10 +116,11 @@ send_pending_metrics true end # @param messages [Array<Deimos::KafkaMessage>] + # @return [void] def delete_messages(messages) attempts = 1 begin messages.in_groups_of(DELETE_BATCH_SIZE, false).each do |batch| Deimos::KafkaMessage.where(topic: batch.first.topic, @@ -135,20 +143,22 @@ def retrieve_messages KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE) end # @param messages [Array<Deimos::KafkaMessage>] + # @return [void] def log_messages(messages) return if config.log_topics != :all && !config.log_topics.include?(@current_topic) @logger.debug do decoded_messages = Deimos::KafkaMessage.decoded(messages) "DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}}" end end - # Send metrics to Datadog. + # Send metrics related to pending messages. + # @return [void] def send_pending_metrics metrics = Deimos.config.metrics return unless metrics topics = KafkaTopicInfo.select(%w(topic last_processed_at)) @@ -183,18 +193,20 @@ end # Shut down the sync producer if we have to. Phobos will automatically # create a new one. We should call this if the producer can be in a bad # state and e.g. we need to clear the buffer. + # @return [void] def shutdown_producer if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3 self.class.producer.sync_producer_shutdown end end # Produce messages in batches, reducing the size 1/10 if the batch is too # large. Does not retry batches of messages that have already been sent. # @param batch [Array<Hash>] + # @return [void] def produce_messages(batch) batch_size = batch.size current_index = 0 begin batch[current_index..-1].in_groups_of(batch_size, false).each do |group|