lib/rails_failover/redis/handler.rb in rails_failover-0.4.0 vs lib/rails_failover/redis/handler.rb in rails_failover-0.5.0

- old
+ new

@@ -1,109 +1,199 @@ # frozen_string_literal: true require 'monitor' require 'singleton' +require 'digest' module RailsFailover class Redis class Handler include Singleton include MonitorMixin - MASTER_ROLE_STATUS = "role:master" - MASTER_LOADED_STATUS = "loading:0" + PRIMARY_ROLE_STATUS = "role:master" + PRIMARY_LOADED_STATUS = "loading:0" + VERIFY_FREQUENCY_BUFFER_PRECENT = 20 + SEPERATOR = "__RAILS_FAILOVER__" def initialize - @master = true - @clients = [] + @primaries_down = {} + @clients = {} + @ancestor_pid = Process.pid super() # Monitor#initialize end - def verify_master(options) + def verify_primary(options) mon_synchronize do + primary_down(options) + disconnect_clients(options) + return if @thread&.alive? - self.master = false - disconnect_clients - RailsFailover::Redis.master_down_callbacks.each { |callback| callback.call } + logger&.warn "Failover for Redis has been initiated" + begin + RailsFailover::Redis.on_failover_callback&.call + rescue => e + logger&.warn("RailsFailover::Redis.on_failover_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}") + end + @thread = Thread.new do loop do - thread = Thread.new { initiate_fallback_to_master(options) } + thread = Thread.new { initiate_fallback_to_primary } thread.join - break if self.master - sleep (RailsFailover::Redis.verify_master_frequency_seconds + (Time.now.to_i % RailsFailover::Redis.verify_master_frequency_seconds)) - ensure - thread.kill + + if all_primaries_up + logger&.warn "Fallback to primary for Redis has been completed." + + begin + RailsFailover::Redis.on_fallback_callback&.call + rescue => e + logger&.warn("RailsFailover::Redis.on_fallback_callback failed: #{e.class} #{e.message}\n#{e.backtrace.join("\n")}") + end + + break + end end end end end - def initiate_fallback_to_master(options) - info = nil + def initiate_fallback_to_primary + frequency = RailsFailover::Redis.verify_primary_frequency_seconds + sleep(frequency * ((rand(VERIFY_FREQUENCY_BUFFER_PRECENT) + 100) / 100.0)) - begin - master_client = ::Redis::Client.new(options.dup) - log "#{log_prefix}: Checking connection to master server..." - info = master_client.call([:info]) - rescue => e - log "#{log_prefix}: Connection to Master server failed with '#{e.message}'" - ensure - master_client&.disconnect + active_primaries_keys = {} + + primaries_down.each do |key, options| + info = nil + options = options.dup + + begin + options[:driver] = options[:original_driver] + primary_client = ::Redis::Client.new(options) + logger&.debug "Checking connection to primary server (#{key})" + info = primary_client.call([:info]) + rescue => e + logger&.debug "Connection to primary server (#{key}) failed with '#{e.message}'" + ensure + primary_client&.disconnect + end + + if info && info.include?(PRIMARY_LOADED_STATUS) && info.include?(PRIMARY_ROLE_STATUS) + active_primaries_keys[key] = options + logger&.debug "Primary server (#{key}) is active, disconnecting clients from replica" + end end - if info && info.include?(MASTER_LOADED_STATUS) && info.include?(MASTER_ROLE_STATUS) - self.master = true - log "#{log_prefix}: Master server is active, disconnecting clients from replica" - disconnect_clients - RailsFailover::Redis.master_up_callbacks.each { |callback| callback.call } + active_primaries_keys.each do |key, options| + primary_up(options) + disconnect_clients(options) end end def register_client(client) + key = client.options[:id] + mon_synchronize do - @clients << client + clients[key] ||= [] + clients[key] << client end end def deregister_client(client) + key = client.options[:id] + mon_synchronize do - @clients.delete(client) + if clients[key] + clients[key].delete(client) + + if clients[key].empty? + clients.delete(key) + end + end end end - def clients - mon_synchronize { @clients } + def primary_down?(options) + mon_synchronize do + primaries_down[options[:id]] + end end - def master - mon_synchronize { @master } + private + + def id_digest(id) + Digest::MD5.hexdigest(id) end - def master=(args) - mon_synchronize { @master = args } + def all_primaries_up + mon_synchronize { primaries_down.empty? } end - private + def primary_up(options) + mon_synchronize do + primaries_down.delete(options[:id]) + end + end - def disconnect_clients + def primary_down(options) mon_synchronize do - @clients.dup.each do |c| - c.disconnect + primaries_down[options[:id]] = options.dup + end + end + + def clients + process_pid = Process.pid + return @clients[process_pid] if @clients[process_pid] + + mon_synchronize do + if !@clients[process_pid] + @clients[process_pid] = {} + + if process_pid != @ancestor_pid + @clients.delete(@ancestor_pid) + end end + + @clients[process_pid] end end - def log(message) - if logger = RailsFailover::Redis.logger - logger.warn(message) + def primaries_down + process_pid = Process.pid + return @primaries_down[process_pid] if @primaries_down[process_pid] + + mon_synchronize do + if !@primaries_down[process_pid] + @primaries_down[process_pid] = @primaries_down[@ancestor_pid] || {} + + if process_pid != @ancestor_pid + @primaries_down.delete(@ancestor_pid).each do |id, options| + verify_primary(options) + end + end + end + + @primaries_down[process_pid] end end - def log_prefix - "#{self.class}" + def disconnect_clients(options) + key = options[:id] + + mon_synchronize do + if clients[key] + clients[key].dup.each do |c| + c.disconnect + end + end + end + end + + def logger + RailsFailover::Redis.logger end end end end