lib/deimos/utils/db_poller.rb in deimos-ruby-1.16.5 vs lib/deimos/utils/db_poller.rb in deimos-ruby-1.17.0

- old
+ new

@@ -12,10 +12,13 @@ # Needed for Executor so it can identify the worker # @return [Integer] attr_reader :id + # @return [Hash] + attr_reader :config + # Begin the DB Poller process. # @return [void] def self.start! if Deimos.config.db_poller_objects.empty? raise('No pollers configured!') @@ -108,24 +111,28 @@ time_from = @config.full_table ? Time.new(0) : @info.last_sent.in_time_zone time_to = Time.zone.now - @config.delay_time Deimos.config.logger.info("Polling #{@producer.topic} from #{time_from} to #{time_to}") message_count = 0 batch_count = 0 + error_count = 0 # poll_query gets all the relevant data from the database, as defined # by the producer itself. loop do Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{batch_count + 1}") batch = fetch_results(time_from, time_to).to_a break if batch.empty? - batch_count += 1 - process_batch(batch) + if process_batch_with_span(batch) + batch_count += 1 + else + error_count += 1 + end message_count += batch.size time_from = last_updated(batch.last) end - Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} batches}") + Deimos.config.logger.info("Poll #{@producer.topic} complete at #{time_to} (#{message_count} messages, #{batch_count} successful batches, #{error_count} batches errored}") end # @param time_from [ActiveSupport::TimeWithZone] # @param time_to [ActiveSupport::TimeWithZone] # @return [ActiveRecord::Relation] @@ -141,17 +148,54 @@ order("#{quoted_timestamp}, #{quoted_id}") end # @param batch [Array<ActiveRecord::Base>] # @return [void] - def process_batch(batch) + def process_batch_with_span(batch) + retries = 0 + begin + span = Deimos.config.tracer&.start( + 'deimos-db-poller', + resource: @producer.class.name.gsub('::', '-') + ) + process_batch(batch) + Deimos.config.tracer&.finish(span) + rescue Kafka::Error => e # keep trying till it fixes itself + Deimos.config.logger.error("Error publishing through DB Poller: #{e.message}") + sleep(0.5) + retry + rescue StandardError => e + Deimos.config.logger.error("Error publishing through DB poller: #{e.message}}") + if retries < @config.retries + retries += 1 + sleep(0.5) + retry + else + Deimos.config.logger.error('Retries exceeded, moving on to next batch') + Deimos.config.tracer&.set_error(span, e) + self.touch_info(batch) + return false + end + end + true + end + + # @param batch [Array<ActiveRecord::Base>] + # @return [void] + def touch_info(batch) record = batch.last id_method = record.class.primary_key last_id = record.public_send(id_method) last_updated_at = last_updated(record) - @producer.send_events(batch) @info.attributes = { last_sent: last_updated_at, last_sent_id: last_id } @info.save! + end + + # @param batch [Array<ActiveRecord::Base>] + # @return [void] + def process_batch(batch) + @producer.send_events(batch) + self.touch_info(batch) end end end end