lib/deimos/utils/db_poller.rb in deimos-ruby-1.16.3 vs lib/deimos/utils/db_poller.rb in deimos-ruby-1.16.4
- old
+ new
@@ -5,16 +5,19 @@
module Deimos
module Utils
# Class which continually polls the database and sends Kafka messages.
class DbPoller
+ # @return [Integer]
BATCH_SIZE = 1000
# Needed for Executor so it can identify the worker
+ # @return [Integer]
attr_reader :id
# Begin the DB Poller process.
+ # @return [void]
def self.start!
if Deimos.config.db_poller_objects.empty?
raise('No pollers configured!')
end
@@ -26,11 +29,11 @@
logger: Deimos.config.logger)
signal_handler = Sigurd::SignalHandler.new(executor)
signal_handler.run!
end
- # @param config [Deimos::Configuration::ConfigStruct]
+ # @param config [FigTree::ConfigStruct]
def initialize(config)
@config = config
@id = SecureRandom.hex
begin
@producer = @config.producer_class.constantize
@@ -45,10 +48,11 @@
# Start the poll:
# 1) Grab the current PollInfo from the database indicating the last
# time we ran
# 2) On a loop, process all the recent updates between the last time
# we ran and now.
+ # @return [void]
def start
# Don't send asynchronously
if Deimos.config.producers.backend == :kafka_async
Deimos.config.producers.backend = :kafka
end
@@ -64,20 +68,22 @@
sleep 0.1
end
end
# Grab the PollInfo or create if it doesn't exist.
+ # @return [void]
def retrieve_poll_info
ActiveRecord::Base.connection.reconnect! unless ActiveRecord::Base.connection.open_transactions.positive?
new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now
@info = Deimos::PollInfo.find_by_producer(@config.producer_class) ||
Deimos::PollInfo.create!(producer: @config.producer_class,
last_sent: new_time,
last_sent_id: 0)
end
# Stop the poll.
+ # @return [void]
def stop
Deimos.config.logger.info('Received signal to stop')
@signal_to_stop = true
end
@@ -93,10 +99,11 @@
def last_updated(record)
record.public_send(@config.timestamp_column)
end
# Send messages for updated data.
+ # @return [void]
def process_updates
return unless should_run?
time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
time_to = Time.zone.now - @config.delay_time
@@ -133,9 +140,10 @@
limit(BATCH_SIZE).
order("#{quoted_timestamp}, #{quoted_id}")
end
# @param batch [Array<ActiveRecord::Base>]
+ # @return [void]
def process_batch(batch)
record = batch.last
id_method = record.class.primary_key
last_id = record.public_send(id_method)
last_updated_at = last_updated(record)