lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.0.pre.beta2 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.1.pre.beta1

- old
+ new

@@ -7,10 +7,12 @@ class DbProducer
   include Phobos::Producer

   attr_accessor :id, :current_topic

   BATCH_SIZE = 1000
+  DELETE_BATCH_SIZE = 10
+  MAX_DELETE_ATTEMPTS = 3

   # @param logger [Logger]
   def initialize(logger=Logger.new(STDOUT))
     @id = SecureRandom.uuid
     @logger = logger
@@ -46,10 +48,11 @@
   # Complete one loop of processing all messages in the DB.
   def process_next_messages
     topics = retrieve_topics
     @logger.info("Found topics: #{topics}")
     topics.each(&method(:process_topic))
+    KafkaTopicInfo.ping_empty_topics(topics)
     sleep(0.5)
   end

   # @return [Array<String>]
   def retrieve_topics
@@ -85,17 +88,17 @@
     log_messages(compacted_messages)
     Deimos.instrument('db_producer.produce', topic: @current_topic, messages: compacted_messages) do
       begin
         produce_messages(compacted_messages.map(&:phobos_message))
       rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
+        delete_messages(messages)
         @logger.error('Message batch too large, deleting...')
         @logger.error(Deimos::KafkaMessage.decoded(messages))
-        Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
         raise
       end
     end
-    Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
+    delete_messages(messages)
     Deimos.config.metrics&.increment(
       'db_producer.process',
       tags: %W(topic:#{@current_topic}),
       by: messages.size
     )
@@ -104,10 +107,31 @@
     KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
     send_pending_metrics
     true
   end

+  # @param messages [Array<Deimos::KafkaMessage>]
+  def delete_messages(messages)
+    attempts = 1
+    begin
+      messages.in_groups_of(DELETE_BATCH_SIZE, false).each do |batch|
+        Deimos::KafkaMessage.where(topic: batch.first.topic,
+                                   id: batch.map(&:id)).
+          delete_all
+      end
+    rescue StandardError => e
+      if (e.message =~ /Lock wait/i || e.message =~ /Lost connection/i) &&
+         attempts <= MAX_DELETE_ATTEMPTS
+        attempts += 1
+        ActiveRecord::Base.connection.verify!
+        sleep(1)
+        retry
+      end
+      raise
+    end
+  end
+
   # @return [Array<Deimos::KafkaMessage>]
   def retrieve_messages
     KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
   end
@@ -124,23 +148,37 @@
   # Send metrics to Datadog.
   def send_pending_metrics
     metrics = Deimos.config.metrics
     return unless metrics
+    topics = KafkaTopicInfo.select(%w(topic last_processed_at))
     messages = Deimos::KafkaMessage.
       select('count(*) as num_messages, min(created_at) as earliest, topic').
-      group(:topic)
-    if messages.none?
-      metrics.gauge('pending_db_messages_max_wait', 0)
-    end
-    messages.each do |record|
-      earliest = record.earliest
-      # SQLite gives a string here
-      earliest = Time.zone.parse(earliest) if earliest.is_a?(String)
+      group(:topic).
+      index_by(&:topic)
+    topics.each do |record|
+      message_record = messages[record.topic]
+      # We want to record the last time we saw any activity, meaning either
+      # the oldest message, or the last time we processed, whichever comes
+      # last.
+      if message_record
+        record_earliest = record.earliest
+        # SQLite gives a string here
+        if record_earliest.is_a?(String)
+          record_earliest = Time.zone.parse(record_earliest)
+        end
-      time_diff = Time.zone.now - earliest
-      metrics.gauge('pending_db_messages_max_wait', time_diff,
+        earliest = [record.last_processed_at, record_earliest].max
+        time_diff = Time.zone.now - earliest
+        metrics.gauge('pending_db_messages_max_wait', time_diff,
+                      tags: ["topic:#{record.topic}"])
+      else
+        # no messages waiting
+        metrics.gauge('pending_db_messages_max_wait', 0,
+                      tags: ["topic:#{record.topic}"])
+      end
+      metrics.gauge('pending_db_messages_count', message_record&.num_messages || 0,
                     tags: ["topic:#{record.topic}"])
     end
   end

   # Shut down the sync producer if we have to. Phobos will automatically
@@ -172,23 +210,23 @@
         shutdown_producer
         raise
       end
       @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...")
-      if batch_size < 10
-        batch_size = 1
-      else
-        batch_size /= 10
-      end
+      batch_size = if batch_size < 10
+                     1
+                   else
+                     (batch_size / 10).to_i
+                   end
       shutdown_producer
       retry
     end
   end

   # @param batch [Array<Deimos::KafkaMessage>]
   # @return [Array<Deimos::KafkaMessage>]
   def compact_messages(batch)
-    return batch unless batch.first&.key.present?
+    return batch if batch.first&.key.blank?
     topic = batch.first.topic
     return batch if config.compact_topics != :all &&
                     !config.compact_topics.include?(topic)