lib/distribute_reads.rb in distribute_reads-0.2.4 vs lib/distribute_reads.rb in distribute_reads-0.3.0

- old
+ new

@@ -1,6 +1,10 @@ +# dependencies +require "active_support" require "makara" + +# modules require "distribute_reads/appropriate_pool" require "distribute_reads/cache_store" require "distribute_reads/global_methods" require "distribute_reads/version" @@ -10,85 +14,106 @@ class NoReplicasAvailable < Error; end class << self attr_accessor :by_default attr_accessor :default_options + attr_writer :logger end self.by_default = false self.default_options = { failover: true, lag_failover: false } - def self.replication_lag(connection: nil) - distribute_reads do - lag(connection: connection) + def self.logger + unless defined?(@logger) + @logger = ActiveRecord::Base.logger end + @logger end - def self.lag(connection: nil) - raise DistributeReads::Error, "Don't use outside distribute_reads" unless Thread.current[:distribute_reads] - + def self.replication_lag(connection: nil) connection ||= ActiveRecord::Base.connection replica_pool = connection.instance_variable_get(:@slave_pool) if replica_pool && replica_pool.connections.size > 1 log "Multiple replicas available, lag only reported for one" end - if %w(PostgreSQL PostGIS).include?(connection.adapter_name) - # cache the version number - @server_version_num ||= {} - cache_key = connection.pool.object_id - @server_version_num[cache_key] ||= connection.execute("SHOW server_version_num").first["server_version_num"].to_i + with_replica do + case connection.adapter_name + when "PostgreSQL", "PostGIS" + # cache the version number + @server_version_num ||= {} + cache_key = connection.pool.object_id + @server_version_num[cache_key] ||= connection.execute("SHOW server_version_num").first["server_version_num"].to_i - lag_condition = - if @server_version_num[cache_key] >= 100000 - "pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()" - else - "pg_last_xlog_receive_location() = pg_last_xlog_replay_location()" - end + lag_condition = + if @server_version_num[cache_key] >= 100000 + "pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()" + else + "pg_last_xlog_receive_location() = pg_last_xlog_replay_location()" + end - connection.execute( - "SELECT CASE - WHEN NOT pg_is_in_recovery() OR #{lag_condition} THEN 0 - ELSE EXTRACT (EPOCH FROM NOW() - pg_last_xact_replay_timestamp()) - END AS lag".squish - ).first["lag"].to_f - elsif %w(MySQL Mysql2 Mysql2Spatial Mysql2Rgeo).include?(connection.adapter_name) - replica_value = Thread.current[:distribute_reads][:replica] - begin - # makara doesn't send SHOW queries to replica, so we must force it - Thread.current[:distribute_reads][:replica] = true - + connection.execute( + "SELECT CASE + WHEN NOT pg_is_in_recovery() OR #{lag_condition} THEN 0 + ELSE EXTRACT (EPOCH FROM NOW() - pg_last_xact_replay_timestamp()) + END AS lag".squish + ).first["lag"].to_f + when "MySQL", "Mysql2", "Mysql2Spatial", "Mysql2Rgeo" @aurora_mysql ||= {} cache_key = connection.pool.object_id unless @aurora_mysql.key?(cache_key) + # makara doesn't send SHOW queries to replica by default @aurora_mysql[cache_key] = connection.exec_query("SHOW VARIABLES LIKE 'aurora_version'").to_hash.any? end if @aurora_mysql[cache_key] status = connection.exec_query("SELECT Replica_lag_in_msec FROM mysql.ro_replica_status WHERE Server_id = @@aurora_server_id").to_hash.first status ? status["Replica_lag_in_msec"].to_f / 1000.0 : 0.0 else status = connection.exec_query("SHOW SLAVE STATUS").to_hash.first - status ? status["Seconds_Behind_Master"].to_f : 0.0 + if status + if status["Seconds_Behind_Master"].nil? + # replication stopped + # https://dev.mysql.com/doc/refman/8.0/en/show-slave-status.html + nil + else + status["Seconds_Behind_Master"].to_f + end + else + # not a replica + 0.0 + end end - ensure - Thread.current[:distribute_reads][:replica] = replica_value + when "SQLite" + # never a replica + 0.0 + else + raise DistributeReads::Error, "Option not supported with this adapter" end - else - raise DistributeReads::Error, "Option not supported with this adapter" end end def self.log(message) - warn "[distribute_reads] #{message}" + logger.info("[distribute_reads] #{message}") if logger end # private + def self.with_replica + previous_value = Thread.current[:distribute_reads] + begin + Thread.current[:distribute_reads] = {replica: true, failover: false} + yield + ensure + Thread.current[:distribute_reads] = previous_value + end + end + + # private def self.makara3? unless defined?(@makara3) @makara3 = Gem::Version.new(Makara::VERSION.to_s) < Gem::Version.new("0.4.0") end @makara3 @@ -103,11 +128,11 @@ def self.default_to_primary=(value) self.by_default = !value end end -Makara::Proxy.send :prepend, DistributeReads::AppropriatePool -Object.send :include, DistributeReads::GlobalMethods +Makara::Proxy.prepend DistributeReads::AppropriatePool +Object.include DistributeReads::GlobalMethods ActiveSupport.on_load(:active_job) do require "distribute_reads/job_methods" include DistributeReads::JobMethods end