lib/deimos/kafka_topic_info.rb in deimos-ruby-1.8.0.pre.beta2 vs lib/deimos/kafka_topic_info.rb in deimos-ruby-1.8.1.pre.beta1
- old
+ new
@@ -11,11 +11,11 @@
# @param lock_id [String]
# @return [Boolean]
def lock(topic, lock_id)
# Try to create it - it's fine if it already exists
begin
- self.create(topic: topic)
+ self.create(topic: topic, last_processed_at: Time.zone.now)
rescue ActiveRecord::RecordNotUnique
# continue on
end
# Lock the record
@@ -50,10 +50,29 @@
# moves on to the next one.
# @param topic [String]
# @param lock_id [String]
def clear_lock(topic, lock_id)
self.where(topic: topic, locked_by: lock_id).
- update_all(locked_by: nil, locked_at: nil, error: false, retries: 0)
+ update_all(locked_by: nil,
+ locked_at: nil,
+ error: false,
+ retries: 0,
+ last_processed_at: Time.zone.now)
+ end
+
+ # Update all topics that aren't currently locked and have no messages
+ # waiting. It's OK if some messages get inserted in the middle of this
+ # because the point is that at least within a few milliseconds of each
+ # other, it wasn't locked and had no messages, meaning the topic
+ # was in a good state.
+ # @param except_topics [Array<String>] the list of topics we've just
+ # realized had messages in them, meaning all other topics were empty.
+ def ping_empty_topics(except_topics)
+ records = KafkaTopicInfo.where(locked_by: nil).
+ where('topic not in(?)', except_topics)
+ records.each do |info|
+ info.update_attribute(:last_processed_at, Time.zone.now)
+ end
end
# The producer calls this if it gets an error sending messages. This
# essentially locks down this topic for 1 minute (for all producers)
# and allows the caller to continue to the next topic.