lib/deimos/utils/db_poller.rb in deimos-ruby-1.16.5 vs lib/deimos/utils/db_poller.rb in deimos-ruby-1.17.0
- old
+ new
@@ -12,10 +12,13 @@
# Needed for Executor so it can identify the worker
# @return [Integer]
attr_reader :id
+ # @return [Hash]
+ attr_reader :config
+
# Begin the DB Poller process.
# @return [void]
def self.start!
if Deimos.config.db_poller_objects.empty?
raise('No pollers configured!')
@@ -108,24 +111,28 @@
time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone
time_to = Time.zone.now - @config.delay_time
Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}")
message_count = 0
batch_count = 0
+ error_count = 0
# poll_query gets all the relevant data from the database, as defined
# by the producer itself.
loop do
Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}")
batch = fetch_results(time_from, time_to).to_a
break if batch.empty?
- batch_count += 1
- process_batch(batch)
+ if process_batch_with_span(batch)
+ batch_count += 1
+ else
+ error_count += 1
+ end
message_count += batch.size
time_from = last_updated(batch.last)
end
- Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} batches}")
+ Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} successful batches, #{error_count} batches errored}")
end
# @param time_from [ActiveSupport::TimeWithZone]
# @param time_to [ActiveSupport::TimeWithZone]
# @return [ActiveRecord::Relation]
@@ -141,17 +148,54 @@
order("#{quoted_timestamp}, #{quoted_id}")
end
# @param batch [Array<ActiveRecord::Base>]
# @return [void]
- def process_batch(batch)
+ def process_batch_with_span(batch)
+ retries = 0
+ begin
+ span = Deimos.config.tracer&.start(
+ 'deimos-db-poller',
+ resource: @producer.class.name.gsub('::', '-')
+ )
+ process_batch(batch)
+ Deimos.config.tracer&.finish(span)
+ rescue Kafka::Error => e # keep trying till it fixes itself
+ Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}")
+ sleep(0.5)
+ retry
+ rescue StandardError => e
+ Deimos.config.logger.error("Error publishing through DB poller: #{e.message}}")
+ if retries < @config.retries
+ retries += 1
+ sleep(0.5)
+ retry
+ else
+ Deimos.config.logger.error('Retries exceeded, moving on to next batch')
+ Deimos.config.tracer&.set_error(span, e)
+ self.touch_info(batch)
+ return false
+ end
+ end
+ true
+ end
+
+ # @param batch [Array<ActiveRecord::Base>]
+ # @return [void]
+ def touch_info(batch)
record = batch.last
id_method = record.class.primary_key
last_id = record.public_send(id_method)
last_updated_at = last_updated(record)
- @producer.send_events(batch)
@info.attributes = { last_sent: last_updated_at, last_sent_id: last_id }
@info.save!
+ end
+
+ # @param batch [Array<ActiveRecord::Base>]
+ # @return [void]
+ def process_batch(batch)
+ @producer.send_events(batch)
+ self.touch_info(batch)
end
end
end
end