lib/distribute_reads.rb in distribute_reads-0.3.3 vs lib/distribute_reads.rb in distribute_reads-0.3.4

- old
+ new

@@ -41,27 +41,47 @@ with_replica do case connection.adapter_name when "PostgreSQL", "PostGIS" # cache the version number - @server_version_num ||= {} + @aurora_postgres ||= {} cache_key = connection.pool.object_id + + unless @aurora_postgres.key?(cache_key) + @aurora_postgres[cache_key] = connection.select_all("SELECT 1 FROM pg_stat_activity WHERE backend_type = 'aurora runtime'").any? + end + + @server_version_num ||= {} @server_version_num[cache_key] ||= connection.select_all("SHOW server_version_num").first["server_version_num"].to_i - lag_condition = - if @server_version_num[cache_key] >= 100000 - "pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()" - else - "pg_last_xlog_receive_location() = pg_last_xlog_replay_location()" + if @aurora_postgres[cache_key] + # no way to get session_id at the moment + # also, pg_is_in_recovery() is always false + # and pg_settings are the same for writer and readers + # this means we can't tell: + # 1. if this is the primary or replica + # 2. if replica, which one + status = connection.select_all("SELECT MAX(replica_lag_in_msec) AS replica_lag_in_msec, COUNT(*) AS replica_count FROM aurora_replica_status() WHERE session_id != 'MASTER_SESSION_ID'").first + if status && status["replica_count"].to_i > 1 + log "Multiple readers available, taking max lag of all of them" end + status ? status["replica_lag_in_msec"].to_f / 1000.0 : 0.0 + else + lag_condition = + if @server_version_num[cache_key] >= 100000 + "pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()" + else + "pg_last_xlog_receive_location() = pg_last_xlog_replay_location()" + end - connection.select_all( - "SELECT CASE - WHEN NOT pg_is_in_recovery() OR #{lag_condition} THEN 0 - ELSE EXTRACT (EPOCH FROM NOW() - pg_last_xact_replay_timestamp()) - END AS lag".squish - ).first["lag"].to_f + connection.select_all( + "SELECT CASE + WHEN NOT pg_is_in_recovery() OR #{lag_condition} THEN 0 + ELSE EXTRACT (EPOCH FROM NOW() - pg_last_xact_replay_timestamp()) + END AS lag".squish + ).first["lag"].to_f + end when "MySQL", "Mysql2", "Mysql2Spatial", "Mysql2Rgeo" @aurora_mysql ||= {} cache_key = connection.pool.object_id unless @aurora_mysql.key?(cache_key) @@ -147,9 +167,11 @@ end end Makara::Proxy.prepend DistributeReads::AppropriatePool Object.include DistributeReads::GlobalMethods +# TODO uncomment in 0.4.0 +# Object.send :private, :distribute_reads ActiveSupport.on_load(:active_job) do require "distribute_reads/job_methods" include DistributeReads::JobMethods end