lib/deimos/utils/db_poller.rb in deimos-ruby-1.16.3 vs lib/deimos/utils/db_poller.rb in deimos-ruby-1.16.4

- old
+ new

@@ -5,16 +5,19 @@ module Deimos module Utils # Class which continually polls the database and sends Kafka messages. class DbPoller + # @return [Integer] BATCH_SIZE = 1000 # Needed for Executor so it can identify the worker + # @return [Integer] attr_reader :id # Begin the DB Poller process. + # @return [void] def self.start! if Deimos.config.db_poller_objects.empty? raise('No pollers configured!') end @@ -26,11 +29,11 @@ logger: Deimos.config.logger) signal_handler = Sigurd::SignalHandler.new(executor) signal_handler.run! end - # @param config [Deimos::Configuration::ConfigStruct] + # @param config [FigTree::ConfigStruct] def initialize(config) @config = config @id = SecureRandom.hex begin @producer = @config.producer_class.constantize @@ -45,10 +48,11 @@ # Start the poll: # 1) Grab the current PollInfo from the database indicating the last # time we ran # 2) On a loop, process all the recent updates between the last time # we ran and now. + # @return [void] def start # Don't send asynchronously if Deimos.config.producers.backend == :kafka_async Deimos.config.producers.backend = :kafka end @@ -64,20 +68,22 @@ sleep 0.1 end end # Grab the PollInfo or create if it doesn't exist. + # @return [void] def retrieve_poll_info ActiveRecord::Base.connection.reconnect! unless ActiveRecord::Base.connection.open_transactions.positive? new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now @info = Deimos::PollInfo.find_by_producer(@config.producer_class) || Deimos::PollInfo.create!(producer: @config.producer_class, last_sent: new_time, last_sent_id: 0) end # Stop the poll. + # @return [void] def stop Deimos.config.logger.info('Received signal to stop') @signal_to_stop = true end @@ -93,10 +99,11 @@ def last_updated(record) record.public_send(@config.timestamp_column) end # Send messages for updated data. + # @return [void] def process_updates return unless should_run? time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone time_to = Time.zone.now - @config.delay_time @@ -133,9 +140,10 @@ limit(BATCH_SIZE). order("#{quoted_timestamp}, #{quoted_id}") end # @param batch [Array<ActiveRecord::Base>] + # @return [void] def process_batch(batch) record = batch.last id_method = record.class.primary_key last_id = record.public_send(id_method) last_updated_at = last_updated(record)