Sha256: 729a62bd2e137f82e6557f0b0683284e129c04bd835b8de888c5dc77a123be5f

Contents?: true

Size: 1.89 KB

Versions: 12

Compression:

Stored size: 1.89 KB

Contents

# frozen_string_literal: true

require 'deimos/utils/db_poller/base'

module Deimos
  module Utils
    module DbPoller
      # Poller that uses state columns to determine the records to publish.
      # Records are fetched in batches through the producer's `poll_query`;
      # after each batch is processed, its rows are stamped with the configured
      # state and timestamp columns.
      class StateBased < Base
        # Send messages for updated data. Keeps fetching, processing and
        # finalizing batches until a fetch comes back empty.
        # @return [void]
        def process_updates
          Deimos.config.logger.info("Polling #{@producer.topic}")
          status = PollStatus.new(0, 0, 0)

          # poll_query gets all the relevant data from the database, as defined
          # by the producer itself.
          loop do
            Deimos.config.logger.debug("Polling #{@producer.topic}, batch #{status.current_batch}")
            batch = fetch_results.to_a
            if batch.empty?
              # Nothing left to publish: record the poll time and stop.
              @info.touch(:last_sent)
              break
            end

            success = process_batch_with_span(batch, status)
            finalize_batch(batch, success)
          end
          Deimos.config.logger.info("Poll #{@producer.topic} complete (#{status.report})")
        end

        # Builds the query for the next batch: the producer's `poll_query`,
        # capped at BATCH_SIZE rows and ordered by the configured timestamp
        # column.
        # @return [ActiveRecord::Relation]
        def fetch_results
          @producer.poll_query.limit(BATCH_SIZE).order(@config.timestamp_column)
        end

        # Marks a processed batch in the database. Always touches `last_sent`
        # on the poll info; then, in a single `update_all`, sets the timestamp
        # column, the state column (only when a state is configured for this
        # outcome) and the publish timestamp column (only when configured).
        # @param batch [Array<ActiveRecord::Base>]
        # @param success [Boolean]
        # @return [void]
        def finalize_batch(batch, success)
          @info.touch(:last_sent)
          # Nothing to update; also avoids calling `primary_key` on NilClass.
          return if batch.empty?

          state = success ? @config.published_state : @config.failed_state
          klass = batch.first.class
          id_col = klass.primary_key.to_sym

          # Read the clock once so both timestamp columns get the same value.
          now = Time.zone.now
          attrs = { @config.timestamp_column => now }
          attrs[@config.state_column] = state if state
          attrs[@config.publish_timestamp_column] = now if @config.publish_timestamp_column

          klass.where(id_col => batch.map(&id_col)).update_all(attrs)
        end
      end
    end
  end
end

Version data entries

12 entries across 12 versions & 1 rubygems

Version Path
deimos-ruby-1.19.7 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.6 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.5 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.4 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.3 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.2 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.1 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.1.pre.beta1 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.0 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.beta2 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.19.beta1 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.18.2 lib/deimos/utils/db_poller/state_based.rb