lib/distribute_reads.rb in distribute_reads-0.3.3 vs lib/distribute_reads.rb in distribute_reads-0.3.4
- old
+ new
@@ -41,27 +41,47 @@
with_replica do
case connection.adapter_name
when "PostgreSQL", "PostGIS"
# cache the version number
- @server_version_num ||= {}
+ @aurora_postgres ||= {}
cache_key = connection.pool.object_id
+
+ unless @aurora_postgres.key?(cache_key)
+ @aurora_postgres[cache_key] = connection.select_all("SELECT 1 FROM pg_stat_activity WHERE backend_type = 'aurora runtime'").any?
+ end
+
+ @server_version_num ||= {}
@server_version_num[cache_key] ||= connection.select_all("SHOW server_version_num").first["server_version_num"].to_i
- lag_condition =
- if @server_version_num[cache_key] >= 100000
- "pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()"
- else
- "pg_last_xlog_receive_location() = pg_last_xlog_replay_location()"
+ if @aurora_postgres[cache_key]
+ # no way to get session_id at the moment
+ # also, pg_is_in_recovery() is always false
+ # and pg_settings are the same for writer and readers
+ # this means we can't tell:
+ # 1. if this is the primary or replica
+ # 2. if replica, which one
+ status = connection.select_all("SELECT MAX(replica_lag_in_msec) AS replica_lag_in_msec, COUNT(*) AS replica_count FROM aurora_replica_status() WHERE session_id != 'MASTER_SESSION_ID'").first
+ if status && status["replica_count"].to_i > 1
+ log "Multiple readers available, taking max lag of all of them"
end
+ status ? status["replica_lag_in_msec"].to_f / 1000.0 : 0.0
+ else
+ lag_condition =
+ if @server_version_num[cache_key] >= 100000
+ "pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()"
+ else
+ "pg_last_xlog_receive_location() = pg_last_xlog_replay_location()"
+ end
- connection.select_all(
- "SELECT CASE
- WHEN NOT pg_is_in_recovery() OR #{lag_condition} THEN 0
- ELSE EXTRACT (EPOCH FROM NOW() - pg_last_xact_replay_timestamp())
- END AS lag".squish
- ).first["lag"].to_f
+ connection.select_all(
+ "SELECT CASE
+ WHEN NOT pg_is_in_recovery() OR #{lag_condition} THEN 0
+ ELSE EXTRACT (EPOCH FROM NOW() - pg_last_xact_replay_timestamp())
+ END AS lag".squish
+ ).first["lag"].to_f
+ end
when "MySQL", "Mysql2", "Mysql2Spatial", "Mysql2Rgeo"
@aurora_mysql ||= {}
cache_key = connection.pool.object_id
unless @aurora_mysql.key?(cache_key)
@@ -147,9 +167,11 @@
end
end
Makara::Proxy.prepend DistributeReads::AppropriatePool
Object.include DistributeReads::GlobalMethods
+# TODO uncomment in 0.4.0
+# Object.send :private, :distribute_reads
ActiveSupport.on_load(:active_job) do
require "distribute_reads/job_methods"
include DistributeReads::JobMethods
end