lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.0.pre.beta2 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.8.1.pre.beta1
- old
+ new
@@ -7,10 +7,12 @@
class DbProducer
include Phobos::Producer
attr_accessor :id, :current_topic
BATCH_SIZE = 1000
+ DELETE_BATCH_SIZE = 10
+ MAX_DELETE_ATTEMPTS = 3
# @param logger [Logger]
def initialize(logger=Logger.new(STDOUT))
@id = SecureRandom.uuid
@logger = logger
@@ -46,10 +48,11 @@
# Complete one loop of processing all messages in the DB: fetch the
# topics that currently have pending messages, process each of them,
# notify KafkaTopicInfo about the batch (ping_empty_topics — presumably
# keeps heartbeats fresh for topics with nothing to send; confirm against
# that method), then pause briefly before the next iteration.
def process_next_messages
  topic_list = retrieve_topics
  @logger.info("Found topics: #{topic_list}")
  topic_list.each { |topic| process_topic(topic) }
  KafkaTopicInfo.ping_empty_topics(topic_list)
  sleep(0.5)
end
# @return [Array<String>]
def retrieve_topics
@@ -85,17 +88,17 @@
log_messages(compacted_messages)
Deimos.instrument('db_producer.produce', topic: @current_topic, messages: compacted_messages) do
begin
produce_messages(compacted_messages.map(&:phobos_message))
rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
+ delete_messages(messages)
@logger.error('Message batch too large, deleting...')
@logger.error(Deimos::KafkaMessage.decoded(messages))
- Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
raise
end
end
- Deimos::KafkaMessage.where(id: messages.map(&:id)).delete_all
+ delete_messages(messages)
Deimos.config.metrics&.increment(
'db_producer.process',
tags: %W(topic:#{@current_topic}),
by: messages.size
)
@@ -104,10 +107,31 @@
KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
send_pending_metrics
true
end
# Delete the given messages from the DB in batches of DELETE_BATCH_SIZE,
# scoped to each batch's topic. Transient DB errors (lock waits or lost
# connections, detected by message text) are retried up to
# MAX_DELETE_ATTEMPTS times after re-verifying the connection; anything
# else — or retry exhaustion — is re-raised. Deletes are idempotent, so
# re-running earlier batches on retry is safe.
# @param messages [Array<Deimos::KafkaMessage>]
def delete_messages(messages)
  attempts = 1
  begin
    messages.in_groups_of(DELETE_BATCH_SIZE, false).each do |group|
      Deimos::KafkaMessage.
        where(topic: group.first.topic, id: group.map(&:id)).
        delete_all
    end
  rescue StandardError => e
    transient = e.message.match?(/Lock wait/i) || e.message.match?(/Lost connection/i)
    if transient && attempts <= MAX_DELETE_ATTEMPTS
      attempts += 1
      # Connection may have died — verify/reconnect before retrying.
      ActiveRecord::Base.connection.verify!
      sleep(1)
      retry
    end
    raise
  end
end
+
# Fetch the next batch (up to BATCH_SIZE) of pending messages for the
# topic currently being processed, oldest first (ordered by id).
# @return [Array<Deimos::KafkaMessage>]
def retrieve_messages
  KafkaMessage.where(topic: @current_topic).order(:id).limit(BATCH_SIZE)
end
@@ -124,23 +148,37 @@
# Send metrics to Datadog:
# * pending_db_messages_max_wait — seconds since the last activity on a
#   topic, meaning the later of the oldest waiting message's created_at
#   and the topic's last_processed_at; 0 when nothing is waiting.
# * pending_db_messages_count — number of messages currently waiting
#   for the topic.
def send_pending_metrics
  metrics = Deimos.config.metrics
  return unless metrics

  topics = KafkaTopicInfo.select(%w(topic last_processed_at))
  messages = Deimos::KafkaMessage.
    select('count(*) as num_messages, min(created_at) as earliest, topic').
    group(:topic).
    index_by(&:topic)
  topics.each do |record|
    message_record = messages[record.topic]
    # We want to record the last time we saw any activity, meaning either
    # the oldest message, or the last time we processed, whichever comes
    # last.
    if message_record
      # FIX: `earliest` is an aggregate on the grouped KafkaMessage
      # relation (message_record), not on the KafkaTopicInfo row —
      # `record` only selects topic and last_processed_at, so
      # `record.earliest` would raise at runtime.
      record_earliest = message_record.earliest
      # SQLite gives a string here
      if record_earliest.is_a?(String)
        record_earliest = Time.zone.parse(record_earliest)
      end
      # compact: last_processed_at may be nil for a topic that has never
      # been processed; max over [nil, Time] would raise ArgumentError.
      earliest = [record.last_processed_at, record_earliest].compact.max
      time_diff = Time.zone.now - earliest
      metrics.gauge('pending_db_messages_max_wait', time_diff,
                    tags: ["topic:#{record.topic}"])
    else
      # no messages waiting
      metrics.gauge('pending_db_messages_max_wait', 0,
                    tags: ["topic:#{record.topic}"])
    end
    metrics.gauge('pending_db_messages_count', message_record&.num_messages || 0,
                  tags: ["topic:#{record.topic}"])
  end
end
# Shut down the sync producer if we have to. Phobos will automatically
@@ -172,23 +210,23 @@
shutdown_producer
raise
end
@logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...")
- if batch_size < 10
- batch_size = 1
- else
- batch_size /= 10
- end
+ batch_size = if batch_size < 10
+ 1
+ else
+ (batch_size / 10).to_i
+ end
shutdown_producer
retry
end
end
# @param batch [Array<Deimos::KafkaMessage>]
# @return [Array<Deimos::KafkaMessage>]
def compact_messages(batch)
- return batch unless batch.first&.key.present?
+ return batch if batch.first&.key.blank?
topic = batch.first.topic
return batch if config.compact_topics != :all &&
!config.compact_topics.include?(topic)