lib/async/redis/sentinels.rb in async-redis-0.8.1 vs lib/async/redis/sentinels.rb in async-redis-0.9.0

- old
+ new

@@ -9,87 +9,94 @@ module Async module Redis class SentinelsClient < Client def initialize(master_name, sentinels, role = :master, protocol = Protocol::RESP2, **options) @master_name = master_name + @sentinel_endpoints = sentinels.map do |sentinel| ::IO::Endpoint.tcp(sentinel[:host], sentinel[:port]) end + @role = role - @protocol = protocol @pool = connect(**options) end - + private - + # Override the parent method. The only difference is that this one needs # to resolve the master/slave address. def connect(**options) Async::Pool::Controller.wrap(**options) do endpoint = resolve_address peer = endpoint.connect stream = ::IO::Stream(peer) - + @protocol.client(stream) end end - + def resolve_address - address = case @role - when :master then resolve_master - when :slave then resolve_slave - else raise ArgumentError, "Unknown instance role #{@role}" - end - + case @role + when :master + resolve_master + when :slave + resolve_slave + else + raise ArgumentError, "Unknown instance role #{@role}" + end => address + address or raise RuntimeError, "Unable to fetch #{@role} via Sentinel." end - + def resolve_master @sentinel_endpoints.each do |sentinel_endpoint| - client = Client.new(sentinel_endpoint) - + client = Client.new(sentinel_endpoint, protocol: @protocol) + begin address = client.call('sentinel', 'get-master-addr-by-name', @master_name) rescue Errno::ECONNREFUSED next end - + return ::IO::Endpoint.tcp(address[0], address[1]) if address end - + nil end - + def resolve_slave @sentinel_endpoints.each do |sentinel_endpoint| - client = Client.new(sentinel_endpoint) - + client = Client.new(sentinel_endpoint, protocol: @protocol) + begin reply = client.call('sentinel', 'slaves', @master_name) rescue Errno::ECONNREFUSED next end - + slaves = available_slaves(reply) next if slaves.empty? - + slave = select_slave(slaves) return ::IO::Endpoint.tcp(slave['ip'], slave['port']) end - + nil end - - def available_slaves(slaves_cmd_reply) + + def available_slaves(reply) # The reply is an array with the format: [field1, value1, field2, # value2, etc.]. # When a slave is marked as down by the sentinel, the "flags" field # (comma-separated array) contains the "s_down" value. - slaves_cmd_reply.map { |s| s.each_slice(2).to_h } - .reject { |s| s.fetch('flags').split(',').include?('s_down') } + slaves = reply.map{|fields| fields.each_slice(2).to_h} + + slaves.reject do |slave| + slave['flags'].split(',').include?('s_down') + end end - + def select_slave(available_slaves) available_slaves.sample end end end