lib/deimos/utils/db_producer.rb in deimos-ruby-1.1.0.pre.beta2 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.2.0.pre.beta1
- old
+ new
@@ -14,10 +14,15 @@
@id = SecureRandom.uuid
@logger = logger
@logger.push_tags("DbProducer #{@id}") if @logger.respond_to?(:push_tags)
end
+ # @return [Deimos::DbProducerConfig]
+ def config
+ Deimos.config.db_producer
+ end
+
# Start the poll.
def start
@logger.info('Starting...')
@signal_to_stop = false
ActiveRecord::Base.connection.reconnect!
@@ -58,48 +63,76 @@
unless KafkaTopicInfo.lock(topic, @id)
@logger.debug("Could not lock topic #{topic} - continuing")
return
end
@current_topic = topic
- messages = retrieve_messages
- while messages.any?
- @logger.debug do
- decoder = messages.first.decoder
- "DB producer: Topic #{topic} Producing messages: #{messages.map { |m| m.decoded_message(decoder) }}"
- end
- Deimos.instrument('db_producer.produce', topic: topic, messages: messages) do
- begin
- produce_messages(messages.map(&:phobos_message))
- rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
- messages.each(&:delete)
- raise
- end
- end
- messages.first.class.where(id: messages.map(&:id)).delete_all
- break if messages.size < BATCH_SIZE
+ loop { break unless process_topic_batch }
- KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
- send_pending_metrics
- messages = retrieve_messages
- end
KafkaTopicInfo.clear_lock(@current_topic, @id)
rescue StandardError => e
@logger.error("Error processing messages for topic #{@current_topic}: #{e.class.name}: #{e.message} #{e.backtrace.join("\n")}")
KafkaTopicInfo.register_error(@current_topic, @id)
end
+ # Produce one batch of pending messages for the current topic, then remove
+ # that batch from the database.
+ # @return [Boolean] true when a full batch (BATCH_SIZE messages) was handled,
+ # false when nothing was pending or the batch was smaller than BATCH_SIZE.
+ def process_topic_batch
+ messages = retrieve_messages
+ return false if messages.empty?
+
+ fetched_count = messages.size
+ # Removes every message of the fetched batch, not only the compacted subset.
+ delete_batch = -> { Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all }
+ to_produce = compact_messages(messages)
+ log_messages(to_produce)
+ Deimos.instrument('db_producer.produce', topic: @current_topic, messages: to_produce) do
+ begin
+ produce_messages(to_produce.map(&:phobos_message))
+ rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
+ # Size-related failures: delete the batch and log its contents before
+ # re-raising to the caller.
+ delete_batch.call
+ @logger.error('Message batch too large, deleting...')
+ @logger.error(Deimos::KafkaMessage.decoded(messages))
+ raise
+ end
+ end
+ delete_batch.call
+ return false unless fetched_count >= BATCH_SIZE
+
+ KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
+ send_pending_metrics
+ true
+ end
+
# Look up the next batch of messages stored for the current topic, ordered
# by id and capped at BATCH_SIZE.
# @return [Array<Deimos::KafkaMessage>]
def retrieve_messages
for_topic = KafkaMessage.where(topic: @current_topic)
ordered = for_topic.order(:id)
ordered.limit(BATCH_SIZE)
end
+ # @param messages [Array<Deimos::KafkaMessage>]
+ def log_messages(messages)
+ return if config.log_topics != :all && !config.log_topics.include?(@current_topic)
+
+ @logger.debug do
+ decoded_messages = Deimos::KafkaMessage.decoded(messages)
+ "DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}}"
+ end
+ end
+
# Send metrics to Datadog.
def send_pending_metrics
- first_message = KafkaMessage.first
- time_diff = first_message ? Time.zone.now - KafkaMessage.first.created_at : 0
- Deimos.config.metrics&.gauge('pending_db_messages_max_wait', time_diff)
+ metrics = Deimos.config.metrics
+ return unless metrics
+
+ messages = Deimos::KafkaMessage.
+ select('count(*) as num_messages, min(created_at) as earliest, topic').
+ group(:topic)
+ if messages.none?
+ metrics.gauge('pending_db_messages_max_wait', 0)
+ end
+ messages.each do |record|
+ time_diff = Time.zone.now - record.earliest
+ metrics.gauge('pending_db_messages_max_wait', time_diff,
+ tags: ["topic:#{record.topic}"])
+ end
end
# Shut down the sync producer if we have to. Phobos will automatically
# create a new one. We should call this if the producer can be in a bad
# state and e.g. we need to clear the buffer.
@@ -137,9 +170,21 @@
batch_size /= 10
end
shutdown_producer
retry
end
+ end
+
+ # @param batch [Array<Deimos::KafkaMessage>]
+ # @return [Array<Deimos::KafkaMessage>]
+ def compact_messages(batch)
+ return batch unless batch.first&.key.present?
+
+ topic = batch.first.topic
+ return batch if config.compact_topics != :all &&
+ !config.compact_topics.include?(topic)
+
+ batch.reverse.uniq!(&:key).reverse!
end
end
end
end