lib/async/redis/sentinels.rb in async-redis-0.8.1 vs lib/async/redis/sentinels.rb in async-redis-0.9.0
- old
+ new
@@ -9,87 +9,94 @@
module Async
module Redis
class SentinelsClient < Client
def initialize(master_name, sentinels, role = :master, protocol = Protocol::RESP2, **options)
@master_name = master_name
+
@sentinel_endpoints = sentinels.map do |sentinel|
::IO::Endpoint.tcp(sentinel[:host], sentinel[:port])
end
+
@role = role
-
@protocol = protocol
@pool = connect(**options)
end
-
+
private
-
+
# Override the parent method. The only difference is that this one needs
# to resolve the master/slave address.
def connect(**options)
Async::Pool::Controller.wrap(**options) do
endpoint = resolve_address
peer = endpoint.connect
stream = ::IO::Stream(peer)
-
+
@protocol.client(stream)
end
end
-
+
def resolve_address
- address = case @role
- when :master then resolve_master
- when :slave then resolve_slave
- else raise ArgumentError, "Unknown instance role #{@role}"
- end
-
+ case @role
+ when :master
+ resolve_master
+ when :slave
+ resolve_slave
+ else
+ raise ArgumentError, "Unknown instance role #{@role}"
+ end => address
+
address or raise RuntimeError, "Unable to fetch #{@role} via Sentinel."
end
-
+
def resolve_master
@sentinel_endpoints.each do |sentinel_endpoint|
- client = Client.new(sentinel_endpoint)
-
+ client = Client.new(sentinel_endpoint, protocol: @protocol)
+
begin
address = client.call('sentinel', 'get-master-addr-by-name', @master_name)
rescue Errno::ECONNREFUSED
next
end
-
+
return ::IO::Endpoint.tcp(address[0], address[1]) if address
end
-
+
nil
end
-
+
def resolve_slave
@sentinel_endpoints.each do |sentinel_endpoint|
- client = Client.new(sentinel_endpoint)
-
+ client = Client.new(sentinel_endpoint, protocol: @protocol)
+
begin
reply = client.call('sentinel', 'slaves', @master_name)
rescue Errno::ECONNREFUSED
next
end
-
+
slaves = available_slaves(reply)
next if slaves.empty?
-
+
slave = select_slave(slaves)
return ::IO::Endpoint.tcp(slave['ip'], slave['port'])
end
-
+
nil
end
-
- def available_slaves(slaves_cmd_reply)
+
+ def available_slaves(reply)
# The reply is an array with the format: [field1, value1, field2,
# value2, etc.].
# When a slave is marked as down by the sentinel, the "flags" field
# (comma-separated array) contains the "s_down" value.
- slaves_cmd_reply.map { |s| s.each_slice(2).to_h }
- .reject { |s| s.fetch('flags').split(',').include?('s_down') }
+ slaves = reply.map{|fields| fields.each_slice(2).to_h}
+
+ slaves.reject do |slave|
+ slave['flags'].split(',').include?('s_down')
+ end
end
-
+
def select_slave(available_slaves)
available_slaves.sample
end
end
end