lib/deimos/utils/db_poller/time_based.rb in deimos-ruby-1.19.7 vs lib/deimos/utils/db_poller/time_based.rb in deimos-ruby-1.20.0
- old
+ new
@@ -9,11 +9,11 @@
class TimeBased < Base
# :nodoc:
def create_poll_info
new_time = @config.start_from_beginning ? Time.new(0) : Time.zone.now
- Deimos::PollInfo.create!(producer: @config.producer_class,
+ Deimos::PollInfo.create!(producer: @resource_class.to_s,
last_sent: new_time,
last_sent_id: 0)
end
# @param batch [Array<ActiveRecord::Base>]
@@ -26,39 +26,39 @@
# Send messages for updated data.
# @return [void]
def process_updates
time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
time_to = Time.zone.now - @config.delay_time
- Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
+ Deimos.config.logger.info("Polling #{log_identifier} from #{time_from} to #{time_to}")
status = PollStatus.new(0, 0, 0)
# poll_query gets all the relevant data from the database, as defined
# by the producer itself.
loop do
- Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}")
+ Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}")
batch = fetch_results(time_from, time_to).to_a
if batch.empty?
@info.touch(:last_sent)
break
end
process_and_touch_info(batch, status)
time_from = last_updated(batch.last)
end
- Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{status.report})")
+ Deimos.config.logger.info("Poll #{log_identifier} complete at #{time_to} (#{status.report})")
end
# @param time_from [ActiveSupport::TimeWithZone]
# @param time_to [ActiveSupport::TimeWithZone]
# @return [ActiveRecord::Relation]
def fetch_results(time_from, time_to)
- id = @producer.config[:record_class].primary_key
+ id = self.producer_classes.first.config[:record_class].primary_key
quoted_timestamp = ActiveRecord::Base.connection.quote_column_name(@config.timestamp_column)
quoted_id = ActiveRecord::Base.connection.quote_column_name(id)
- @producer.poll_query(time_from: time_from,
- time_to: time_to,
- column_name: @config.timestamp_column,
- min_id: @info.last_sent_id).
+ @resource_class.poll_query(time_from: time_from,
+ time_to: time_to,
+ column_name: @config.timestamp_column,
+ min_id: @info.last_sent_id).
limit(BATCH_SIZE).
order("#{quoted_timestamp}, #{quoted_id}")
end
# @param record [ActiveRecord::Base]