lib/rails_failover/redis/handler.rb in rails_failover-0.5.7 vs lib/rails_failover/redis/handler.rb in rails_failover-0.5.8
- old
+ new
@@ -10,10 +10,12 @@
include MonitorMixin
PRIMARY_ROLE_STATUS = "role:master"
PRIMARY_LOADED_STATUS = "loading:0"
VERIFY_FREQUENCY_BUFFER_PRECENT = 20
+ SOFT_DISCONNECT_TIMEOUT_SECONDS = 1
+ SOFT_DISCONNECT_POLL_SECONDS = 0.05
def initialize
@primaries_down = {}
@clients = {}
@ancestor_pid = Process.pid
@@ -22,44 +24,51 @@
end
def verify_primary(options)
mon_synchronize do
primary_down(options)
- disconnect_clients(options)
+ ensure_failover_thread_running
+ end
+ end
- return if @thread&.alive?
+ def ensure_failover_thread_running
+ return if @thread&.alive?
- logger&.warn "Failover for Redis has been initiated"
+ logger&.warn "Failover for Redis has been initiated"
- begin
- RailsFailover::Redis.on_failover_callback&.call
- rescue => e
- logger&.warn("RailsFailover::Redis.on_failover_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}")
- end
+ begin
+ RailsFailover::Redis.on_failover_callback&.call
+ rescue => e
+ logger&.warn("RailsFailover::Redis.on_failover_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}")
+ end
- @thread = Thread.new do
- loop do
- thread = Thread.new { initiate_fallback_to_primary }
- thread.join
+ @thread = Thread.new do
+ loop do
+ ensure_primary_clients_disconnected
+ try_fallback_to_primary
- if all_primaries_up
- logger&.warn "Fallback to primary for Redis has been completed."
+ if all_primaries_up
+ logger&.warn "Fallback to primary for Redis has been completed."
- begin
- RailsFailover::Redis.on_fallback_callback&.call
- rescue => e
- logger&.warn("RailsFailover::Redis.on_fallback_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}")
- end
-
- break
+ begin
+ RailsFailover::Redis.on_fallback_callback&.call
+ rescue => e
+ logger&.warn("RailsFailover::Redis.on_fallback_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}")
end
+ break
end
end
end
end
- def initiate_fallback_to_primary
+ def ensure_primary_clients_disconnected
+ mon_synchronize { primaries_down.dup }.each do |key, options|
+ disconnect_clients(options, RailsFailover::Redis::PRIMARY)
+ end
+ end
+
+ def try_fallback_to_primary
frequency = RailsFailover::Redis.verify_primary_frequency_seconds
sleep(frequency * ((rand(VERIFY_FREQUENCY_BUFFER_PRECENT) + 100) / 100.0))
active_primaries_keys = {}
@@ -84,11 +93,11 @@
end
end
active_primaries_keys.each do |key, options|
primary_up(options)
- disconnect_clients(options)
+ disconnect_clients(options, RailsFailover::Redis::REPLICA)
end
end
def register_client(client)
key = client.options[:id]
@@ -171,26 +180,51 @@
@primaries_down[process_pid]
end
end
- def disconnect_clients(options)
+ def disconnect_clients(options, role)
key = options[:id]
- mon_synchronize do
- if to_disconnect = clients[key].dup
- # Don't disconnect connections abruptly since it may lead to unexepcted
- # errors. Is there a better way to do this without having to monkey patch
- # the redis-rb gem heavily?
- ObjectSpace.each_object(::Redis).each do |redis|
- to_disconnect.each do |c|
- if redis._client == c
- redis.synchronize { |_client| _client.disconnect }
- end
- end
- end
+ matched_clients = mon_synchronize { clients[key].dup }
+ &.filter { |c| c.connection.rails_failover_role == role }
+ &.to_set
+
+ return if matched_clients.nil? || matched_clients.empty?
+
+ # This is not ideal, but the mutex we need is contained
+ # in the ::Redis instance, not the Redis::Client
+ ObjectSpace.each_object(::Redis) do |redis|
+ # When subscribed, Redis#_client is not a Redis::Client
+ # Instance variable is the only reliable way
+ client = redis.instance_variable_get(:@original_client)
+ next if !matched_clients.include?(client)
+ soft_disconnect(redis, client, role)
+ end
+ end
+
+ # Calling .disconnect can cause a running subscribe() to block forever
+ # Therefore try to acquire the lock
+ def soft_disconnect(redis, client, role)
+ has_lock = redis.mon_try_enter
+
+ if !has_lock
+ client.connection.shutdown_socket
+
+ waiting_since = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ loop do # Keep trying
+ break if has_lock = redis.mon_try_enter
+ break if !client.connection.connected? # Disconnected by other thread
+ break if client.connection.rails_failover_role != role # Reconnected by other thread
+ time_now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ break if time_now > waiting_since + SOFT_DISCONNECT_TIMEOUT_SECONDS
+ sleep SOFT_DISCONNECT_POLL_SECONDS
end
end
+
+ client.disconnect if client.connection&.rails_failover_role == role
+ ensure
+ redis.mon_exit if has_lock
end
def logger
RailsFailover::Redis.logger
end