lib/deimos/utils/db_producer.rb in deimos-ruby-1.16.3 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.16.4
- old
+ new
@@ -6,27 +6,31 @@
# in the database and sends Kafka messages.
class DbProducer
include Phobos::Producer
attr_accessor :id, :current_topic
+ # @return [Integer]
BATCH_SIZE = 1000
+ # @return [Integer]
DELETE_BATCH_SIZE = 10
+ # @return [Integer]
MAX_DELETE_ATTEMPTS = 3
# @param logger [Logger]
def initialize(logger=Logger.new(STDOUT))
@id = SecureRandom.uuid
@logger = logger
@logger.push_tags("DbProducer #{@id}") if @logger.respond_to?(:push_tags)
end
- # @return [Deimos::DbProducerConfig]
+ # @return [FigTree]
def config
Deimos.config.db_producer
end
# Start the poll.
+ # @return [void]
def start
@logger.info('Starting...')
@signal_to_stop = false
ActiveRecord::Base.connection.reconnect!
loop do
@@ -38,16 +42,18 @@
process_next_messages
end
end
# Stop the poll.
+ # @return [void]
def stop
@logger.info('Received signal to stop')
@signal_to_stop = true
end
# Complete one loop of processing all messages in the DB.
+ # @return [void]
def process_next_messages
topics = retrieve_topics
@logger.info("Found topics: #{topics}")
topics.each(&method(:process_topic))
KafkaTopicInfo.ping_empty_topics(topics)
@@ -78,10 +84,11 @@
KafkaTopicInfo.register_error(@current_topic, @id)
shutdown_producer
end
# Process a single batch in a topic.
+ # @return [void]
def process_topic_batch
messages = retrieve_messages
return false if messages.empty?
batch_size = messages.size
@@ -109,10 +116,11 @@
send_pending_metrics
true
end
# @param messages [Array<Deimos::KafkaMessage>]
+ # @return [void]
def delete_messages(messages)
attempts = 1
begin
messages.in_groups_of(DELETE_BATCH_SIZE, false).each do |batch|
Deimos::KafkaMessage.where(topic: batch.first.topic,
@@ -135,20 +143,22 @@
def retrieve_messages
KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
end
# @param messages [Array<Deimos::KafkaMessage>]
+ # @return [void]
def log_messages(messages)
return if config.log_topics != :all && !config.log_topics.include?(@current_topic)
@logger.debug do
decoded_messages = Deimos::KafkaMessage.decoded(messages)
"DB producer: Topic #{@current_topic} Producing messages: #{decoded_messages}}"
end
end
- # Send metrics to Datadog.
+ # Send metrics related to pending messages.
+ # @return [void]
def send_pending_metrics
metrics = Deimos.config.metrics
return unless metrics
topics = KafkaTopicInfo.select(%w(topic last_processed_at))
@@ -183,18 +193,20 @@
end
# Shut down the sync producer if we have to. Phobos will automatically
# create a new one. We should call this if the producer can be in a bad
# state and e.g. we need to clear the buffer.
+ # @return [void]
def shutdown_producer
if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3
self.class.producer.sync_producer_shutdown
end
end
# Produce messages in batches, reducing the size 1/10 if the batch is too
# large. Does not retry batches of messages that have already been sent.
# @param batch [Array<Hash>]
+ # @return [void]
def produce_messages(batch)
batch_size = batch.size
current_index = 0
begin
batch[current_index..-1].in_groups_of(batch_size, false).each do |group|