lib/deimos/kafka_topic_info.rb in deimos-ruby-1.16.3 vs lib/deimos/kafka_topic_info.rb in deimos-ruby-1.16.4

- old
+ new

@@ -48,10 +48,11 @@ # This is called once a producer is finished working on a topic, i.e. # there are no more messages to fetch. It unlocks the topic and # moves on to the next one. # @param topic [String] # @param lock_id [String] + # @return [void] def clear_lock(topic, lock_id) self.where(topic: topic, locked_by: lock_id). update_all(locked_by: nil, locked_at: nil, error: false, @@ -64,10 +65,11 @@ # because the point is that at least within a few milliseconds of each # other, it wasn't locked and had no messages, meaning the topic # was in a good state. # @param except_topics [Array<String>] the list of topics we've just # realized had messages in them, meaning all other topics were empty. + # @return [void] def ping_empty_topics(except_topics) records = KafkaTopicInfo.where(locked_by: nil). where('topic not in(?)', except_topics) records.each do |info| info.update_attribute(:last_processed_at, Time.zone.now) @@ -77,10 +79,11 @@ # The producer calls this if it gets an error sending messages. This # essentially locks down this topic for 1 minute (for all producers) # and allows the caller to continue to the next topic. # @param topic [String] # @param lock_id [String] + # @return [void] def register_error(topic, lock_id) record = self.where(topic: topic, locked_by: lock_id).last attr_hash = { locked_by: nil, locked_at: Time.zone.now, error: true, @@ -91,9 +94,10 @@ # Update the locked_at timestamp to indicate that the producer is still # working on those messages and to continue. # @param topic [String] # @param lock_id [String] + # @return [void] def heartbeat(topic, lock_id) self.where(topic: topic, locked_by: lock_id). update_all(locked_at: Time.zone.now) end end