Sha256: 1a601ce6937b4262069cf07017087c9d5bea3940163111944d1e4ee0298afcf8

Contents?: true

Size: 1.8 KB

Versions: 1

Compression:

Stored size: 1.8 KB

Contents

module DistributeReads
  module GlobalMethods
    def distribute_reads(**options)
      raise ArgumentError, "Missing block" unless block_given?

      unknown_keywords = options.keys - [:failover, :lag_failover, :lag_on, :max_lag, :primary, :replica]
      raise ArgumentError, "Unknown keywords: #{unknown_keywords.join(", ")}" if unknown_keywords.any?

      options = DistributeReads.default_options.merge(options)

      previous_value = Thread.current[:distribute_reads]
      begin
        Thread.current[:distribute_reads] = {
          failover: options[:failover],
          primary: options[:primary],
          replica: options[:replica]
        }

        # TODO ensure same connection is used to test lag and execute queries
        max_lag = options[:max_lag]
        if max_lag && !options[:primary]
          Array(options[:lag_on] || [ActiveRecord::Base]).each do |base_model|
            if DistributeReads.lag(connection: base_model.connection) > max_lag
              message = "Replica lag over #{max_lag} seconds#{options[:lag_on] ? " on #{base_model.name} connection" : ""}"

              if options[:lag_failover]
                # TODO possibly per connection
                Thread.current[:distribute_reads][:primary] = true
                Thread.current[:distribute_reads][:replica] = false
                DistributeReads.log "#{message}. Falling back to master pool."
                break
              else
                raise DistributeReads::TooMuchLag, message
              end
            end
          end
        end

        value = yield
        DistributeReads.log "Call `to_a` inside block to execute query on replica" if value.is_a?(ActiveRecord::Relation) && !previous_value
        value
      ensure
        Thread.current[:distribute_reads] = previous_value
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
distribute_reads-0.2.4 lib/distribute_reads/global_methods.rb