lib/rails_failover/redis/handler.rb in rails_failover-0.4.0 vs lib/rails_failover/redis/handler.rb in rails_failover-0.5.0
- old
+ new
@@ -1,109 +1,199 @@
# frozen_string_literal: true
require 'monitor'
require 'singleton'
+require 'digest'
module RailsFailover
class Redis
class Handler
include Singleton
include MonitorMixin
- MASTER_ROLE_STATUS = "role:master"
- MASTER_LOADED_STATUS = "loading:0"
+ PRIMARY_ROLE_STATUS = "role:master"
+ PRIMARY_LOADED_STATUS = "loading:0"
+ VERIFY_FREQUENCY_BUFFER_PRECENT = 20
+ SEPERATOR = "__RAILS_FAILOVER__"
def initialize
- @master = true
- @clients = []
+ @primaries_down = {}
+ @clients = {}
+ @ancestor_pid = Process.pid
super() # Monitor#initialize
end
- def verify_master(options)
+ def verify_primary(options)
mon_synchronize do
+ primary_down(options)
+ disconnect_clients(options)
+
return if @thread&.alive?
- self.master = false
- disconnect_clients
- RailsFailover::Redis.master_down_callbacks.each { |callback| callback.call }
+ logger&.warn "Failover for Redis has been initiated"
+ begin
+ RailsFailover::Redis.on_failover_callback&.call
+ rescue => e
+ logger&.warn("RailsFailover::Redis.on_failover_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}")
+ end
+
@thread = Thread.new do
loop do
- thread = Thread.new { initiate_fallback_to_master(options) }
+ thread = Thread.new { initiate_fallback_to_primary }
thread.join
- break if self.master
- sleep (RailsFailover::Redis.verify_master_frequency_seconds + (Time.now.to_i % RailsFailover::Redis.verify_master_frequency_seconds))
- ensure
- thread.kill
+
+ if all_primaries_up
+ logger&.warn "Fallback to primary for Redis has been completed."
+
+ begin
+ RailsFailover::Redis.on_fallback_callback&.call
+ rescue => e
+ logger&.warn("RailsFailover::Redis.on_fallback_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}")
+ end
+
+ break
+ end
end
end
end
end
- def initiate_fallback_to_master(options)
- info = nil
+ def initiate_fallback_to_primary
+ frequency = RailsFailover::Redis.verify_primary_frequency_seconds
+ sleep(frequency * ((rand(VERIFY_FREQUENCY_BUFFER_PRECENT) + 100) / 100.0))
- begin
- master_client = ::Redis::Client.new(options.dup)
- log "#{log_prefix}: Checking connection to master server..."
- info = master_client.call([:info])
- rescue => e
- log "#{log_prefix}: Connection to Master server failed with '#{e.message}'"
- ensure
- master_client&.disconnect
+ active_primaries_keys = {}
+
+ primaries_down.each do |key, options|
+ info = nil
+ options = options.dup
+
+ begin
+ options[:driver] = options[:original_driver]
+ primary_client = ::Redis::Client.new(options)
+ logger&.debug "Checking connection to primary server (#{key})"
+ info = primary_client.call([:info])
+ rescue => e
+ logger&.debug "Connection to primary server (#{key}) failed with '#{e.message}'"
+ ensure
+ primary_client&.disconnect
+ end
+
+ if info && info.include?(PRIMARY_LOADED_STATUS) && info.include?(PRIMARY_ROLE_STATUS)
+ active_primaries_keys[key] = options
+ logger&.debug "Primary server (#{key}) is active, disconnecting clients from replica"
+ end
end
- if info && info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS)
- self.master = true
- log "#{log_prefix}: Master server is active, disconnecting clients from replica"
- disconnect_clients
- RailsFailover::Redis.master_up_callbacks.each { |callback| callback.call }
+ active_primaries_keys.each do |key, options|
+ primary_up(options)
+ disconnect_clients(options)
end
end
def register_client(client)
+ key = client.options[:id]
+
mon_synchronize do
- @clients << client
+ clients[key] ||= []
+ clients[key] << client
end
end
def deregister_client(client)
+ key = client.options[:id]
+
mon_synchronize do
- @clients.delete(client)
+ if clients[key]
+ clients[key].delete(client)
+
+ if clients[key].empty?
+ clients.delete(key)
+ end
+ end
end
end
- def clients
- mon_synchronize { @clients }
+ def primary_down?(options)
+ mon_synchronize do
+ primaries_down[options[:id]]
+ end
end
- def master
- mon_synchronize { @master }
+ private
+
+ def id_digest(id)
+ Digest::MD5.hexdigest(id)
end
- def master=(args)
- mon_synchronize { @master = args }
+ def all_primaries_up
+ mon_synchronize { primaries_down.empty? }
end
- private
+ def primary_up(options)
+ mon_synchronize do
+ primaries_down.delete(options[:id])
+ end
+ end
- def disconnect_clients
+ def primary_down(options)
mon_synchronize do
- @clients.dup.each do |c|
- c.disconnect
+ primaries_down[options[:id]] = options.dup
+ end
+ end
+
+ def clients
+ process_pid = Process.pid
+ return @clients[process_pid] if @clients[process_pid]
+
+ mon_synchronize do
+ if !@clients[process_pid]
+ @clients[process_pid] = {}
+
+ if process_pid != @ancestor_pid
+ @clients.delete(@ancestor_pid)
+ end
end
+
+ @clients[process_pid]
end
end
- def log(message)
- if logger = RailsFailover::Redis.logger
- logger.warn(message)
+ def primaries_down
+ process_pid = Process.pid
+ return @primaries_down[process_pid] if @primaries_down[process_pid]
+
+ mon_synchronize do
+ if !@primaries_down[process_pid]
+ @primaries_down[process_pid] = @primaries_down[@ancestor_pid] || {}
+
+ if process_pid != @ancestor_pid
+ @primaries_down.delete(@ancestor_pid).each do |id, options|
+ verify_primary(options)
+ end
+ end
+ end
+
+ @primaries_down[process_pid]
end
end
- def log_prefix
- "#{self.class}"
+ def disconnect_clients(options)
+ key = options[:id]
+
+ mon_synchronize do
+ if clients[key]
+ clients[key].dup.each do |c|
+ c.disconnect
+ end
+ end
+ end
+ end
+
+ def logger
+ RailsFailover::Redis.logger
end
end
end
end