lib/deimos/utils/db_producer.rb in deimos-ruby-1.1.0.pre.beta2 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.2.0.pre.beta1

- old
+ new

# Shortcut to the DB producer section of the global Deimos configuration.
# @return [Deimos::DbProducerConfig]
def config
  Deimos.config.db_producer
end

# Process a single batch in the current topic: fetch the pending messages,
# compact and log them, produce them, then delete them from the database.
# @return [Boolean] true when a full batch was handled (the caller should
#   ask for another one), false when the topic had no messages or the batch
#   was smaller than BATCH_SIZE.
# @raise [Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge]
#   re-raised after the offending batch has been deleted and logged.
def process_topic_batch
  messages = retrieve_messages
  return false if messages.empty?

  # Remember the size of the raw batch: compaction may shrink the list, but
  # whether more rows are waiting depends on how many rows were fetched.
  batch_size = messages.size
  compacted_messages = compact_messages(messages)
  log_messages(compacted_messages)
  Deimos.instrument('db_producer.produce', topic: @current_topic, messages: compacted_messages) do
    begin
      produce_messages(compacted_messages.map(&:phobos_message))
    rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
      # The batch is deleted before re-raising so it is not fetched again.
      Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
      @logger.error('Message batch too large, deleting...')
      @logger.error(Deimos::KafkaMessage.decoded(messages))
      raise
    end
  end
  # Delete by the ids of the *uncompacted* batch so superseded rows go too.
  Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
  return false if batch_size < BATCH_SIZE

  KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
  send_pending_metrics
  true
end

# Fetch up to BATCH_SIZE pending messages for the current topic, oldest id first.
# @return [Array<Deimos::KafkaMessage>]
def retrieve_messages
  KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
end

# Write the decoded messages to the debug log, but only when the current
# topic is enabled through `config.log_topics` (either :all or a list that
# includes the topic).
# @param messages [Array<Deimos::KafkaMessage>]
def log_messages(messages)
  return if config.log_topics != :all && !config.log_topics.include?(@current_topic)

  # Block form: the message is only built when the logger evaluates the block.
  @logger.debug do
    decoded_messages = Deimos::KafkaMessage.decoded(messages)
    "DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}"
  end
end

# Send metrics to Datadog: one 'pending_db_messages_max_wait' gauge per topic,
# holding the age of that topic's oldest pending message, or a single
# untagged 0 when nothing is pending. Does nothing if metrics are not configured.
def send_pending_metrics
  metrics = Deimos.config.metrics
  return unless metrics

  messages = Deimos::KafkaMessage.
    select('count(*) as num_messages, min(created_at) as earliest, topic').
    group(:topic)
  if messages.none?
    metrics.gauge('pending_db_messages_max_wait', 0)
  end
  messages.each do |record|
    time_diff = Time.zone.now - record.earliest
    metrics.gauge('pending_db_messages_max_wait', time_diff,
                  tags: ["topic:#{record.topic}"])
  end
end

# Shut down the sync producer if we have to.
# Phobos will automatically
# create a new one. We should call this if the producer can be in a bad
# state and e.g. we need to clear the buffer.

# Compact a batch so that only the last message for each key remains, keeping
# the surviving messages in their original relative order. The batch is
# returned untouched when its first message has no key, or when the topic of
# its first message is not enabled through `config.compact_topics` (either
# :all or a list that includes the topic).
# @param batch [Array<Deimos::KafkaMessage>]
# @return [Array<Deimos::KafkaMessage>]
def compact_messages(batch)
  return batch unless batch.first&.key.present?

  topic = batch.first.topic
  return batch if config.compact_topics != :all &&
                  !config.compact_topics.include?(topic)

  # Reverse first so `uniq` keeps the *latest* message per key, then restore
  # the order. The non-bang `uniq` is required here: `uniq!` returns nil when
  # the batch has no duplicate keys, which would make the chained call raise
  # NoMethodError.
  batch.reverse.uniq(&:key).reverse!
end