lib/deimos/utils/db_poller.rb in deimos-ruby-1.17.1 vs lib/deimos/utils/db_poller.rb in deimos-ruby-1.18.0

- old
+ new

@@ -1,201 +1,53 @@ # frozen_string_literal: true -require 'deimos/poll_info' -require 'sigurd' - module Deimos module Utils - # Class which continually polls the database and sends Kafka messages. - class DbPoller - # @return [Integer] - BATCH_SIZE = 1000 - - # Needed for Executor so it can identify the worker - # @return [Integer] - attr_reader :id - - # @return [Hash] - attr_reader :config - + # Overall functionality related to DB poller. + module DbPoller # Begin the DB Poller process. # @return [void] def self.start! if Deimos.config.db_poller_objects.empty? raise('No pollers configured!') end pollers = Deimos.config.db_poller_objects.map do |poller_config| - self.new(poller_config) + self.class_for_config(poller_config.mode).new(poller_config) end executor = Sigurd::Executor.new(pollers, sleep_seconds: 5, logger: Deimos.config.logger) signal_handler = Sigurd::SignalHandler.new(executor) signal_handler.run! end - # @param config [FigTree::ConfigStruct] - def initialize(config) - @config = config - @id = SecureRandom.hex - begin - @producer = @config.producer_class.constantize - rescue NameError - raise "Class #{@config.producer_class} not found!" + # @param config_name [Symbol] + # @return [Class<Deimos::Utils::DbPoller>] + def self.class_for_config(config_name) + case config_name + when :state_based + Deimos::Utils::DbPoller::StateBased + else + Deimos::Utils::DbPoller::TimeBased end - unless @producer < Deimos::ActiveRecordProducer - raise "Class #{@producer.class.name} is not an ActiveRecordProducer!" - end end - # Start the poll: - # 1) Grab the current PollInfo from the database indicating the last - # time we ran - # 2) On a loop, process all the recent updates between the last time - # we ran and now. - # @return [void] - def start - # Don't send asynchronously - if Deimos.config.producers.backend == :kafka_async - Deimos.config.producers.backend = :kafka - end - Deimos.config.logger.info('Starting...') - @signal_to_stop = false - retrieve_poll_info - loop do - if @signal_to_stop - Deimos.config.logger.info('Shutting down') - break - end - process_updates - sleep 0.1 - end - end + PollStatus = Struct.new(:batches_processed, :batches_errored, :messages_processed) do - # Grab the PollInfo or create if it doesn't exist. - # @return [void] - def retrieve_poll_info - ActiveRecord::Base.connection.reconnect! unless ActiveRecord::Base.connection.open_transactions.positive? - new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now - @info = Deimos::PollInfo.find_by_producer(@config.producer_class) || - Deimos::PollInfo.create!(producer: @config.producer_class, - last_sent: new_time, - last_sent_id: 0) - end - - # Stop the poll. - # @return [void] - def stop - Deimos.config.logger.info('Received signal to stop') - @signal_to_stop = true - end - - # Indicate whether this current loop should process updates. Most loops - # will busy-wait (sleeping 0.1 seconds) until it's ready. - # @return [Boolean] - def should_run? - Time.zone.now - @info.last_sent - @config.delay_time >= @config.run_every - end - - # @param record [ActiveRecord::Base] - # @return [ActiveSupport::TimeWithZone] - def last_updated(record) - record.public_send(@config.timestamp_column) - end - - # Send messages for updated data. - # @return [void] - def process_updates - return unless should_run? - - time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone - time_to = Time.zone.now - @config.delay_time - Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}") - message_count = 0 - batch_count = 0 - error_count = 0 - - # poll_query gets all the relevant data from the database, as defined - # by the producer itself. - loop do - Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}") - batch = fetch_results(time_from, time_to).to_a - break if batch.empty? - - if process_batch_with_span(batch) - batch_count += 1 - else - error_count += 1 - end - message_count += batch.size - time_from = last_updated(batch.last) + # @return [Integer] + def current_batch + batches_processed + 1 end - Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} successful batches, #{error_count} batches errored}") - end - # @param time_from [ActiveSupport::TimeWithZone] - # @param time_to [ActiveSupport::TimeWithZone] - # @return [ActiveRecord::Relation] - def fetch_results(time_from, time_to) - id = @producer.config[:record_class].primary_key - quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column) - quoted_id = ActiveRecord::Base.connection.quote_column_name(id) - @producer.poll_query(time_from: time_from, - time_to: time_to, - column_name: @config.timestamp_column, - min_id: @info.last_sent_id). - limit(BATCH_SIZE). - order("#{quoted_timestamp}, #{quoted_id}") - end - - # @param batch [Array<ActiveRecord::Base>] - # @return [void] - def process_batch_with_span(batch) - retries = 0 - begin - span = Deimos.config.tracer&.start( - 'deimos-db-poller', - resource: @producer.class.name.gsub('::', '-') - ) - process_batch(batch) - Deimos.config.tracer&.finish(span) - rescue Kafka::Error => e # keep trying till it fixes itself - Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}") - sleep(0.5) - retry - rescue StandardError => e - Deimos.config.logger.error("Error publishing through DB poller: #{e.message}}") - if retries < @config.retries - retries += 1 - sleep(0.5) - retry - else - Deimos.config.logger.error('Retries exceeded, moving on to next batch') - Deimos.config.tracer&.set_error(span, e) - self.touch_info(batch) - return false - end + # @return [String] + def report + "#{batches_processed} batches, #{batches_errored} errored batches, #{messages_processed} processed messages" end - true end - - # @param batch [Array<ActiveRecord::Base>] - # @return [void] - def touch_info(batch) - record = batch.last - id_method = record.class.primary_key - last_id = record.public_send(id_method) - last_updated_at = last_updated(record) - @info.attributes = { last_sent: last_updated_at, last_sent_id: last_id } - @info.save! - end - - # @param batch [Array<ActiveRecord::Base>] - # @return [void] - def process_batch(batch) - @producer.send_events(batch) - self.touch_info(batch) - end end end end + +require 'deimos/utils/db_poller/base' +require 'deimos/utils/db_poller/time_based' +require 'deimos/utils/db_poller/state_based'