lib/deimos/kafka_topic_info.rb in deimos-ruby-1.8.0.pre.beta2 vs lib/deimos/kafka_topic_info.rb in deimos-ruby-1.8.1.pre.beta1

- old
+ new

@@ -11,11 +11,11 @@ # @param lock_id [String] # @return [Boolean] def lock(topic, lock_id) # Try to create it - it's fine if it already exists begin - self.create(topic: topic) + self.create(topic: topic, last_processed_at: Time.zone.now) rescue ActiveRecord::RecordNotUnique # continue on end # Lock the record @@ -50,10 +50,29 @@ # moves on to the next one. # @param topic [String] # @param lock_id [String] def clear_lock(topic, lock_id) self.where(topic: topic, locked_by: lock_id). - update_all(locked_by: nil, locked_at: nil, error: false, retries: 0) + update_all(locked_by: nil, + locked_at: nil, + error: false, + retries: 0, + last_processed_at: Time.zone.now) + end + + # Update all topics that aren't currently locked and have no messages + # waiting. It's OK if some messages get inserted in the middle of this + # because the point is that at least within a few milliseconds of each + # other, it wasn't locked and had no messages, meaning the topic + # was in a good state. + # @param except_topics [Array<String>] the list of topics we've just + # realized had messages in them, meaning all other topics were empty. + def ping_empty_topics(except_topics) + records = KafkaTopicInfo.where(locked_by: nil). + where('topic not in(?)', except_topics) + records.each do |info| + info.update_attribute(:last_processed_at, Time.zone.now) + end end # The producer calls this if it gets an error sending messages. This # essentially locks down this topic for 1 minute (for all producers) # and allows the caller to continue to the next topic.