lib/distribute_reads.rb in distribute_reads-0.1.2 vs lib/distribute_reads.rb in distribute_reads-0.2.0
- old
+ new
@@ -1,35 +1,60 @@
require "makara"
require "distribute_reads/appropriate_pool"
+require "distribute_reads/cache_store"
require "distribute_reads/global_methods"
require "distribute_reads/version"
module DistributeReads
- class TooMuchLag < StandardError; end
+ class Error < StandardError; end
+ class TooMuchLag < Error; end
+ class NoReplicasAvailable < Error; end
class << self
- attr_accessor :default_to_primary
+ attr_accessor :by_default
+ attr_accessor :default_options
end
- self.default_to_primary = true
+ self.by_default = false
+ self.default_options = {
+ failover: true,
+ lag_failover: false
+ }
- def self.lag
- conn = ActiveRecord::Base.connection
- if %w(PostgreSQL PostGIS).include?(conn.adapter_name)
- conn.execute(
+ def self.lag(connection: nil)
+ raise DistributeReads::Error, "Don't use outside distribute_reads" unless Thread.current[:distribute_reads]
+
+ connection ||= ActiveRecord::Base.connection
+ if %w(PostgreSQL PostGIS).include?(connection.adapter_name)
+ replica_pool = connection.instance_variable_get(:@slave_pool)
+ if replica_pool && replica_pool.connections.size > 1
+ warn "[distribute_reads] Multiple replicas available, lag only reported for one"
+ end
+
+ connection.execute(
"SELECT CASE
- WHEN pg_last_xlog_receive_location() = pg_last_xlog_replay_location() THEN 0
+ WHEN NOT pg_is_in_recovery() OR pg_last_xlog_receive_location() = pg_last_xlog_replay_location() THEN 0
ELSE EXTRACT (EPOCH FROM NOW() - pg_last_xact_replay_timestamp())
END AS lag"
).first["lag"].to_f
else
- raise "Option not supported with this adapter"
+ raise DistributeReads::Error, "Option not supported with this adapter"
end
end
+
+ # legacy
+ def self.default_to_primary
+ !by_default
+ end
+
+ # legacy
+ def self.default_to_primary=(value)
+ self.by_default = !value
+ end
end
Makara::Proxy.send :prepend, DistributeReads::AppropriatePool
Object.send :include, DistributeReads::GlobalMethods
ActiveSupport.on_load(:active_job) do
require "distribute_reads/job_methods"
- extend DistributeReads::JobMethods
+ include DistributeReads::JobMethods
end