lib/deimos/utils/db_poller/time_based.rb in deimos-ruby-1.19.7 vs lib/deimos/utils/db_poller/time_based.rb in deimos-ruby-1.20.0

- old
+ new

@@ -9,11 +9,11 @@ class TimeBased < Base # :nodoc: def create_poll_info new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now - Deimos::PollInfo.create!(producer: @config.producer_class, + Deimos::PollInfo.create!(producer: @resource_class.to_s, last_sent: new_time, last_sent_id: 0) end # @param batch [Array<ActiveRecord::Base>] @@ -26,39 +26,39 @@ # Send messages for updated data. # @return [void] def process_updates time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone time_to = Time.zone.now - @config.delay_time - Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}") + Deimos.config.logger.info("Polling #{log_identifier} from #{time_from} to #{time_to}") status = PollStatus.new(0, 0, 0) # poll_query gets all the relevant data from the database, as defined # by the producer itself. loop do - Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}") + Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}") batch = fetch_results(time_from, time_to).to_a if batch.empty? @info.touch(:last_sent) break end process_and_touch_info(batch, status) time_from = last_updated(batch.last) end - Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{status.report})") + Deimos.config.logger.info("Poll #{log_identifier} complete at #{time_to} (#{status.report})") end # @param time_from [ActiveSupport::TimeWithZone] # @param time_to [ActiveSupport::TimeWithZone] # @return [ActiveRecord::Relation] def fetch_results(time_from, time_to) - id = @producer.config[:record_class].primary_key + id = self.producer_classes.first.config[:record_class].primary_key quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column) quoted_id = ActiveRecord::Base.connection.quote_column_name(id) - @producer.poll_query(time_from: time_from, - time_to: time_to, - column_name: @config.timestamp_column, - min_id: @info.last_sent_id). + @resource_class.poll_query(time_from: time_from, + time_to: time_to, + column_name: @config.timestamp_column, + min_id: @info.last_sent_id). limit(BATCH_SIZE). order("#{quoted_timestamp}, #{quoted_id}") end # @param record [ActiveRecord::Base]