Sha256: 1493009e1614a2eb1b78ee5fd9438cd4d36fe9231998fd227fe08a5ec7e5726f

Contents?: true

Size: 1.89 KB

Versions: 5

Compression:

Stored size: 1.89 KB

Contents

# frozen_string_literal: true

require 'deimos/utils/db_poller/base'

module Deimos
  module Utils
    module DbPoller
      # Poller that uses state columns to determine the records to publish.
      class StateBased < Base
        # Send messages for updated data.
        # @return [void]
        def process_updates
          Deimos.config.logger.info("Polling #{log_identifier}")
          status = PollStatus.new(0, 0, 0)

          # poll_query gets all the relevant data from the database, as defined
          # by the producer itself.
          loop do
            Deimos.config.logger.debug("Polling #{log_identifier}, batch #{status.current_batch}")
            batch = fetch_results.to_a
            if batch.empty?
              @info.touch(:last_sent)
              break
            end

            success = process_batch_with_span(batch, status)
            finalize_batch(batch, success)
          end
          Deimos.config.logger.info("Poll #{log_identifier} complete (#{status.report}")
        end

        # @return [ActiveRecord::Relation]
        def fetch_results
          @resource_class.poll_query.limit(BATCH_SIZE).order(@config.timestamp_column)
        end

        # @param batch [Array<ActiveRecord::Base>]
        # @param success [Boolean]
        # @return [void]
        def finalize_batch(batch, success)
          @info.touch(:last_sent)

          state = success ? @config.published_state : @config.failed_state
          klass = batch.first.class
          id_col = klass.primary_key.to_sym
          timestamp_col = @config.timestamp_column

          attrs = { timestamp_col => Time.zone.now }
          attrs[@config.state_column] = state if state
          if @config.publish_timestamp_column
            attrs[@config.publish_timestamp_column] = Time.zone.now
          end

          klass.where(id_col => batch.map(&id_col)).update_all(attrs)
        end

      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
deimos-ruby-1.22.2 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.22.1 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.22 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.20.1 lib/deimos/utils/db_poller/state_based.rb
deimos-ruby-1.20.0 lib/deimos/utils/db_poller/state_based.rb