lib/rails_failover/redis/handler.rb in rails_failover-0.6.1 vs lib/rails_failover/redis/handler.rb in rails_failover-0.6.2

- old
+ new

@@ -1,9 +1,10 @@ # frozen_string_literal: true require 'monitor' require 'singleton' +require 'concurrent' module RailsFailover class Redis class Handler include Singleton @@ -14,55 +15,65 @@ VERIFY_FREQUENCY_BUFFER_PRECENT = 20 SOFT_DISCONNECT_TIMEOUT_SECONDS = 1 SOFT_DISCONNECT_POLL_SECONDS = 0.05 def initialize - @primaries_down = {} - @clients = {} - @ancestor_pid = Process.pid + @primaries_down = Concurrent::Map.new + @clients = Concurrent::Map.new super() # Monitor#initialize end def verify_primary(options) + primary_down(options) + mon_synchronize do - primary_down(options) - ensure_failover_thread_running + return if @thread&.alive? + logger&.warn "Failover for Redis has been initiated" + @thread = Thread.new { loop_until_all_up } end end - def ensure_failover_thread_running - return if @thread&.alive? + def register_client(client) + id = client.options[:id] + clients_for_id(id).put_if_absent(client, true) + end - logger&.warn "Failover for Redis has been initiated" + def deregister_client(client) + id = client.options[:id] + clients_for_id(id).delete(client) + end - @thread = Thread.new do - loop do - ensure_primary_clients_disconnected - try_fallback_to_primary + def primary_down?(options) + primaries_down[options[:id]] + end - if all_primaries_up - logger&.warn "Fallback to primary for Redis has been completed." - break - end - end - end + def primaries_down_count + primaries_down.size end - def ensure_primary_clients_disconnected - mon_synchronize { primaries_down.dup }.each do |key, options| - disconnect_clients(options, RailsFailover::Redis::PRIMARY) + private + + def loop_until_all_up + loop do + ensure_primary_clients_disconnected + try_fallback_to_primary + + if all_primaries_up + logger&.warn "Fallback to primary for Redis has been completed." + break + end end end def try_fallback_to_primary frequency = RailsFailover::Redis.verify_primary_frequency_seconds sleep(frequency * ((rand(VERIFY_FREQUENCY_BUFFER_PRECENT) + 100) / 100.0)) active_primaries_keys = {} - mon_synchronize { primaries_down.dup }.each do |key, options| + primaries_down.each do |key, options| info = nil options = options.dup begin options[:driver] = options[:original_driver] @@ -85,106 +96,61 @@ primary_up(options) disconnect_clients(options, RailsFailover::Redis::REPLICA) end end - def register_client(client) - key = client.options[:id] - - mon_synchronize do - clients[key] ||= [] - clients[key] << client - end - end - - def deregister_client(client) - key = client.options[:id] - - mon_synchronize do - if clients[key] - clients[key].delete(client) - - if clients[key].empty? - clients.delete(key) - end - end - end - end - - def primary_down?(options) - mon_synchronize do - primaries_down[options[:id]] - end - end - - def primaries_down_count - mon_synchronize do - primaries_down.count - end - end - - private - def all_primaries_up - mon_synchronize { primaries_down.empty? } + primaries_down.empty? end def primary_up(options) - already_up = mon_synchronize do - !primaries_down.delete(options[:id]) - end + already_up = !primaries_down.delete(options[:id]) RailsFailover::Redis.on_fallback_callback!(options[:id]) if !already_up end def primary_down(options) - already_down = false - mon_synchronize do - already_down = !!primaries_down[options[:id]] - primaries_down[options[:id]] = options.dup - end + already_down = primaries_down.put_if_absent(options[:id], options.dup) RailsFailover::Redis.on_failover_callback!(options[:id]) if !already_down end - def clients - process_pid = Process.pid - return @clients[process_pid] if @clients[process_pid] + def primaries_down + ancestor_pids = nil + value = @primaries_down.compute_if_absent(Process.pid) do + ancestor_pids = @primaries_down.keys + @primaries_down.values.first || Concurrent::Map.new + end - mon_synchronize do - if !@clients[process_pid] - @clients[process_pid] = {} + ancestor_pids&.each do |pid| + @primaries_down.delete(pid)&.each { |id, options| verify_primary(options) } + end - if process_pid != @ancestor_pid - @clients.delete(@ancestor_pid) - end - end + value + end - @clients[process_pid] + def clients_for_id(id) + clients.compute_if_absent(id) { Concurrent::Map.new } + end + + def clients + ancestor_pids = nil + clients_for_pid = @clients.compute_if_absent(Process.pid) do + ancestor_pids = @clients.keys + Concurrent::Map.new end + ancestor_pids&.each { |k| @clients.delete(k) } + clients_for_pid end - def primaries_down - process_pid = Process.pid - return @primaries_down[process_pid] if @primaries_down[process_pid] - - mon_synchronize do - if !@primaries_down[process_pid] - @primaries_down[process_pid] = @primaries_down[@ancestor_pid] || {} - - if process_pid != @ancestor_pid - @primaries_down.delete(@ancestor_pid)&.each do |id, options| - verify_primary(options) - end - end - end - - @primaries_down[process_pid] + def ensure_primary_clients_disconnected + primaries_down.each do |key, options| + disconnect_clients(options, RailsFailover::Redis::PRIMARY) end end def disconnect_clients(options, role) - key = options[:id] + id = options[:id] - matched_clients = mon_synchronize { clients[key].dup } + matched_clients = clients_for_id(id)&.keys &.filter { |c| c.connection.rails_failover_role == role } &.to_set return if matched_clients.nil? || matched_clients.empty?