lib/deimos/kafka_topic_info.rb in deimos-ruby-1.16.3 vs lib/deimos/kafka_topic_info.rb in deimos-ruby-1.16.4
- old
+ new
@@ -48,10 +48,11 @@
# This is called once a producer is finished working on a topic, i.e.
# there are no more messages to fetch. It unlocks the topic and
# moves on to the next one.
# @param topic [String]
# @param lock_id [String]
+ # @return [void]
def clear_lock(topic, lock_id)
self.where(topic: topic, locked_by: lock_id).
update_all(locked_by: nil,
locked_at: nil,
error: false,
@@ -64,10 +65,11 @@
# because the point is that at least within a few milliseconds of each
# other, it wasn't locked and had no messages, meaning the topic
# was in a good state.
# @param except_topics [Array<String>] the list of topics we've just
# realized had messages in them, meaning all other topics were empty.
+ # @return [void]
def ping_empty_topics(except_topics)
records = KafkaTopicInfo.where(locked_by: nil).
where('topic not in(?)', except_topics)
records.each do |info|
info.update_attribute(:last_processed_at, Time.zone.now)
@@ -77,10 +79,11 @@
# The producer calls this if it gets an error sending messages. This
# essentially locks down this topic for 1 minute (for all producers)
# and allows the caller to continue to the next topic.
# @param topic [String]
# @param lock_id [String]
+ # @return [void]
def register_error(topic, lock_id)
record = self.where(topic: topic, locked_by: lock_id).last
attr_hash = { locked_by: nil,
locked_at: Time.zone.now,
error: true,
@@ -91,9 +94,10 @@
# Update the locked_at timestamp to indicate that the producer is still
# working on those messages and to continue.
# @param topic [String]
# @param lock_id [String]
+ # @return [void]
def heartbeat(topic, lock_id)
self.where(topic: topic, locked_by: lock_id).
update_all(locked_at: Time.zone.now)
end
end