lib/rails_failover/redis/handler.rb in rails_failover-0.6.1 vs lib/rails_failover/redis/handler.rb in rails_failover-0.6.2
- old
+ new
@@ -1,9 +1,10 @@
# frozen_string_literal: true
require 'monitor'
require 'singleton'
+require 'concurrent'
module RailsFailover
class Redis
class Handler
include Singleton
@@ -14,55 +15,65 @@
VERIFY_FREQUENCY_BUFFER_PRECENT = 20
SOFT_DISCONNECT_TIMEOUT_SECONDS = 1
SOFT_DISCONNECT_POLL_SECONDS = 0.05
def initialize
- @primaries_down = {}
- @clients = {}
- @ancestor_pid = Process.pid
+ @primaries_down = Concurrent::Map.new
+ @clients = Concurrent::Map.new
super() # Monitor#initialize
end
def verify_primary(options)
+ primary_down(options)
+
mon_synchronize do
- primary_down(options)
- ensure_failover_thread_running
+ return if @thread&.alive?
+ logger&.warn "Failover for Redis has been initiated"
+ @thread = Thread.new { loop_until_all_up }
end
end
- def ensure_failover_thread_running
- return if @thread&.alive?
+ def register_client(client)
+ id = client.options[:id]
+ clients_for_id(id).put_if_absent(client, true)
+ end
- logger&.warn "Failover for Redis has been initiated"
+ def deregister_client(client)
+ id = client.options[:id]
+ clients_for_id(id).delete(client)
+ end
- @thread = Thread.new do
- loop do
- ensure_primary_clients_disconnected
- try_fallback_to_primary
+ def primary_down?(options)
+ primaries_down[options[:id]]
+ end
- if all_primaries_up
- logger&.warn "Fallback to primary for Redis has been completed."
- break
- end
- end
- end
+ def primaries_down_count
+ primaries_down.size
end
- def ensure_primary_clients_disconnected
- mon_synchronize { primaries_down.dup }.each do |key, options|
- disconnect_clients(options, RailsFailover::Redis::PRIMARY)
+ private
+
+ def loop_until_all_up
+ loop do
+ ensure_primary_clients_disconnected
+ try_fallback_to_primary
+
+ if all_primaries_up
+ logger&.warn "Fallback to primary for Redis has been completed."
+ break
+ end
end
end
def try_fallback_to_primary
frequency = RailsFailover::Redis.verify_primary_frequency_seconds
sleep(frequency * ((rand(VERIFY_FREQUENCY_BUFFER_PRECENT) + 100) / 100.0))
active_primaries_keys = {}
- mon_synchronize { primaries_down.dup }.each do |key, options|
+ primaries_down.each do |key, options|
info = nil
options = options.dup
begin
options[:driver] = options[:original_driver]
@@ -85,106 +96,61 @@
primary_up(options)
disconnect_clients(options, RailsFailover::Redis::REPLICA)
end
end
- def register_client(client)
- key = client.options[:id]
-
- mon_synchronize do
- clients[key] ||= []
- clients[key] << client
- end
- end
-
- def deregister_client(client)
- key = client.options[:id]
-
- mon_synchronize do
- if clients[key]
- clients[key].delete(client)
-
- if clients[key].empty?
- clients.delete(key)
- end
- end
- end
- end
-
- def primary_down?(options)
- mon_synchronize do
- primaries_down[options[:id]]
- end
- end
-
- def primaries_down_count
- mon_synchronize do
- primaries_down.count
- end
- end
-
- private
-
def all_primaries_up
- mon_synchronize { primaries_down.empty? }
+ primaries_down.empty?
end
def primary_up(options)
- already_up = mon_synchronize do
- !primaries_down.delete(options[:id])
- end
+ already_up = !primaries_down.delete(options[:id])
RailsFailover::Redis.on_fallback_callback!(options[:id]) if !already_up
end
def primary_down(options)
- already_down = false
- mon_synchronize do
- already_down = !!primaries_down[options[:id]]
- primaries_down[options[:id]] = options.dup
- end
+ already_down = primaries_down.put_if_absent(options[:id], options.dup)
RailsFailover::Redis.on_failover_callback!(options[:id]) if !already_down
end
- def clients
- process_pid = Process.pid
- return @clients[process_pid] if @clients[process_pid]
+ def primaries_down
+ ancestor_pids = nil
+ value = @primaries_down.compute_if_absent(Process.pid) do
+ ancestor_pids = @primaries_down.keys
+ @primaries_down.values.first || Concurrent::Map.new
+ end
- mon_synchronize do
- if !@clients[process_pid]
- @clients[process_pid] = {}
+ ancestor_pids&.each do |pid|
+ @primaries_down.delete(pid)&.each { |id, options| verify_primary(options) }
+ end
- if process_pid != @ancestor_pid
- @clients.delete(@ancestor_pid)
- end
- end
+ value
+ end
- @clients[process_pid]
+ def clients_for_id(id)
+ clients.compute_if_absent(id) { Concurrent::Map.new }
+ end
+
+ def clients
+ ancestor_pids = nil
+ clients_for_pid = @clients.compute_if_absent(Process.pid) do
+ ancestor_pids = @clients.keys
+ Concurrent::Map.new
end
+ ancestor_pids&.each { |k| @clients.delete(k) }
+ clients_for_pid
end
- def primaries_down
- process_pid = Process.pid
- return @primaries_down[process_pid] if @primaries_down[process_pid]
-
- mon_synchronize do
- if !@primaries_down[process_pid]
- @primaries_down[process_pid] = @primaries_down[@ancestor_pid] || {}
-
- if process_pid != @ancestor_pid
- @primaries_down.delete(@ancestor_pid)&.each do |id, options|
- verify_primary(options)
- end
- end
- end
-
- @primaries_down[process_pid]
+ def ensure_primary_clients_disconnected
+ primaries_down.each do |key, options|
+ disconnect_clients(options, RailsFailover::Redis::PRIMARY)
end
end
def disconnect_clients(options, role)
- key = options[:id]
+ id = options[:id]
- matched_clients = mon_synchronize { clients[key].dup }
+ matched_clients = clients_for_id(id)&.keys
&.filter { |c| c.connection.rails_failover_role == role }
&.to_set
return if matched_clients.nil? || matched_clients.empty?