Sha256: 5a4fc8cdf064fc254b7b4067c314019e4e38061b6c0106b92a3d5951baeabc55

Contents?: true

Size: 1.83 KB

Versions: 1

Compression:

Stored size: 1.83 KB

Contents

require 'socket'
require 'digest/md5'

# A fixed, ordered list of cache hosts together with the logic for obtaining
# an open TCP socket to one of them.
#
# Hosts are given as "host" or "host:port" strings. Open sockets are cached in
# a class-level table keyed by the original host string, so every Cluster
# instance in the process shares a single connection per host string.
class Remix::Stash::Cluster
  include Enumerable
  include Socket::Constants

  # Port used when a host string has no ":port" suffix.
  DEFAULT_PORT = 11211

  # Number of reconnect-and-retry rounds granted to a single host after a
  # broken connection or protocol error. Once exhausted, the host is skipped
  # exactly like one that refused the connection, instead of being retried
  # forever.
  RETRY_LIMIT = 3

  # Host string => open socket. Shared by all instances of this class.
  @@connections = {}

  # Array of [original_string, host, port] triples, sorted by host then port.
  attr_reader :hosts

  # hosts - an Enumerable of "host" or "host:port" strings.
  #
  # The list is sorted by [host, port], so the position of a host does not
  # depend on the order in which the caller listed it.
  def initialize(hosts)
    @hosts = hosts.map {|x|
      host, port = x.split(':')
      [x, host, (port || DEFAULT_PORT).to_i]
    }.sort_by {|(_,h,p)| [h,p]}
  end

  # Yields an open socket for the first host that can be used, then stops:
  # later hosts are only tried when earlier ones are unavailable.
  #
  # Returns the block's value. If no host could be used, the result of
  # iterating the host list (the list itself) is returned.
  def each
    @hosts.each do |h|
      ok, result = try_host(h) {|io| yield(io)}
      break result if ok
    end
  end

  # Note: Later, I'd like to support richer cluster definitions.
  # This should do the trick for now... and it's fast.
  #
  # Picks a host for +key+ and yields an open socket to it, returning the
  # block's value. With more than one host, the starting host is derived from
  # the first four bytes of the MD5 digest of +key+; if that host cannot be
  # used, the following hosts are tried in order, wrapping around the list.
  #
  # This method replaces Enumerable#select on this class.
  #
  # Raises Remix::Stash::ClusterError when no host could be used.
  def select(key)
    count = @hosts.size
    # "L" unpacks in native byte order, so the chosen host depends on the
    # endianness of the machine running this code.
    hash = count > 1 ? Digest::MD5.digest(key).unpack("L")[0] : 0
    count.times do |try|
      ok, result = try_host(@hosts[(hash + try) % count]) {|io| yield(io)}
      return result if ok
    end
    raise Remix::Stash::ClusterError,
      "Unable to find suitable host to communicate with for #{key.inspect} (MD5-32=#{hash})"
  end

private

  # Opens a TCP connection to host:port with 2 second send/receive timeouts.
  def connect(host, port)
    socket = TCPSocket.new(host, port)
    set_timeout(socket, 2)
    socket
  end

  # Returns the cached socket for +key+, connecting (and caching the new
  # socket) when there is none yet or the cached one has been closed.
  def host_to_io(key, host, port)
    socket = @@connections[key]
    return socket if socket && !socket.closed?
    # Drop the stale entry first so a failed connect leaves nothing cached.
    @@connections.delete(key)
    @@connections[key] = connect(host, port)
  end

  # Applies +seconds+ as both the send and the receive timeout of +socket+.
  # Fractional values are honoured with microsecond resolution.
  def set_timeout(socket, seconds)
    whole, micro = (seconds * 1_000_000).round.divmod(1_000_000)
    # Assumes struct timeval is laid out as two native longs on this platform.
    timeout = [whole, micro].pack('l_2')
    socket.setsockopt(SOL_SOCKET, SO_SNDTIMEO, timeout)
    socket.setsockopt(SOL_SOCKET, SO_RCVTIMEO, timeout)
  end

  # Runs the block with an open socket for +entry+, one of the @hosts triples.
  #
  # Returns [true, block_value] on success and [false, nil] when the host is
  # unusable. A broken pipe, connection reset or protocol error closes the
  # socket and retries on a fresh connection, at most RETRY_LIMIT times.
  # EAGAIN and ECONNREFUSED give up on the host immediately.
  def try_host(entry)
    retries = 0
    begin
      io = nil
      io = host_to_io(*entry)
      [true, yield(io)]
    rescue Errno::EPIPE, Errno::ECONNRESET, Remix::Stash::ProtocolError
      # io is still nil when the failure happened while connecting.
      io.close if io && !io.closed?
      retries += 1
      retry if retries <= RETRY_LIMIT
      [false, nil]
    rescue Errno::EAGAIN, Errno::ECONNREFUSED
      [false, nil]
    end
  end

end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
remix-stash-1.0.0 lib/remix/stash/cluster.rb