lib/rails_failover/redis/handler.rb in rails_failover-0.5.7 vs lib/rails_failover/redis/handler.rb in rails_failover-0.5.8

- old
+ new

@@ -10,10 +10,12 @@ include MonitorMixin PRIMARY_ROLE_STATUS = "role:master" PRIMARY_LOADED_STATUS = "loading:0" VERIFY_FREQUENCY_BUFFER_PRECENT = 20 + SOFT_DISCONNECT_TIMEOUT_SECONDS = 1 + SOFT_DISCONNECT_POLL_SECONDS = 0.05 def initialize @primaries_down = {} @clients = {} @ancestor_pid = Process.pid @@ -22,44 +24,51 @@ end def verify_primary(options) mon_synchronize do primary_down(options) - disconnect_clients(options) + ensure_failover_thread_running + end + end - return if @thread&.alive? + def ensure_failover_thread_running + return if @thread&.alive? - logger&.warn "Failover for Redis has been initiated" + logger&.warn "Failover for Redis has been initiated" - begin - RailsFailover::Redis.on_failover_callback&.call - rescue => e - logger&.warn("RailsFailover::Redis.on_failover_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}") - end + begin + RailsFailover::Redis.on_failover_callback&.call + rescue => e + logger&.warn("RailsFailover::Redis.on_failover_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}") + end - @thread = Thread.new do - loop do - thread = Thread.new { initiate_fallback_to_primary } - thread.join + @thread = Thread.new do + loop do + ensure_primary_clients_disconnected + try_fallback_to_primary - if all_primaries_up - logger&.warn "Fallback to primary for Redis has been completed." + if all_primaries_up + logger&.warn "Fallback to primary for Redis has been completed." - begin - RailsFailover::Redis.on_fallback_callback&.call - rescue => e - logger&.warn("RailsFailover::Redis.on_fallback_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}") - end - - break + begin + RailsFailover::Redis.on_fallback_callback&.call + rescue => e + logger&.warn("RailsFailover::Redis.on_fallback_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}") end + break end end end end - def initiate_fallback_to_primary + def ensure_primary_clients_disconnected + mon_synchronize { primaries_down.dup }.each do |key, options| + disconnect_clients(options, RailsFailover::Redis::PRIMARY) + end + end + + def try_fallback_to_primary frequency = RailsFailover::Redis.verify_primary_frequency_seconds sleep(frequency * ((rand(VERIFY_FREQUENCY_BUFFER_PRECENT) + 100) / 100.0)) active_primaries_keys = {} @@ -84,11 +93,11 @@ end end active_primaries_keys.each do |key, options| primary_up(options) - disconnect_clients(options) + disconnect_clients(options, RailsFailover::Redis::REPLICA) end end def register_client(client) key = client.options[:id] @@ -171,26 +180,51 @@ @primaries_down[process_pid] end end - def disconnect_clients(options) + def disconnect_clients(options, role) key = options[:id] - mon_synchronize do - if to_disconnect = clients[key].dup - # Don't disconnect connections abruptly since it may lead to unexepcted - # errors. Is there a better way to do this without having to monkey patch - # the redis-rb gem heavily? - ObjectSpace.each_object(::Redis).each do |redis| - to_disconnect.each do |c| - if redis._client == c - redis.synchronize { |_client| _client.disconnect } - end - end - end + matched_clients = mon_synchronize { clients[key].dup } + &.filter { |c| c.connection.rails_failover_role == role } + &.to_set + + return if matched_clients.nil? || matched_clients.empty? + + # This is not ideal, but the mutex we need is contained + # in the ::Redis instance, not the Redis::Client + ObjectSpace.each_object(::Redis) do |redis| + # When subscribed, Redis#_client is not a Redis::Client + # Instance variable is the only reliable way + client = redis.instance_variable_get(:@original_client) + next if !matched_clients.include?(client) + soft_disconnect(redis, client, role) + end + end + + # Calling .disconnect can cause a running subscribe() to block forever + # Therefore try to acquire the lock + def soft_disconnect(redis, client, role) + has_lock = redis.mon_try_enter + + if !has_lock + client.connection.shutdown_socket + + waiting_since = Process.clock_gettime(Process::CLOCK_MONOTONIC) + loop do # Keep trying + break if has_lock = redis.mon_try_enter + break if !client.connection.connected? # Disconnected by other thread + break if client.connection.rails_failover_role != role # Reconnected by other thread + time_now = Process.clock_gettime(Process::CLOCK_MONOTONIC) + break if time_now > waiting_since + SOFT_DISCONNECT_TIMEOUT_SECONDS + sleep SOFT_DISCONNECT_POLL_SECONDS end end + + client.disconnect if client.connection&.rails_failover_role == role + ensure + redis.mon_exit if has_lock end def logger RailsFailover::Redis.logger end